This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2a7a8555c0b [improve][broker] Avoid reconnection when a partitioned
topic was created concurrently (#16043)
2a7a8555c0b is described below
commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800
[improve][broker] Avoid reconnection when a partitioned topic was created
concurrently (#16043)
* [improve][broker] Avoid reconnection when a partitioned topic was created
concurrently
### Motivation
When a partitioned topic is created concurrently, especially when it is
automatically created by many producers, the clients are forced to
reconnect. This case can be reproduced easily by configuring
`allowAutoTopicCreationType=partitioned` and starting a Pulsar
standalone. Then, run the following code:
```java
try (PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650").build()) {
for (int i = 0; i < 10; i++) {
client.newProducer().topic("topic").createAsync();
}
Thread.sleep(1000);
}
```
We can see a lot of "Could not get connection while
getPartitionedTopicMetadata" warning logs on the client side, and even
more warning logs with full stack traces on the broker side:
```
2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN
org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata
[/127.0.0.1:64846] persistent://public/default/topic:
org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /admin/partitioned-topics/public/default/persistent/topic
org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException:
org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for /admin/partitioned-topics/public/default/persistent/topic
```
This happens because, when the broker handles the partitioned metadata
command, it calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync`,
which tries to create the partitioned topic if it doesn't exist. This is
a race condition: if many connections are established within a short
time interval and one of them creates the topic successfully, the
others fail with `AlreadyExistsException`.
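The race described above can be sketched outside of Pulsar. This is a
minimal illustration, not broker code: `CreateRaceSketch`, `store`,
`createAsync`, `countFailures` and the nested `AlreadyExistsException`
are all hypothetical stand-ins, with a `ConcurrentHashMap` playing the
role of the metadata store.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class CreateRaceSketch {
    // Hypothetical stand-in for the metadata store: path -> partition count.
    static final ConcurrentHashMap<String, Integer> store = new ConcurrentHashMap<>();

    // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String path) {
            super(path + " already exists");
        }
    }

    // Creates the entry only if it is absent; otherwise the returned future fails.
    static CompletableFuture<Integer> createAsync(String path, int partitions) {
        return CompletableFuture.supplyAsync(() -> {
            if (store.putIfAbsent(path, partitions) != null) {
                throw new AlreadyExistsException(path);
            }
            return partitions;
        });
    }

    // Fires `requests` concurrent creations for one path and counts the failed ones.
    static int countFailures(String path, int requests) {
        AtomicInteger failures = new AtomicInteger();
        CompletableFuture<?>[] all = new CompletableFuture<?>[requests];
        for (int i = 0; i < requests; i++) {
            all[i] = createAsync(path, 4).handle((value, ex) -> {
                if (ex != null) {
                    failures.incrementAndGet();
                }
                return null;
            });
        }
        CompletableFuture.allOf(all).join();
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed creations: " + countFailures("topic", 10));
    }
}
```

Because `putIfAbsent` is atomic, exactly one of the concurrent creations
wins and every other request fails, which mirrors the
`AlreadyExistsException` seen by the remaining connections.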
### Modifications
Handle the `MetadataStoreException.AlreadyExistsException` in
`unsafeGetPartitionedTopicMetadataAsync`. In this case, invoke
`fetchPartitionedTopicMetadataAsync` to get the partitioned metadata
again.
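The fallback described above (create, and on "already exists" fetch
instead of failing) can be sketched as follows. This is only an
illustration under stated assumptions: `CreateOrFetchSketch`, `store`,
`createAsync`, `fetchAsync`, `createOrFetchAsync` and the nested
`AlreadyExistsException` are hypothetical names, and the in-memory map
stands in for the metadata store.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

class CreateOrFetchSketch {
    // Hypothetical stand-in for the metadata store: path -> partition count.
    static final ConcurrentHashMap<String, Integer> store = new ConcurrentHashMap<>();

    // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String path) {
            super(path + " already exists");
        }
    }

    // Creates the entry only if it is absent; otherwise the returned future fails.
    static CompletableFuture<Integer> createAsync(String path, int partitions) {
        return CompletableFuture.supplyAsync(() -> {
            if (store.putIfAbsent(path, partitions) != null) {
                throw new AlreadyExistsException(path);
            }
            return partitions;
        });
    }

    // Reads the current value for the path.
    static CompletableFuture<Integer> fetchAsync(String path) {
        return CompletableFuture.supplyAsync(() -> store.get(path));
    }

    // Tries to create; if someone else created the entry first, fetches it instead.
    static CompletableFuture<Integer> createOrFetchAsync(String path, int partitions) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        createAsync(path, partitions)
                .thenAccept(future::complete)
                .exceptionally(ex -> {
                    // Dependent stages see the failure wrapped in CompletionException.
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    if (cause instanceof AlreadyExistsException) {
                        // The entry might have been created concurrently.
                        fetchAsync(path).whenComplete((metadata, ex2) -> {
                            if (ex2 == null) {
                                future.complete(metadata);
                            } else {
                                future.completeExceptionally(ex2);
                            }
                        });
                    } else {
                        future.completeExceptionally(ex);
                    }
                    return null;
                });
        return future;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            System.out.println(createOrFetchAsync("topic", 4).join());
        }
    }
}
```

With this shape, the losers of the race complete normally with the
metadata written by the winner, so no caller needs to retry.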
### Verifying this change
Even without this patch, producers can still be created successfully,
because the broker returns a `ServiceNotReady` error in this case and
the clients reconnect to the broker after 100 ms. The only way to
verify this fix is to reproduce the scenario with this patch applied
and confirm from the logs that no reconnection happens.
* Revert "[improve][broker] Avoid reconnection when a partitioned topic was
created concurrently"
This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.
* Handle AlreadyExistsException in
fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync
---
.../org/apache/pulsar/broker/service/BrokerService.java | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b991c4378e8..7793c947b52 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2640,7 +2640,20 @@ public class BrokerService implements Closeable {
                     pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
                             .thenAccept(md -> future.complete(md))
                             .exceptionally(ex -> {
-                                future.completeExceptionally(ex);
+                                if (ex.getCause()
+                                        instanceof MetadataStoreException.AlreadyExistsException) {
+                                    // The partitioned topic might be created concurrently
+                                    fetchPartitionedTopicMetadataAsync(topicName)
+                                            .whenComplete((metadata2, ex2) -> {
+                                                if (ex2 == null) {
+                                                    future.complete(metadata2);
+                                                } else {
+                                                    future.completeExceptionally(ex2);
+                                                }
+                                            });
+                                } else {
+                                    future.completeExceptionally(ex);
+                                }
                                 return null;
                             });
                 } else {