This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8ea425e0cf1 [improve] [broker] Do not try to open ML when the topic
meta does not exist and do not expect to create a new one. #21995 (#22004)
8ea425e0cf1 is described below
commit 8ea425e0cf1602cb9a84b88cc02f8a443e5f24cd
Author: fengyubiao <[email protected]>
AuthorDate: Fri Feb 23 23:13:08 2024 +0800
[improve] [broker] Do not try to open ML when the topic meta does not exist
and do not expect to create a new one. #21995 (#22004)
Co-authored-by: Jiwe Guo <[email protected]>
(cherry picked from commit 1c652f5519e013340e08950fc9705da4e54bf22a)
---
.../pulsar/broker/service/BrokerService.java | 80 ++++++++++++----------
.../pulsar/broker/TopicEventsListenerTest.java | 33 +++++----
.../apache/pulsar/broker/admin/AdminApi2Test.java | 28 ++++++++
.../pulsar/broker/admin/TopicAutoCreationTest.java | 14 ++--
.../service/persistent/PersistentTopicTest.java | 3 +-
5 files changed, 99 insertions(+), 59 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c5775e023bb..8997e1e98ee 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1051,43 +1051,49 @@ public class BrokerService implements Closeable {
}
final boolean isPersistentTopic =
topicName.getDomain().equals(TopicDomain.persistent);
if (isPersistentTopic) {
- final CompletableFuture<Optional<TopicPolicies>>
topicPoliciesFuture =
- getTopicPoliciesBypassSystemTopic(topicName);
- return topicPoliciesFuture.exceptionally(ex -> {
- final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
- final String errorInfo = String.format("Topic creation
encountered an exception by initialize"
- + " topic policies service. topic_name=%s
error_message=%s", topicName, rc.getMessage());
- log.error(errorInfo, rc);
- throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
- }).thenCompose(optionalTopicPolicies -> {
- final TopicPolicies topicPolicies =
optionalTopicPolicies.orElse(null);
- return topics.computeIfAbsent(topicName.toString(),
(tpName) -> {
- if (topicName.isPartitioned()) {
- final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
- return
fetchPartitionedTopicMetadataAsync(topicNameEntity)
- .thenCompose((metadata) -> {
- // Allow crate non-partitioned
persistent topic that name includes `partition`
- if (metadata.partitions == 0
- ||
topicName.getPartitionIndex() < metadata.partitions) {
- return
loadOrCreatePersistentTopic(tpName, createIfMissing,
- properties, topicPolicies);
- }
- final String errorMsg =
- String.format("Illegal topic
partition name %s with max allowed "
- + "%d partitions",
topicName, metadata.partitions);
- log.warn(errorMsg);
- return FutureUtil
- .failedFuture(new
BrokerServiceException.NotAllowedException(errorMsg));
- });
- }
- return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies);
- }).thenCompose(optionalTopic -> {
- if (!optionalTopic.isPresent() && createIfMissing) {
- log.warn("[{}] Try to recreate the topic with
createIfMissing=true "
- + "but the returned topic is empty",
topicName);
- return getTopic(topicName, createIfMissing,
properties);
- }
- return
CompletableFuture.completedFuture(optionalTopic);
+ return
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
+ .thenCompose(exists -> {
+ if (!exists && !createIfMissing) {
+ return
CompletableFuture.completedFuture(Optional.empty());
+ }
+ return
getTopicPoliciesBypassSystemTopic(topicName).exceptionally(ex -> {
+ final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
+ final String errorInfo = String.format("Topic creation
encountered an exception by initialize"
+ + " topic policies service. topic_name=%s
error_message=%s", topicName,
+ rc.getMessage());
+ log.error(errorInfo, rc);
+ throw FutureUtil.wrapToCompletionException(new
ServiceUnitNotReadyException(errorInfo));
+ }).thenCompose(optionalTopicPolicies -> {
+ final TopicPolicies topicPolicies =
optionalTopicPolicies.orElse(null);
+ return topics.computeIfAbsent(topicName.toString(),
(tpName) -> {
+ if (topicName.isPartitioned()) {
+ final TopicName topicNameEntity =
TopicName.get(topicName.getPartitionedTopicName());
+ return
fetchPartitionedTopicMetadataAsync(topicNameEntity)
+ .thenCompose((metadata) -> {
+ // Allow creating a non-partitioned
persistent topic whose name includes
+ // `partition`
+ if (metadata.partitions == 0
+ ||
topicName.getPartitionIndex() < metadata.partitions) {
+ return
loadOrCreatePersistentTopic(tpName, createIfMissing,
+ properties,
topicPolicies);
+ }
+ final String errorMsg =
+ String.format("Illegal
topic partition name %s with max allowed "
+ + "%d partitions",
topicName, metadata.partitions);
+ log.warn(errorMsg);
+ return FutureUtil.failedFuture(
+ new
BrokerServiceException.NotAllowedException(errorMsg));
+ });
+ }
+ return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies);
+ }).thenCompose(optionalTopic -> {
+ if (!optionalTopic.isPresent() && createIfMissing)
{
+ log.warn("[{}] Try to recreate the topic with
createIfMissing=true "
+ + "but the returned topic is empty",
topicName);
+ return getTopic(topicName, createIfMissing,
properties);
+ }
+ return
CompletableFuture.completedFuture(optionalTopic);
+ });
});
});
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
index e6459bbf74c..ceb3c1d0d93 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/TopicEventsListenerTest.java
@@ -126,7 +126,7 @@ public class TopicEventsListenerTest extends BrokerTestBase
{
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" +
"topic-" + UUID.randomUUID();
- createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+ createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned,
topicName);
events.clear();
if (topicTypePartitioned.equals("partitioned")) {
@@ -150,7 +150,7 @@ public class TopicEventsListenerTest extends BrokerTestBase
{
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" +
"topic-" + UUID.randomUUID();
- createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+ createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned,
topicName);
events.clear();
admin.topics().unload(topicName);
@@ -182,7 +182,7 @@ public class TopicEventsListenerTest extends BrokerTestBase
{
boolean forceDelete) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" +
"topic-" + UUID.randomUUID();
- createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+ createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned,
topicName);
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
@@ -238,7 +238,7 @@ public class TopicEventsListenerTest extends BrokerTestBase
{
public void testTopicAutoGC(String topicTypePersistence, String
topicTypePartitioned) throws Exception {
String topicName = topicTypePersistence + "://" + namespace + "/" +
"topic-" + UUID.randomUUID();
- createTopicAndVerifyEvents(topicTypePartitioned, topicName);
+ createTopicAndVerifyEvents(topicTypePersistence, topicTypePartitioned,
topicName);
admin.namespaces().setInactiveTopicPolicies(namespace,
new
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1,
true));
@@ -262,25 +262,21 @@ public class TopicEventsListenerTest extends
BrokerTestBase {
);
}
- private void createTopicAndVerifyEvents(String topicTypePartitioned,
String topicName) throws Exception {
+ private void createTopicAndVerifyEvents(String topicDomain, String
topicTypePartitioned, String topicName) throws Exception {
final String[] expectedEvents;
- if (topicTypePartitioned.equals("partitioned")) {
- topicNameToWatch = topicName + "-partition-1";
- admin.topics().createPartitionedTopic(topicName, 2);
- triggerPartitionsCreation(topicName);
-
+ if (topicDomain.equalsIgnoreCase("persistent") ||
topicTypePartitioned.equals("partitioned")) {
expectedEvents = new String[]{
"LOAD__BEFORE",
"CREATE__BEFORE",
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
-
} else {
- topicNameToWatch = topicName;
- admin.topics().createNonPartitionedTopic(topicName);
-
expectedEvents = new String[]{
+ // Before https://github.com/apache/pulsar/pull/21995,
Pulsar would skip creating the topic if the topic
+ // already existed, and the "check topic
exists" action would try to load the managed ledger;
+ // that check triggered two extra events: [LOAD__BEFORE,
LOAD__FAILURE].
+ // #21995 fixed this wrong behavior, so remove these two
events.
"LOAD__BEFORE",
"LOAD__FAILURE",
"LOAD__BEFORE",
@@ -288,7 +284,14 @@ public class TopicEventsListenerTest extends
BrokerTestBase {
"CREATE__SUCCESS",
"LOAD__SUCCESS"
};
-
+ }
+ if (topicTypePartitioned.equals("partitioned")) {
+ topicNameToWatch = topicName + "-partition-1";
+ admin.topics().createPartitionedTopic(topicName, 2);
+ triggerPartitionsCreation(topicName);
+ } else {
+ topicNameToWatch = topicName;
+ admin.topics().createNonPartitionedTopic(topicName);
}
Awaitility.waitAtMost(10, TimeUnit.SECONDS).untilAsserted(() ->
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 11564cdf721..fe3c8b591ed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3417,4 +3417,32 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
producer.close();
admin.topics().delete(topic);
}
+
+ @Test
+ public void testGetStatsIfPartitionNotExists() throws Exception {
+ // create topic.
+ final String partitionedTp =
BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
+ admin.topics().createPartitionedTopic(partitionedTp, 1);
+ TopicName partition0 = TopicName.get(partitionedTp).getPartition(0);
+ boolean topicExists1 =
pulsar.getBrokerService().getTopic(partition0.toString(),
false).join().isPresent();
+ assertTrue(topicExists1);
+ // Verify topics-stats works.
+ TopicStats topicStats = admin.topics().getStats(partition0.toString());
+ assertNotNull(topicStats);
+
+ // Delete partition and call topic-stats again.
+ admin.topics().delete(partition0.toString());
+ boolean topicExists2 =
pulsar.getBrokerService().getTopic(partition0.toString(),
false).join().isPresent();
+ assertFalse(topicExists2);
+ // Verify: respond 404.
+ try {
+ admin.topics().getStats(partition0.toString());
+ fail("Should respond 404 after the partition was deleted");
+ } catch (Exception ex) {
+ assertTrue(ex.getMessage().contains("Topic partitions were not yet
created"));
+ }
+
+ // cleanup.
+ admin.topics().deletePartitionedTopic(partitionedTp);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c9138beee52..a75ae78cef3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -149,10 +149,11 @@ public class TopicAutoCreationTest extends
ProducerConsumerBase {
.sendTimeout(1, TimeUnit.SECONDS)
.topic(topic)
.create()) {
- } catch (PulsarClientException.LookupException expected) {
- String msg = "Namespace bundle for topic (%s) not served by
this instance";
+ } catch (PulsarClientException.TopicDoesNotExistException
expected) {
+ // Since the "policies.deleted" is "true", the value of
"isAllowAutoTopicCreationAsync" will be false,
+ // so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
- assertTrue(expected.getMessage().contains(String.format(msg,
topic))
+ assertTrue(expected.getMessage().contains(topic)
||
expected.getMessage().contains(topicPoliciesServiceInitException));
}
@@ -160,10 +161,11 @@ public class TopicAutoCreationTest extends
ProducerConsumerBase {
.topic(topic)
.subscriptionName("test")
.subscribe()) {
- } catch (PulsarClientException.LookupException expected) {
- String msg = "Namespace bundle for topic (%s) not served by
this instance";
+ } catch (PulsarClientException.TopicDoesNotExistException
expected) {
+ // Since the "policies.deleted" is "true", the value of
"isAllowAutoTopicCreationAsync" will be false,
+ // so the "TopicDoesNotExistException" is expected.
log.info("Expected error", expected);
- assertTrue(expected.getMessage().contains(String.format(msg,
topic))
+ assertTrue(expected.getMessage().contains(topic)
||
expected.getMessage().contains(topicPoliciesServiceInitException));
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 4f3e1e930bd..4b4aa5b45d3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -295,7 +295,8 @@ public class PersistentTopicTest extends BrokerTestBase {
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
pulsar.getBrokerService().getTopicIfExists(topicName).get();
-
assertTrue(pulsar.getBrokerService().getTopics().containsKey(topicName));
+ // The topics map should only contain partitions; it does not contain the
partitioned topic.
+
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topicName));
// ref of partitioned-topic name should be empty
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());