This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 3b164f5ac0c Revert "[improve][broker] Avoid reconnection when a
partitioned topic was created concurrently (#16043)"
3b164f5ac0c is described below
commit 3b164f5ac0c900d6c162596892ec7e702e48f937
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jul 29 11:25:00 2022 +0800
Revert "[improve][broker] Avoid reconnection when a partitioned topic was
created concurrently (#16043)"
This reverts commit b01d5d2da269c5f072167e8f21d569e25a35306f.
---
.../pulsar/broker/service/BrokerService.java | 48 +++++-----------------
1 file changed, 10 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d943734e0a8..bdaded637b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,44 +2412,16 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
.thenCompose(topicExists -> {
return fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
- CompletableFuture<PartitionedTopicMetadata>
future = new CompletableFuture<>();
-
- // There are a couple of potentially blocking
calls, which we cannot make from the
- // MetadataStore callback thread.
- pulsar.getExecutor().execute(() -> {
- // If topic is already exist, creating
partitioned topic is not allowed.
-
- if (metadata.partitions == 0
- && !topicExists
- && !topicName.isPartitioned()
- &&
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
- &&
pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-
-
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
- .thenAccept(md ->
future.complete(md))
- .exceptionally(ex -> {
- if (ex.getCause()
- instanceof
MetadataStoreException.AlreadyExistsException) {
- // The partitioned
topic might be created concurrently
-
fetchPartitionedTopicMetadataAsync(topicName)
-
.whenComplete((metadata2, ex2) -> {
- if (ex2 ==
null) {
-
future.complete(metadata2);
- } else {
-
future.completeExceptionally(ex2);
- }
- });
- } else {
-
future.completeExceptionally(ex);
- }
- return null;
- });
- } else {
- future.complete(metadata);
- }
- });
-
- return future;
+ // If topic is already exist, creating
partitioned topic is not allowed.
+ if (metadata.partitions == 0
+ && !topicExists
+ && !topicName.isPartitioned()
+ &&
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+ &&
pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+ return
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+ } else {
+ return
CompletableFuture.completedFuture(metadata);
+ }
});
});
}