This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b993ba1f9e135e57724141d54ba3c10f2d6990c
Author: feynmanlin <[email protected]>
AuthorDate: Mon Sep 20 20:11:20 2021 +0800

    Fix messages in TopicPolicies will never be cleaned up (#11928)
    
    
    (cherry picked from commit 93e2db0a07b632032a36130e81d32b72136ef331)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 50 ++++++++++----
 .../systopic/TopicPoliciesSystemTopicClient.java   | 19 ++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 79 ++++++++++++++++++++++
 3 files changed, 135 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 49f934a..0e8bdd0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -33,6 +33,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCach
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
@@ -95,19 +96,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             if (ex != null) {
                 result.completeExceptionally(ex);
             } else {
-                writer.writeAsync(
-                        PulsarEvent.builder()
-                                .actionType(actionType)
-                        .eventType(EventType.TOPIC_POLICY)
-                        .topicPoliciesEvent(
-                            TopicPoliciesEvent.builder()
-                                .domain(topicName.getDomain().toString())
-                                .tenant(topicName.getTenant())
-                                
.namespace(topicName.getNamespaceObject().getLocalName())
-                                
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
-                                .policies(policies)
-                                .build())
-                        .build()).whenComplete(((messageId, e) -> {
+                PulsarEvent event = getPulsarEvent(topicName, actionType, 
policies);
+                CompletableFuture<MessageId> actionFuture =
+                        ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(event) : writer.writeAsync(event);
+                actionFuture.whenComplete(((messageId, e) -> {
                             if (e != null) {
                                 result.completeExceptionally(e);
                             } else {
@@ -133,6 +125,21 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         return result;
     }
 
+    private PulsarEvent getPulsarEvent(TopicName topicName, ActionType 
actionType, TopicPolicies policies) {
+        return PulsarEvent.builder()
+                .actionType(actionType)
+                .eventType(EventType.TOPIC_POLICY)
+                .topicPoliciesEvent(
+                        TopicPoliciesEvent.builder()
+                                .domain(topicName.getDomain().toString())
+                                .tenant(topicName.getTenant())
+                                
.namespace(topicName.getNamespaceObject().getLocalName())
+                                
.topic(TopicName.get(topicName.getPartitionedTopicName()).getLocalName())
+                                .policies(policies)
+                                .build())
+                .build();
+    }
+
     private void notifyListener(Message<PulsarEvent> msg) {
         if (!EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
             return;
@@ -314,6 +321,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     }
 
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
+        // A null value means the policies were deleted; evict the cache entry.
+        if (msg.getValue() == null) {
+            
policiesCache.remove(TopicName.get(TopicName.get(msg.getKey()).getPartitionedTopicName()));
+            return;
+        }
         if (EventType.TOPIC_POLICY.equals(msg.getValue().getEventType())) {
             TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
             TopicName topicName =
@@ -329,7 +341,19 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                     policiesCache.put(topicName, event.getPolicies());
                     break;
                 case DELETE:
+                    // Since PR #11928, this branch is no longer needed.
+                    // However, for backward compatibility, it is temporarily 
retained here
+                    // and can be deleted in the future.
                     policiesCache.remove(topicName);
+                    SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+                            
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+                    systemTopicClient.newWriterAsync().thenAccept(writer
+                            -> writer.deleteAsync(getPulsarEvent(topicName, 
ActionType.DELETE, null))
+                            .whenComplete((result, e) -> 
writer.closeAsync().whenComplete((res, ex) -> {
+                                if (ex != null) {
+                                    log.error("close writer failed ", ex);
+                                }
+                            })));
                     break;
                 case NONE:
                     break;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 58462ea..847e4d2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
@@ -88,6 +89,18 @@ public class TopicPoliciesSystemTopicClient extends 
SystemTopicClientBase<Pulsar
             return 
producer.newMessage().key(getEventKey(event)).value(event).sendAsync();
         }
 
+        @Override
+        public MessageId delete(PulsarEvent event) throws 
PulsarClientException {
+            validateActionType(event);
+            return 
producer.newMessage().key(getEventKey(event)).value(null).send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+            validateActionType(event);
+            return 
producer.newMessage().key(getEventKey(event)).value(null).sendAsync();
+        }
+
         private String getEventKey(PulsarEvent event) {
             return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
                 event.getTopicPoliciesEvent().getTenant(),
@@ -115,6 +128,12 @@ public class TopicPoliciesSystemTopicClient extends 
SystemTopicClientBase<Pulsar
         }
     }
 
+    private static void validateActionType(PulsarEvent event) {
+        if (event == null || !ActionType.DELETE.equals(event.getActionType())) 
{
+            throw new UnsupportedOperationException("The only supported 
ActionType is DELETE");
+        }
+    }
+
     private static class TopicPolicyReader implements Reader<PulsarEvent> {
 
         private final org.apache.pulsar.client.api.Reader<PulsarEvent> reader;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 9e2b9cf..b570bff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -59,6 +60,7 @@ import 
org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -2473,6 +2475,83 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testPoliciesCanBeDeletedWithTopic() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        final String topic2 = testTopic + UUID.randomUUID();
+        pulsarClient.newProducer().topic(topic).create().close();
+        pulsarClient.newProducer().topic(topic2).create().close();
+
+        Awaitility.await().untilAsserted(() -> {
+            
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNull();
+            
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNull();
+        });
+        // Init Topic Policies. Send 4 messages in a row, there should be only 
2 messages left after compaction
+        admin.topics().setMaxConsumersPerSubscription(topic, 1);
+        admin.topics().setMaxConsumersPerSubscription(topic2, 2);
+        admin.topics().setMaxConsumersPerSubscription(topic, 3);
+        admin.topics().setMaxConsumersPerSubscription(topic2, 4);
+        Awaitility.await().untilAsserted(() -> {
+            
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))).isNotNull();
+            
Assertions.assertThat(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic2))).isNotNull();
+        });
+        String topicPoliciesTopic = "persistent://" + myNamespace + "/" + 
EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopicIfExists(topicPoliciesTopic).get().get();
+        // Trigger compaction and make sure it is finished.
+        persistentTopic.triggerCompaction();
+        Field field = 
PersistentTopic.class.getDeclaredField("currentCompaction");
+        field.setAccessible(true);
+        CompletableFuture<Long> future = 
(CompletableFuture<Long>)field.get(persistentTopic);
+        Awaitility.await().untilAsserted(() -> assertTrue(future.isDone()));
+
+        Consumer consumer = pulsarClient.newConsumer()
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .readCompacted(true)
+                .topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
+        int count = 0;
+        while (true) {
+            Message message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                count++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        consumer.close();
+        assertEquals(count, 2);
+
+        // Delete topic, there should be only 1 message left after compaction
+        admin.topics().delete(topic, true);
+
+        Awaitility.await().untilAsserted(() ->
+                
assertNull(pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(topic))));
+        persistentTopic.triggerCompaction();
+        field = PersistentTopic.class.getDeclaredField("currentCompaction");
+        field.setAccessible(true);
+        CompletableFuture<Long> future2 = 
(CompletableFuture<Long>)field.get(persistentTopic);
+        Awaitility.await().untilAsserted(() -> assertTrue(future2.isDone()));
+
+        consumer = pulsarClient.newConsumer()
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .readCompacted(true)
+                .topic(topicPoliciesTopic).subscriptionName("sub").subscribe();
+        count = 0;
+        while (true) {
+            Message message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                count++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        consumer.close();
+        assertEquals(count, 1);
+
+    }
+
+    @Test
     public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         pulsarClient.newProducer().topic(topic).create().close();

Reply via email to