This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b01d5d2da269c5f072167e8f21d569e25a35306f
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800

    [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)

    * [improve][broker] Avoid reconnection when a partitioned topic was created concurrently

    ### Motivation

    When a partitioned topic is created concurrently, especially when it is
    created automatically by many producers at once, all but one of the
    creation attempts fail and the affected clients have to reconnect.

    This case can be reproduced easily by configuring
    `allowAutoTopicCreationType=partitioned` and starting a Pulsar
    standalone. Then, run the following code:

    ```java
    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650").build()) {
        for (int i = 0; i < 10; i++) {
            client.newProducer().topic("topic").createAsync();
        }
        Thread.sleep(1000);
    }
    ```

    We can see a lot of "Could not get connection while
    getPartitionedTopicMetadata" warning logs at client side, while there
    were more warning logs with full stack traces at broker side:

    ```
    2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata [/127.0.0.1:64846] persistent://public/default/topic: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    ```

    This happens because, when the broker handles the partitioned metadata
    command, it calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync`
    and tries to create the partitioned topic if it doesn't exist.
    This is a race condition: if many connections are established within a
    short time interval and one of them creates the topic successfully, the
    following ones fail with the `AlreadyExistsException`.

    ### Modifications

    Handle the `MetadataStoreException.AlreadyExistsException` in
    `unsafeGetPartitionedTopicMetadataAsync`. In this case, invoke
    `fetchPartitionedTopicMetadataAsync` to get the partitioned metadata
    again.

    ### Verifying this change

    Even without this patch, the creation of producers eventually succeeds,
    because the broker returns a `ServiceNotReady` error in this case and the
    clients reconnect to the broker after 100 ms. The only way to verify this
    fix is to reproduce the bug again with this patch applied; the logs
    should then show that no reconnection happens.

    * Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently"

    This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.

    * Handle AlreadyExistsException in fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync

    (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
---
 .../pulsar/broker/service/BrokerService.java       | 48 +++++++++++++++++-----
 1 file changed, 38 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bdaded637b6..d943734e0a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,16 +2412,44 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
-                                // If topic is already exist, creating partitioned topic is not allowed.
-                                if (metadata.partitions == 0
-                                        && !topicExists
-                                        && !topicName.isPartitioned()
-                                        && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
-                                        && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-                                    return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
-                                } else {
-                                    return CompletableFuture.completedFuture(metadata);
-                                }
+                                CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
+
+                                // There are a couple of potentially blocking calls, which we cannot make from the
+                                // MetadataStore callback thread.
+                                pulsar.getExecutor().execute(() -> {
+                                    // If topic is already exist, creating partitioned topic is not allowed.
+
+                                    if (metadata.partitions == 0
+                                            && !topicExists
+                                            && !topicName.isPartitioned()
+                                            && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+                                            && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+
+                                        pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+                                                .thenAccept(md -> future.complete(md))
+                                                .exceptionally(ex -> {
+                                                    if (ex.getCause()
+                                                            instanceof MetadataStoreException.AlreadyExistsException) {
+                                                        // The partitioned topic might be created concurrently
+                                                        fetchPartitionedTopicMetadataAsync(topicName)
+                                                                .whenComplete((metadata2, ex2) -> {
+                                                                    if (ex2 == null) {
+                                                                        future.complete(metadata2);
+                                                                    } else {
+                                                                        future.completeExceptionally(ex2);
+                                                                    }
+                                                                });
+                                                    } else {
+                                                        future.completeExceptionally(ex);
+                                                    }
+                                                    return null;
+                                                });
+                                    } else {
+                                        future.complete(metadata);
+                                    }
+                                });
+
+                                return future;
                             });
                 });
     }
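
The fallback in the diff above (try to create, and on "already exists" read the metadata that the concurrent creator wrote) can be sketched in isolation. This is only an illustrative sketch, not the broker code: the class `CreateOrFetchSketch`, its in-memory `STORE`, the local `AlreadyExistsException`, and the methods `create`, `fetch`, and `createOrFetch` are all hypothetical stand-ins for the metadata store and `BrokerService` calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class CreateOrFetchSketch {
    // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String path) {
            super(path + " already exists");
        }
    }

    // Hypothetical in-memory metadata store: topic name -> partition count.
    static final ConcurrentHashMap<String, Integer> STORE = new ConcurrentHashMap<>();

    // Fails with AlreadyExistsException if another caller created the topic first.
    static CompletableFuture<Integer> create(String topic, int partitions) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        if (STORE.putIfAbsent(topic, partitions) == null) {
            f.complete(partitions);
        } else {
            f.completeExceptionally(new AlreadyExistsException(topic));
        }
        return f;
    }

    // Returns the stored partition count, or 0 when the topic is unknown.
    static CompletableFuture<Integer> fetch(String topic) {
        return CompletableFuture.completedFuture(STORE.getOrDefault(topic, 0));
    }

    static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }

    // Create the topic; if it was created concurrently, fetch its metadata
    // instead of surfacing the error to the caller.
    static CompletableFuture<Integer> createOrFetch(String topic, int partitions) {
        return create(topic, partitions)
                .<CompletableFuture<Integer>>handle((created, ex) -> {
                    if (ex == null) {
                        return CompletableFuture.completedFuture(created);
                    }
                    // Dependent stages may wrap the failure in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    if (cause instanceof AlreadyExistsException) {
                        return fetch(topic);
                    }
                    return failed(cause);
                })
                .thenCompose(f -> f);
    }

    public static void main(String[] args) {
        // The second caller loses the race and falls back to fetch.
        System.out.println(createOrFetch("topic", 4).join());
        System.out.println(createOrFetch("topic", 8).join());
    }
}
```

In this sketch the second call does not fail: it returns the partition count written by the first call, which mirrors why clients no longer need to reconnect once the broker falls back to fetching the metadata.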
