This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5b993ba1f9e135e57724141d54ba3c10f2d6990c Author: feynmanlin <[email protected]> AuthorDate: Mon Sep 20 20:11:20 2021 +0800 Fix messages in TopicPolicies will never be cleaned up (#11928) (cherry picked from commit 93e2db0a07b632032a36130e81d32b72136ef331) --- .../SystemTopicBasedTopicPoliciesService.java | 50 ++++++++++---- .../systopic/TopicPoliciesSystemTopicClient.java | 19 ++++++ .../pulsar/broker/admin/TopicPoliciesTest.java | 79 ++++++++++++++++++++++ 3 files changed, 135 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 49f934a..0e8bdd0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; @@ -95,19 +96,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic if (ex != null) { result.completeExceptionally(ex); } else { - writer.writeAsync( - PulsarEvent.builder() - .actionType(actionType) - .eventType(EventType.TOPIC_POLICY) - .topicPoliciesEvent( - TopicPoliciesEvent.builder() - .domain(topicName.getDomain().toString()) - .tenant(topicName.getTenant()) - .namespace(topicName.getNamespaceObject().getLocalName()) - .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()) - .policies(policies) - .build()) - .build()).whenComplete(((messageId, e) -> { + PulsarEvent event = getPulsarEvent(topicName, actionType, policies); + CompletableFuture<MessageId> actionFuture = + ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event); + actionFuture.whenComplete(((messageId, e) -> { if (e != null) { result.completeExceptionally(e); } else { @@ -133,6 +125,21 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return result; } + private PulsarEvent getPulsarEvent(TopicName topicName, ActionType actionType, TopicPolicies policies) { + return PulsarEvent.builder() + .actionType(actionType) + .eventType(EventType.TOPIC_POLICY) + .topicPoliciesEvent( + TopicPoliciesEvent.builder() + .domain(topicName.getDomain().toString()) + .tenant(topicName.getTenant()) + .namespace(topicName.getNamespaceObject().getLocalName()) + .topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName()) + .policies(policies) + .build()) + .build(); + } + private void notifyListener(Message<PulsarEvent> msg) { if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { return; @@ -314,6 +321,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) { + // delete policies + if (msg.getValue() == null) { + policiesCache.remove(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName())); + return; + } if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) { TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent(); TopicName topicName = @@ -329,7 +341,19 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic policiesCache.put(topicName, event.getPolicies()); break; case DELETE: + // Since PR #11928, this branch is no longer needed. + // However, due to compatibility, it is temporarily retained here + // and can be deleted in the future. policiesCache.remove(topicName); + SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory + .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject()); + systemTopicClient.newWriterAsync().thenAccept(writer + -> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null)) + .whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> { + if (ex != null) { + log.error("close writer failed ", ex); + } + }))); break; case NONE: break; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java index 58462ea..847e4d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java @@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.TopicName; import org.slf4j.Logger; @@ -88,6 +89,18 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar return producer.newMessage().key(getEventKey(event)).value(event).sendAsync(); } + @Override + public MessageId delete(PulsarEvent event) throws PulsarClientException { + validateActionType(event); + return producer.newMessage().key(getEventKey(event)).value(null).send(); + } + + @Override + public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) { + validateActionType(event); + return producer.newMessage().key(getEventKey(event)).value(null).sendAsync(); + } + private String getEventKey(PulsarEvent event) { return TopicName.get(event.getTopicPoliciesEvent().getDomain(), event.getTopicPoliciesEvent().getTenant(), @@ -115,6 +128,12 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar } } + private static void validateActionType(PulsarEvent event) { + if (event == null || !ActionType.DELETE.equals(event.getActionType())) { + throw new UnsupportedOperationException("The only supported ActionType is DELETE"); + } + } + private static class TopicPolicyReader implements Reader<PulsarEvent> { private final org.apache.pulsar.client.api.Reader<PulsarEvent> reader; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 9e2b9cf..b570bff 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -59,6 +60,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; @@ -2473,6 +2475,83 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { } @Test + public void testPoliciesCanBeDeletedWithTopic() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + final String topic2 = testTopic + UUID.randomUUID(); + pulsarClient.newProducer().topic(topic).create().close(); + pulsarClient.newProducer().topic(topic2).create().close(); + + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull(); + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNull(); + }); + // Init Topic Policies. Send 4 messages in a row, there should be only 2 messages left after compression + admin.topics().setMaxConsumersPerSubscription(topic, 1); + admin.topics().setMaxConsumersPerSubscription(topic2, 2); + admin.topics().setMaxConsumersPerSubscription(topic, 3); + admin.topics().setMaxConsumersPerSubscription(topic2, 4); + Awaitility.await().untilAsserted(() -> { + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull(); + Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull(); + }); + String topicPoliciesTopic = "persistent://" + myNamespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME; + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get(); + // Trigger compaction and make sure it is finished. + persistentTopic.triggerCompaction(); + Field field = PersistentTopic.class.getDeclaredField("currentCompaction"); + field.setAccessible(true); + CompletableFuture<Long> future = (CompletableFuture<Long>)field.get(persistentTopic); + Awaitility.await().untilAsserted(() -> assertTrue(future.isDone())); + + Consumer consumer = pulsarClient.newConsumer() + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .readCompacted(true) + .topic(topicPoliciesTopic).subscriptionName("sub").subscribe(); + int count = 0; + while (true) { + Message message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + count++; + consumer.acknowledge(message); + } else { + break; + } + } + consumer.close(); + assertEquals(count, 2); + + // Delete topic, there should be only 1 message left after compression + admin.topics().delete(topic, true); + + Awaitility.await().untilAsserted(() -> + assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic)))); + persistentTopic.triggerCompaction(); + field = PersistentTopic.class.getDeclaredField("currentCompaction"); + field.setAccessible(true); + CompletableFuture<Long> future2 = (CompletableFuture<Long>)field.get(persistentTopic); + Awaitility.await().untilAsserted(() -> assertTrue(future2.isDone())); + + consumer = pulsarClient.newConsumer() + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .readCompacted(true) + .topic(topicPoliciesTopic).subscriptionName("sub").subscribe(); + count = 0; + while (true) { + Message message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + count++; + consumer.acknowledge(message); + } else { + break; + } + } + consumer.close(); + assertEquals(count, 1); + + } + + @Test public void testPolicyIsDeleteTogetherAutomatically() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close();
