This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8ea425e0cf1 [improve] [broker] Do not try to open ML when the topic 
meta does not exist and do not expect to create a new one. #21995 (#22004)
8ea425e0cf1 is described below

commit 8ea425e0cf1602cb9a84b88cc02f8a443e5f24cd
Author: fengyubiao <[email protected]>
AuthorDate: Fri Feb 23 23:13:08 2024 +0800

    [improve] [broker] Do not try to open ML when the topic meta does not exist 
and do not expect to create a new one. #21995 (#22004)
    
    Co-authored-by: Jiwe Guo <[email protected]>
    (cherry picked from commit 1c652f5519e013340e08950fc9705da4e54bf22a)
---
 .../pulsar/broker/service/BrokerService.java       | 80 ++++++++++++----------
 .../pulsar/broker/TopicEventsListenerTest.java     | 33 +++++----
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 28 ++++++++
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 14 ++--
 .../service/persistent/PersistentTopicTest.java    |  3 +-
 5 files changed, 99 insertions(+), 59 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c5775e023bb..8997e1e98ee 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1051,43 +1051,49 @@ public class BrokerService implements Closeable {
             }
             final boolean isPersistentTopic = 
topicName.getDomain().equals(TopicDomain.persistent);
             if (isPersistentTopic) {
-                final CompletableFuture<Optional<TopicPolicies>> 
topicPoliciesFuture =
-                        getTopicPoliciesBypassSystemTopic(topicName);
-                return topicPoliciesFuture.exceptionally(ex -> {
-                    final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
-                    final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
-                            + " topic policies service. topic_name=%s 
error_message=%s", topicName, rc.getMessage());
-                    log.error(errorInfo, rc);
-                    throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
-                }).thenCompose(optionalTopicPolicies -> {
-                    final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
-                    return topics.computeIfAbsent(topicName.toString(), 
(tpName) -> {
-                        if (topicName.isPartitioned()) {
-                            final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
-                            return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
-                                    .thenCompose((metadata) -> {
-                                        // Allow crate non-partitioned 
persistent topic that name includes `partition`
-                                        if (metadata.partitions == 0
-                                                || 
topicName.getPartitionIndex() < metadata.partitions) {
-                                            return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
-                                                    properties, topicPolicies);
-                                        }
-                                        final String errorMsg =
-                                                String.format("Illegal topic 
partition name %s with max allowed "
-                                                        + "%d partitions", 
topicName, metadata.partitions);
-                                        log.warn(errorMsg);
-                                        return FutureUtil
-                                                .failedFuture(new 
BrokerServiceException.NotAllowedException(errorMsg));
-                                    });
-                        }
-                        return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
-                    }).thenCompose(optionalTopic -> {
-                        if (!optionalTopic.isPresent() && createIfMissing) {
-                            log.warn("[{}] Try to recreate the topic with 
createIfMissing=true "
-                                    + "but the returned topic is empty", 
topicName);
-                            return getTopic(topicName, createIfMissing, 
properties);
-                        }
-                        return 
CompletableFuture.completedFuture(optionalTopic);
+                return 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
+                        .thenCompose(exists -> {
+                    if (!exists && !createIfMissing) {
+                        return 
CompletableFuture.completedFuture(Optional.empty());
+                    }
+                    return 
getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
+                        final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
+                        final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
+                                + " topic policies service. topic_name=%s 
error_message=%s", topicName,
+                                rc.getMessage());
+                        log.error(errorInfo, rc);
+                        throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
+                    }).thenCompose(optionalTopicPolicies -> {
+                        final TopicPolicies topicPolicies = 
optionalTopicPolicies.orElse(null);
+                        return topics.computeIfAbsent(topicName.toString(), 
(tpName) -> {
+                            if (topicName.isPartitioned()) {
+                                final TopicName topicNameEntity = 
TopicName.get(topicName.getPartitionedTopicName());
+                                return 
fetchPartitionedTopicMetadataAsync(topicNameEntity)
+                                        .thenCompose((metadata) -> {
+                                            // Allow creating a non-partitioned persistent topic whose name
+                                            // includes `partition`
+                                            if (metadata.partitions == 0
+                                                    || 
topicName.getPartitionIndex() < metadata.partitions) {
+                                                return 
loadOrCreatePersistentTopic(tpName, createIfMissing,
+                                                        properties, 
topicPolicies);
+                                            }
+                                            final String errorMsg =
+                                                    String.format("Illegal 
topic partition name %s with max allowed "
+                                                            + "%d partitions", 
topicName, metadata.partitions);
+                                            log.warn(errorMsg);
+                                            return FutureUtil.failedFuture(
+                                                    new 
BrokerServiceException.NotAllowedException(errorMsg));
+                                        });
+                            }
+                            return loadOrCreatePersistentTopic(tpName, 
createIfMissing, properties, topicPolicies);
+                        }).thenCompose(optionalTopic -> {
+                            if (!optionalTopic.isPresent() && createIfMissing) 
{
+                                log.warn("[{}] Try to recreate the topic with 
createIfMissing=true "
+                                        + "but the returned topic is empty", 
topicName);
+                                return getTopic(topicName, createIfMissing, 
properties);
+                            }
+                            return 
CompletableFuture.completedFuture(optionalTopic);
+                        });
                     });
                 });
             } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index e6459bbf74c..ceb3c1d0d93 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -126,7 +126,7 @@ public class TopicEventsListenerTest extends BrokerTestBase 
{
                            boolean forceDelete) throws Exception {
         String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
 
-        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+        createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, 
topicName);
 
         events.clear();
         if (topicTypePartitioned.equals("partitioned")) {
@@ -150,7 +150,7 @@ public class TopicEventsListenerTest extends BrokerTestBase 
{
                                      boolean forceDelete) throws Exception {
         String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
 
-        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+        createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, 
topicName);
 
         events.clear();
         admin.topics().unload(topicName);
@@ -182,7 +182,7 @@ public class TopicEventsListenerTest extends BrokerTestBase 
{
                                     boolean forceDelete) throws Exception {
         String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
 
-        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+        createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, 
topicName);
 
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
@@ -238,7 +238,7 @@ public class TopicEventsListenerTest extends BrokerTestBase 
{
     public void testTopicAutoGC(String topicTypePersistence, String 
topicTypePartitioned) throws Exception {
         String topicName = topicTypePersistence + "://" + namespace + "/" + 
"topic-" + UUID.randomUUID();
 
-        createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+        createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned, 
topicName);
 
         admin.namespaces().setInactiveTopicPolicies(namespace,
                 new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true));
@@ -262,25 +262,21 @@ public class TopicEventsListenerTest extends 
BrokerTestBase {
         );
     }
 
-    private void createTopicAndVerifyEvents(String topicTypePartitioned, 
String topicName) throws Exception {
+    private void createTopicAndVerifyEvents(String topicDomain, String 
topicTypePartitioned, String topicName) throws Exception {
         final String[] expectedEvents;
-        if (topicTypePartitioned.equals("partitioned")) {
-            topicNameToWatch = topicName + "-partition-1";
-            admin.topics().createPartitionedTopic(topicName, 2);
-            triggerPartitionsCreation(topicName);
-
+        if (topicDomain.equalsIgnoreCase("persistent") || 
topicTypePartitioned.equals("partitioned")) {
             expectedEvents = new String[]{
                     "LOAD__BEFORE",
                     "CREATE__BEFORE",
                     "CREATE__SUCCESS",
                     "LOAD__SUCCESS"
             };
-
         } else {
-            topicNameToWatch = topicName;
-            admin.topics().createNonPartitionedTopic(topicName);
-
             expectedEvents = new String[]{
+                    // Before https://github.com/apache/pulsar/pull/21995, Pulsar skipped topic creation if the
+                    //   topic already existed, and the "check topic exists" action tried to load the managed
+                    //   ledger, which triggered two extra events: [LOAD__BEFORE, LOAD__FAILURE].
+                    //   #21995 fixed this for persistent topics, so they no longer expect these two events.
                     "LOAD__BEFORE",
                     "LOAD__FAILURE",
                     "LOAD__BEFORE",
@@ -288,7 +284,14 @@ public class TopicEventsListenerTest extends 
BrokerTestBase {
                     "CREATE__SUCCESS",
                     "LOAD__SUCCESS"
             };
-
+        }
+        if (topicTypePartitioned.equals("partitioned")) {
+            topicNameToWatch = topicName + "-partition-1";
+            admin.topics().createPartitionedTopic(topicName, 2);
+            triggerPartitionsCreation(topicName);
+        } else {
+            topicNameToWatch = topicName;
+            admin.topics().createNonPartitionedTopic(topicName);
         }
 
         Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 11564cdf721..fe3c8b591ed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3417,4 +3417,32 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         producer.close();
         admin.topics().delete(topic);
     }
+
+    @Test
+    public void testGetStatsIfPartitionNotExists() throws Exception {
+        // create topic.
+        final String partitionedTp = 
BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
+        admin.topics().createPartitionedTopic(partitionedTp, 1);
+        TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
+        boolean topicExists1 = 
pulsar.getBrokerService().getTopic(partition0.toString(), 
false).join().isPresent();
+        assertTrue(topicExists1);
+        // Verify topics-stats works.
+        TopicStats topicStats = admin.topics().getStats(partition0.toString());
+        assertNotNull(topicStats);
+
+        // Delete partition and call topic-stats again.
+        admin.topics().delete(partition0.toString());
+        boolean topicExists2 = 
pulsar.getBrokerService().getTopic(partition0.toString(), 
false).join().isPresent();
+        assertFalse(topicExists2);
+        // Verify: respond 404.
+        try {
+            admin.topics().getStats(partition0.toString());
+            fail("Should respond 404 after the partition was deleted");
+        } catch (Exception ex) {
+            assertTrue(ex.getMessage().contains("Topic partitions were not yet 
created"));
+        }
+
+        // cleanup.
+        admin.topics().deletePartitionedTopic(partitionedTp);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c9138beee52..a75ae78cef3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -149,10 +149,11 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
                     .sendTimeout(1, TimeUnit.SECONDS)
                     .topic(topic)
                     .create()) {
-            } catch (PulsarClientException.LookupException expected) {
-                String msg = "Namespace bundle for topic (%s) not served by 
this instance";
+            } catch (PulsarClientException.TopicDoesNotExistException 
expected) {
+                // Since the "policies.deleted" is "true", the value of 
"isAllowAutoTopicCreationAsync" will be false,
+                // so the "TopicDoesNotExistException" is expected.
                 log.info("Expected error", expected);
-                assertTrue(expected.getMessage().contains(String.format(msg, 
topic))
+                assertTrue(expected.getMessage().contains(topic)
                         || 
expected.getMessage().contains(topicPoliciesServiceInitException));
             }
 
@@ -160,10 +161,11 @@ public class TopicAutoCreationTest extends 
ProducerConsumerBase {
                     .topic(topic)
                     .subscriptionName("test")
                     .subscribe()) {
-            } catch (PulsarClientException.LookupException expected) {
-                String msg = "Namespace bundle for topic (%s) not served by 
this instance";
+            } catch (PulsarClientException.TopicDoesNotExistException 
expected) {
+                // Since the "policies.deleted" is "true", the value of 
"isAllowAutoTopicCreationAsync" will be false,
+                // so the "TopicDoesNotExistException" is expected.
                 log.info("Expected error", expected);
-                assertTrue(expected.getMessage().contains(String.format(msg, 
topic))
+                assertTrue(expected.getMessage().contains(topic)
                         || 
expected.getMessage().contains(topicPoliciesServiceInitException));
             }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 4f3e1e930bd..4b4aa5b45d3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -295,7 +295,8 @@ public class PersistentTopicTest extends BrokerTestBase {
 
         
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
         pulsar.getBrokerService().getTopicIfExists(topicName).get();
-        
assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName));
+        // The "topics" map should only contain partitions, not the partitioned topic itself.
+        
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
 
         // ref of partitioned-topic name should be empty
         
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

Reply via email to