This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3c067ce2802 [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
3c067ce2802 is described below
commit 3c067ce28025e116146977118312a1471ba284f5
Author: Qiang Zhao <[email protected]>
AuthorDate: Fri Nov 10 13:30:05 2023 +0800
[fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
---
.../pulsar/broker/service/BrokerService.java | 14 +++++-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +-
.../pulsar/broker/admin/TopicAutoCreationTest.java | 57 ++++++++++++++++++++++
.../service/persistent/PersistentTopicTest.java | 3 +-
4 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4f64d5aab86..b9a8e74b9a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1075,10 +1075,22 @@ public class BrokerService implements Closeable {
return
loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
- return
CompletableFuture.completedFuture(Optional.empty());
+ final String errorMsg =
+ String.format("Illegal topic
partition name %s with max allowed "
+ + "%d partitions",
topicName, metadata.partitions);
+ log.warn(errorMsg);
+ return FutureUtil
+ .failedFuture(new
BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName,
createIfMissing, properties, topicPolicies);
+ }).thenCompose(optionalTopic -> {
+ if (!optionalTopic.isPresent() && createIfMissing) {
+ log.warn("[{}] Try to recreate the topic with
createIfMissing=true "
+ + "but the returned topic is empty",
topicName);
+ return getTopic(topicName, createIfMissing,
properties);
+ }
+ return
CompletableFuture.completedFuture(optionalTopic);
});
});
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 5abb0e02e58..2a49c14e355 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3193,7 +3193,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
admin.topics().createSubscription(partitionedTopicName +
"-partition-" + startPartitions, subName1,
MessageId.earliest);
fail("Unexpected behaviour");
- } catch (PulsarAdminException.PreconditionFailedException ex) {
+ } catch (PulsarAdminException.ConflictException ex) {
// OK
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 590edc2d3f3..c9138beee52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -27,7 +27,10 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -40,6 +43,7 @@ import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -55,6 +59,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setDefaultNumPartitions(3);
+ conf.setForceDeleteNamespaceAllowed(true);
super.internalSetup();
super.producerBaseSetup();
}
@@ -186,4 +191,56 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
}
}
+
+ @Test
+ public void testClientWithAutoCreationGotNotFoundException() throws
PulsarAdminException, PulsarClientException {
+ final String namespace = "public/test_1";
+ final String topicName =
"persistent://public/test_1/test_auto_creation_got_not_found"
+ + System.currentTimeMillis();
+ final int retryTimes = 30;
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setAutoTopicCreation(namespace,
AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(true)
+ .topicType("non-partitioned")
+ .build());
+
+ @Cleanup("shutdown")
+ final ExecutorService executor1 = Executors.newSingleThreadExecutor();
+
+ @Cleanup("shutdown")
+ final ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+ for (int i = 0; i < retryTimes; i++) {
+ final CompletableFuture<Void> adminListSub =
CompletableFuture.runAsync(() -> {
+ try {
+ admin.topics().getSubscriptions(topicName);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ }, executor1);
+
+ final CompletableFuture<Consumer<byte[]>> consumerSub =
CompletableFuture.supplyAsync(() -> {
+ try {
+ return pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("sub-1")
+ .subscribe();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }, executor2);
+
+ try {
+ adminListSub.join();
+ } catch (Throwable ex) {
+ // we don't care the exception.
+ }
+
+ consumerSub.join().close();
+ admin.topics().delete(topicName, true);
+ }
+
+ admin.namespaces().deleteNamespace(namespace, true);
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index 9995b6a28a9..6f60a13fd48 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -457,8 +457,7 @@ public class PersistentTopicTest extends BrokerTestBase {
.topic(partition.toString())
.create();
fail("unexpected behaviour");
- } catch (PulsarClientException.TopicDoesNotExistException ignored) {
-
+ } catch (PulsarClientException.NotAllowedException ex) {
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
4);
}