This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 789122b4d90 [fix][broker] Fix namespace deletion if __change_events 
topic has not been created yet (#18804)
789122b4d90 is described below

commit 789122b4d90601b1be47cfd2ccab2fe3124cf907
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Dec 13 20:07:57 2022 +0100

    [fix][broker] Fix namespace deletion if __change_events topic has not been 
created yet (#18804)
---
 .../pulsar/broker/service/BrokerService.java       |  12 +-
 .../SystemTopicBasedTopicPoliciesService.java      | 121 +++++++++++----------
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |   6 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  42 ++++---
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  11 ++
 5 files changed, 109 insertions(+), 83 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c5f3d508a56..bcec8351733 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3285,16 +3285,8 @@ public class BrokerService implements Closeable {
         if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
-        return pulsarService.getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(topicName.getNamespaceObject())
-                .thenCompose(optPolicies -> {
-                    if (optPolicies.isPresent() && optPolicies.get().deleted) {
-                        // We can return the completed future directly if the 
namespace is already deleted.
-                        return CompletableFuture.completedFuture(null);
-                    }
-                    TopicName cloneTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-                    return 
pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
-                });
+        return pulsar.getTopicPoliciesService()
+                
.deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName()));
     }
 
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName 
topicName, int numPartitions) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index d4c41230daf..9ec374264e9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -105,50 +105,53 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             return CompletableFuture.failedFuture(
                     new BrokerServiceException.NotAllowedException("Not 
allowed to send event to health check topic"));
         }
-        CompletableFuture<Void> result = new CompletableFuture<>();
-        try {
-            createSystemTopicFactoryIfNeeded();
-        } catch (PulsarServerException e) {
-            result.completeExceptionally(e);
-            return result;
-        }
+        return pulsarService.getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(namespacePolicies -> {
+                    if (namespacePolicies.isPresent() && 
namespacePolicies.get().deleted) {
+                        log.debug("[{}] skip sending topic policy event since 
the namespace is deleted", topicName);
+                        return CompletableFuture.completedFuture(null);
+                    }
 
-        SystemTopicClient<PulsarEvent> systemTopicClient =
-                
namespaceEventsSystemTopicFactory.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+                    try {
+                        createSystemTopicFactoryIfNeeded();
+                    } catch (PulsarServerException e) {
+                        return CompletableFuture.failedFuture(e);
+                    }
 
-        CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writerFuture 
= systemTopicClient.newWriterAsync();
-        writerFuture.whenComplete((writer, ex) -> {
-            if (ex != null) {
-                result.completeExceptionally(ex);
-            } else {
-                PulsarEvent event = getPulsarEvent(topicName, actionType, 
policies);
-                CompletableFuture<MessageId> actionFuture =
-                        ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(getEventKey(event), event)
-                                : writer.writeAsync(getEventKey(event), event);
-                actionFuture.whenComplete(((messageId, e) -> {
-                            if (e != null) {
-                                result.completeExceptionally(e);
-                            } else {
-                                if (messageId != null) {
-                                    result.complete(null);
-                                } else {
-                                    result.completeExceptionally(new 
RuntimeException("Got message id is null."));
-                                }
-                            }
-                            writer.closeAsync().whenComplete((v, cause) -> {
-                                if (cause != null) {
-                                    log.error("[{}] Close writer error.", 
topicName, cause);
+                    SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+                                    
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
+
+                    return systemTopicClient.newWriterAsync()
+                            .thenCompose(writer -> {
+                            PulsarEvent event = getPulsarEvent(topicName, 
actionType, policies);
+                            CompletableFuture<MessageId> writeFuture =
+                                    ActionType.DELETE.equals(actionType) ? 
writer.deleteAsync(getEventKey(event), event)
+                                            : 
writer.writeAsync(getEventKey(event), event);
+                            return writeFuture.handle((messageId, e) -> {
+                                if (e != null) {
+                                    return CompletableFuture.failedFuture(e);
                                 } else {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("[{}] Close writer 
success.", topicName);
+                                    if (messageId != null) {
+                                        return 
CompletableFuture.completedFuture(null);
+                                    } else {
+                                        return CompletableFuture.failedFuture(
+                                                new RuntimeException("Got 
message id is null."));
                                     }
                                 }
-                            });
-                        })
-                );
-            }
-        });
-        return result;
+                            }).thenRun(() ->
+                                        writer.closeAsync().whenComplete((v, 
cause) -> {
+                                            if (cause != null) {
+                                                log.error("[{}] Close writer 
error.", topicName, cause);
+                                            } else {
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("[{}] Close 
writer success.", topicName);
+                                                }
+                                            }
+                                        })
+                            );
+                    });
+                });
     }
 
     private PulsarEvent getPulsarEvent(TopicName topicName, ActionType 
actionType, TopicPolicies policies) {
@@ -390,25 +393,25 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
 
     private void readMorePolicies(SystemTopicClient.Reader<PulsarEvent> 
reader) {
         reader.readNextAsync()
-              .thenAccept(msg -> {
-                  refreshTopicPoliciesCache(msg);
-                  notifyListener(msg);
-              })
-              .whenComplete((__, ex) -> {
-                  if (ex == null) {
-                      readMorePolicies(reader);
-                  } else {
-                      Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
-                      if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
-                          log.warn("Read more topic policies exception, close 
the read now!", ex);
-                          cleanCacheAndCloseReader(
-                                  
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
-                      } else {
-                          log.warn("Read more topic polices exception, read 
again.", ex);
-                          readMorePolicies(reader);
-                      }
-                  }
-              });
+                .thenAccept(msg -> {
+                    refreshTopicPoliciesCache(msg);
+                    notifyListener(msg);
+                })
+                .whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        readMorePolicies(reader);
+                    } else {
+                        Throwable cause = 
FutureUtil.unwrapCompletionException(ex);
+                        if (cause instanceof 
PulsarClientException.AlreadyClosedException) {
+                            log.warn("Read more topic policies exception, 
close the read now!", ex);
+                            cleanCacheAndCloseReader(
+                                    
reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
+                        } else {
+                            log.warn("Read more topic polices exception, read 
again.", ex);
+                            readMorePolicies(reader);
+                        }
+                    }
+                });
     }
 
     private void refreshTopicPoliciesCache(Message<PulsarEvent> msg) {
@@ -477,7 +480,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         if (message instanceof MessageImpl) {
             return ((MessageImpl<?>) message).hasReplicateTo()
                     ? (((MessageImpl<?>) message).getReplicateTo().size() == 1
-                        ? !((MessageImpl<?>) 
message).getReplicateTo().contains(clusterName) : true)
+                    ? !((MessageImpl<?>) 
message).getReplicateTo().contains(clusterName) : true)
                     : false;
         }
         if (message instanceof TopicMessageImpl) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a34e0c189bb..11c84d990f6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1649,6 +1649,11 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 
         // create namespace2
         String namespace = tenant + "/test-ns2";
+        admin.namespaces().createNamespace(namespace, Set.of("test"));
+        admin.topics().createNonPartitionedTopic(namespace + "/tobedeleted");
+        // verify namespace can be deleted even without topic policy events
+        admin.namespaces().deleteNamespace(namespace, true);
+
         admin.namespaces().createNamespace(namespace, Set.of("test"));
         // create topic
         String topic = namespace + "/test-topic2";
@@ -1873,7 +1878,6 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testForceDeleteNamespace() throws Exception {
-        conf.setForceDeleteNamespaceAllowed(true);
         final String namespaceName = "prop-xyz2/ns1";
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", 
"role2"), Set.of("test"));
         admin.tenants().createTenant("prop-xyz2", tenantInfo);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index b303386c65f..fec0ce2da6e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -167,6 +167,10 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     @BeforeClass
     @Override
     public void setup() throws Exception {
+        setupConfigAndStart(null);
+    }
+
+    private void applyDefaultConfig() {
         conf.setSystemTopicEnabled(false);
         conf.setTopicLevelPoliciesEnabled(false);
         conf.setLoadBalancerEnabled(true);
@@ -178,6 +182,13 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
         conf.setBrokerDeleteInactiveTopicsEnabled(false);
         conf.setNumExecutorThreadPoolSize(5);
+    }
+
+    private void 
setupConfigAndStart(java.util.function.Consumer<ServiceConfiguration> 
configurationConsumer) throws Exception {
+        applyDefaultConfig();
+        if (configurationConsumer != null) {
+            configurationConsumer.accept(conf);
+        }
 
         super.internalSetup();
 
@@ -215,6 +226,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
 
         resetConfig();
+        applyDefaultConfig();
         setupClusters();
     }
 
@@ -1718,9 +1730,9 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void 
testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws 
Exception {
         cleanup();
-        setup();
+        setupConfigAndStart(conf -> conf
+                
.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE));
 
-        
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
         // Force to create a topic
         final String namespace = "prop-xyz/ns1";
         List<String> topicNames = Lists.newArrayList(
@@ -1756,7 +1768,6 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         for (Producer<byte[]> producer : producers) {
             producer.close();
         }
-        
conf.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
     }
 
     @Test
@@ -1911,9 +1922,6 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(dataProvider = "numBundles")
     public void testNamespaceBundleUnload(Integer numBundles) throws Exception 
{
-        cleanup();
-        setup();
-
         admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
         
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", 
Set.of("test"));
 
@@ -3259,7 +3267,8 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test
     public void testGetTtlDurationDefaultInSeconds() throws Exception {
-        conf.setTtlDurationDefaultInSeconds(3600);
+        cleanup();
+        setupConfigAndStart(conf -> conf.setTtlDurationDefaultInSeconds(3600));
         Integer seconds = 
admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
         assertNull(seconds);
     }
@@ -3309,8 +3318,11 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         final String topic = 
"persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" + 
UUID.randomUUID().toString();
         final String subName = "my-sub";
         final int numPartitions = 2;
-        conf.setSubscriptionRedeliveryTrackerEnabled(true);
-        conf.setDelayedDeliveryEnabled(true);
+        cleanup();
+        setupConfigAndStart(conf -> {
+            conf.setSubscriptionRedeliveryTrackerEnabled(true);
+            conf.setDelayedDeliveryEnabled(true);
+        });
         admin.topics().createPartitionedTopic(topic, numPartitions);
 
         for (int i = 0; i < 2; i++) {
@@ -3367,7 +3379,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testPartitionedTopicTruncate() throws Exception {
-        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic2-" + UUID.randomUUID().toString();
         final String subName = "my-sub";
         admin.topics().createPartitionedTopic(topicName,6);
         admin.namespaces().setRetention("prop-xyz/ns1", new 
RetentionPolicies(60, 50));
@@ -3387,9 +3399,13 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testNonPartitionedTopicTruncate() throws Exception {
-        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
+        final String topicName = 
"persistent://prop-xyz/ns1/testTruncateTopic1-" + UUID.randomUUID().toString();
         final String subName = "my-sub";
-        this.conf.setTopicLevelPoliciesEnabled(true);
+        cleanup();
+        setupConfigAndStart(conf -> {
+            conf.setTopicLevelPoliciesEnabled(true);
+            conf.setSystemTopicEnabled(true);
+        });
         admin.topics().createNonPartitionedTopic(topicName);
         admin.namespaces().setRetention("prop-xyz/ns1", new 
RetentionPolicies(60, 50));
         List<MessageId> messageIds = 
publishMessagesOnPersistentTopic(topicName, 10);
@@ -3405,7 +3421,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
     @Test(timeOut = 20000)
     public void testNonPersistentTopicTruncate() throws Exception {
-        final String topicName = 
"non-persistent://prop-xyz/ns1/testTruncateTopic-" + 
UUID.randomUUID().toString();
+        final String topicName = 
"non-persistent://prop-xyz/ns1/testTruncateTopic3-" + 
UUID.randomUUID().toString();
         admin.topics().createNonPartitionedTopic(topicName);
         assertThrows(() -> {admin.topics().truncate(topicName);});
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 7b66b6a6b51..f9fc717a817 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -373,4 +373,15 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
             }
         });
     }
+
+    @Test
+    public void testHandleNamespaceBeingDeleted() throws Exception {
+        SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+        
pulsar.getPulsarResources().getNamespaceResources().setPolicies(NamespaceName.get(NAMESPACE1),
+                old -> {
+                    old.deleted = true;
+                    return old;
+        });
+        service.deleteTopicPoliciesAsync(TOPIC1).get();
+    }
 }

Reply via email to