This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 721275a5011 [fix][broker] Fix namespace deletion if __change_events topic has not been created yet (#18804)
721275a5011 is described below

commit 721275a5011d295e1b1f7e8ae811f393d81aaaf9
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Dec 13 20:07:57 2022 +0100

    [fix][broker] Fix namespace deletion if __change_events topic has not been created yet (#18804)
---
 .../pulsar/broker/service/BrokerService.java       |  12 +-
 .../SystemTopicBasedTopicPoliciesService.java      | 134 ++++++++++++---------
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |   8 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  43 +++++--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  12 ++
 5 files changed, 129 insertions(+), 80 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7dcb9a6968f..588ea391c5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3217,16 +3217,8 @@ public class BrokerService implements Closeable {
         if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
-        return pulsarService.getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(topicName.getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                    if (optPolicies.isPresent() && optPolicies.get().deleted) {
-                        // We can return the completed future directly if the 
namespace is already deleted.
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    TopicName cloneTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-                    return 
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
-                });
+        return pulsar.getTopicPoliciesService()
+                
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 98c89d0226c..8d25603ccc5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -106,49 +106,53 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             return CompletableFuture.failedFuture(
                     new BrokerServiceException.NotAllowedException("Not 
allowed to send event to health check topic"));
         }
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        try {
-            createSystemTopicFactoryIfNeeded();
-        } catch (PulsarServerException e) {
-            result.completeExceptionally(e);
-            return result;
-        }
+        return pulsarService.getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(namespacePolicies -> {
+                    if (namespacePolicies.isPresent() && 
namespacePolicies.get().deleted) {
+                        log.debug("[{}] skip sending topic policy event since 
the namespace is deleted", topicName);
+                        return CompletableFuture.completedFuture(null);
+                    }
 
-        SystemTopicClient<PulsarEvent> systemTopicClient =
-                
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+                    try {
+                        createSystemTopicFactoryIfNeeded();
+                    } catch (PulsarServerException e) {
+                        return CompletableFuture.failedFuture(e);
+                    }
 
-        CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture 
= systemTopicClient.newWriterAsync();
-        writerFuture.whenComplete((writer, ex) -> {
-            if (ex != null) {
-                result.completeExceptionally(ex);
-            } else {
-                PulsarEvent event = getPulsarEvent(topicName, actionType, 
policies);
-                CompletableFuture<MessageId> actionFuture =
-                        ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(event) : writer.writeAsync(event);
-                actionFuture.whenComplete(((messageId, e) -> {
-                            if (e != null) {
-                                result.completeExceptionally(e);
-                            } else {
-                                if (messageId != null) {
-                                    result.complete(null);
-                                } else {
-                                    result.completeExceptionally(new 
RuntimeException("Got message id is null."));
-                                }
-                            }
-                            writer.closeAsync().whenComplete((v, cause) -> {
-                                if (cause != null) {
-                                    log.error("[{}] Close writer error.", 
topicName, cause);
+                    SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+                                    
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+
+                    return systemTopicClient.newWriterAsync()
+                            .thenCompose(writer -> {
+                            PulsarEvent event = getPulsarEvent(topicName, 
actionType, policies);
+                            CompletableFuture<MessageId> writeFuture =
+                                    ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(event)
+                                            : writer.writeAsync(event);
+                            return writeFuture.handle((messageId, e) -> {
+                                if (e != null) {
+                                    return CompletableFuture.failedFuture(e);
                                 } else {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("[{}] Close writer 
success.", topicName);
+                                    if (messageId != null) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    } else {
+                                        return CompletableFuture.failedFuture(
+                                                new RuntimeException("Got 
message id is null."));
                                     }
                                 }
-                            });
-                        })
-                );
-            }
-        });
-        return result;
+                            }).thenRun(() ->
+                                        writer.closeAsync().whenComplete((v, 
cause) -> {
+                                            if (cause != null) {
+                                                log.error("[{}] Close writer 
error.", topicName, cause);
+                                            } else {
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("[{}] Close 
writer success.", topicName);
+                                                }
+                                            }
+                                        })
+                            );
+                    });
+                });
     }
 
     private PulsarEvent getPulsarEvent(TopicName topicName, ActionType 
actionType, TopicPolicies policies) {
@@ -418,25 +422,25 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> 
reader) {
         reader.readNextAsync()
-              .thenAccept(msg -> {
-                  refreshTopicPoliciesCache(msg);
-                  notifyListener(msg);
-              })
-              .whenComplete((__, ex) -> {
-                  if (ex == null) {
-                      readMorePolicies(reader);
-                  } else {
-                      Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
-                      if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
-                          log.error("Read more topic policies exception, close 
the read now!", ex);
-                          cleanCacheAndCloseReader(
-                                  
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                      } else {
-                          log.warn("Read more topic polices exception, read 
again.", ex);
-                          readMorePolicies(reader);
-                      }
-                  }
-              });
+                .thenAccept(msg -> {
+                    refreshTopicPoliciesCache(msg);
+                    notifyListener(msg);
+                })
+                .whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        readMorePolicies(reader);
+                    } else {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
+                            log.warn("Read more topic policies exception, 
close the read now!", ex);
+                            cleanCacheAndCloseReader(
+                                    
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+                        } else {
+                            log.warn("Read more topic polices exception, read 
again.", ex);
+                            readMorePolicies(reader);
+                        }
+                    }
+                });
     }
 
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
@@ -504,7 +508,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         if (message instanceof MessageImpl) {
             return ((MessageImpl<?>) message).hasReplicateTo()
                     ? (((MessageImpl<?>) message).getReplicateTo().size() == 1
-                        ? !((MessageImpl<?>) 
message).getReplicateTo().contains(clusterName) : true)
+                    ? !((MessageImpl<?>) 
message).getReplicateTo().contains(clusterName) : true)
                     : false;
         }
         if (message instanceof TopicMessageImpl) {
@@ -570,6 +574,20 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         });
     }
 
+    public static String getEventKey(PulsarEvent event) {
+        return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
+                event.getTopicPoliciesEvent().getTenant(),
+                event.getTopicPoliciesEvent().getNamespace(),
+                event.getTopicPoliciesEvent().getTopic()).toString();
+    }
+
+    public static String getEventKey(TopicName topicName) {
+        return TopicName.get(topicName.getDomain().toString(),
+                topicName.getTenant(),
+                topicName.getNamespace(),
+                
TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
+    }
+
     @VisibleForTesting
     long getPoliciesCacheSize() {
         return policiesCache.size();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 3523c56c0da..45b5134de6b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1483,7 +1483,12 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 
         // create namespace2
         String namespace = tenant + "/test-ns2";
-        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace, Set.of("test"));
+        admin.topics().createNonPartitionedTopic(namespace + "/tobedeleted");
+        // verify namespace can be deleted even without topic policy events
+        admin.namespaces().deleteNamespace(namespace, true);
+
+        admin.namespaces().createNamespace(namespace, Set.of("test"));
         // create topic
         String topic = namespace + "/test-topic2";
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
@@ -1717,7 +1722,6 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testForceDeleteNamespace() throws Exception {
-        conf.setForceDeleteNamespaceAllowed(true);
         final String namespaceName = "prop-xyz2/ns1";
         TenantInfoImpl tenantInfo = new 
TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
         admin.tenants().createTenant("prop-xyz2", tenantInfo);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 4022daca02b..c349e902882 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -171,6 +171,10 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        setupConfigAndStart(null);
+    }
+
+    private void applyDefaultConfig() {
         conf.setSystemTopicEnabled(false);
         conf.setTopicLevelPoliciesEnabled(false);
         conf.setLoadBalancerEnabled(true);
@@ -182,6 +186,13 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
         conf.setBrokerDeleteInactiveTopicsEnabled(false);
         conf.setNumExecutorThreadPoolSize(5);
+    }
+
+    private void 
setupConfigAndStart(java.util.function.Consumer<ServiceConfiguration> 
configurationConsumer) throws Exception {
+        applyDefaultConfig();
+        if (configurationConsumer != null) {
+            configurationConsumer.accept(conf);
+        }
 
         super.internalSetup();
 
@@ -1751,7 +1762,10 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void 
testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws 
Exception {
-        
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
+        cleanup();
+        setupConfigAndStart(conf -> conf
+                
.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE));
+
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         List<String> topicNames = Lists.newArrayList(
@@ -1784,8 +1798,9 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         for (int i = 0; i < bundles.getBundles().size(); i++) {
             assertNotEquals(bundles.getBundles().get(i).toString(), 
splitRange[i]);
         }
-        producers.forEach(Producer::closeAsync);
-        
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
+        for (Producer<byte[]> producer : producers) {
+            producer.close();
+        }
     }
 
     @Test
@@ -3265,7 +3280,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testGetTtlDurationDefaultInSeconds() throws Exception {
-        conf.setTtlDurationDefaultInSeconds(3600);
+        cleanup();
+        setupConfigAndStart(conf -> conf.setTtlDurationDefaultInSeconds(3600));
         Integer seconds = 
admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
         assertNull(seconds);
     }
@@ -3308,8 +3324,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         final String topic = 
"persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" + 
UUID.randomUUID().toString();
         final String subName = "my-sub";
         final int numPartitions = 2;
-        conf.setSubscriptionRedeliveryTrackerEnabled(true);
-        conf.setDelayedDeliveryEnabled(true);
+        cleanup();
+        setupConfigAndStart(conf -> {
+            conf.setSubscriptionRedeliveryTrackerEnabled(true);
+            conf.setDelayedDeliveryEnabled(true);
+        });
         admin.topics().createPartitionedTopic(topic, numPartitions);
 
         for (int i = 0; i < 2; i++) {
@@ -3364,7 +3383,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testPartitionedTopicTruncate() throws Exception {
-        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic2-" + UUID.randomUUID().toString();
         final String subName = "my-sub";
         admin.topics().createPartitionedTopic(topicName,6);
         admin.namespaces().setRetention("prop-xyz/ns1", new 
RetentionPolicies(60, 50));
@@ -3384,9 +3403,13 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testNonPartitionedTopicTruncate() throws Exception {
-        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic1-" + UUID.randomUUID().toString();
         final String subName = "my-sub";
-        this.conf.setTopicLevelPoliciesEnabled(true);
+        cleanup();
+        setupConfigAndStart(conf -> {
+            conf.setTopicLevelPoliciesEnabled(true);
+            conf.setSystemTopicEnabled(true);
+        });
         admin.topics().createNonPartitionedTopic(topicName);
         admin.namespaces().setRetention("prop-xyz/ns1", new 
RetentionPolicies(60, 50));
         List<MessageId> messageIds = 
publishMessagesOnPersistentTopic(topicName, 10);
@@ -3402,7 +3425,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testNonPersistentTopicTruncate() throws Exception {
-        final String topicName = 
"non-persistent://prop-xyz/ns1/testTruncateTopic-" + 
UUID.randomUUID().toString();
+        final String topicName = 
"non-persistent://prop-xyz/ns1/testTruncateTopic3-" + 
UUID.randomUUID().toString();
         admin.topics().createNonPartitionedTopic(topicName);
         assertThrows(() -> {admin.topics().truncate(topicName);});
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index f4ad2ebd91c..4f3feb778af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -379,6 +379,18 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         });
     }
 
+
+    @Test
+    public void testHandleNamespaceBeingDeleted() throws Exception {
+        SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+        
pulsar.getPulsarResources().getNamespaceResources().setPolicies(NamespaceName.get(NAMESPACE1),
+                old -> {
+                    old.deleted = true;
+                    return old;
+                });
+        service.deleteTopicPoliciesAsync(TOPIC1).get();
+    }
+
     @Test
     public void testGetTopicPoliciesWithCleanCache() throws Exception {
         final String topic = "persistent://" + NAMESPACE1 + "/test" + 
UUID.randomUUID();

Reply via email to