This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 794f229 [Issue 12774][metadata] Fix race condition in
PulsarLedgerIdGenerator#generateShortLedgerId (#14118)
794f229 is described below
commit 794f22949cffeb16a18ca69758322fafb5018a2d
Author: JiangHaiting <[email protected]>
AuthorDate: Sun Feb 6 01:43:39 2022 +0800
[Issue 12774][metadata] Fix race condition in
PulsarLedgerIdGenerator#generateShortLedgerId (#14118)
* Fix race condition in
org.apache.pulsar.metadata.bookkeeper.PulsarLedgerIdGenerator#generateShortLedgerId
* restore testDoNotReplicateSystemTopic
* fix BadVersionException check in createHOBPathAndGenerateId
---
.../apache/pulsar/broker/service/ReplicatorTest.java | 1 -
.../metadata/bookkeeper/PulsarLedgerIdGenerator.java | 19 +++++++++++++++----
.../apache/pulsar/metadata/impl/ZKMetadataStore.java | 2 +-
3 files changed, 16 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 0c6e7e7..c34b641 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1317,7 +1317,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
});
cleanup();
setup();
-
}
private void initTransaction(int coordinatorSize, PulsarAdmin admin,
String ServiceUrl,
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
index 625385f..29a65b8 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
@@ -80,8 +80,19 @@ public class PulsarLedgerIdGenerator implements
LedgerIdGenerator {
// Proceed
return internalGenerateShortLedgerId();
} else {
- return store.put(shortIdGenPath, new byte[0],
Optional.empty())
- .thenCompose(__ ->
internalGenerateShortLedgerId());
+ CompletableFuture<Void> future = new
CompletableFuture<>();
+ store.put(shortIdGenPath, new byte[0],
Optional.of(-1L))
+ .whenComplete((stat, throwable) -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(throwable);
+ if (cause == null
+ || cause instanceof
MetadataStoreException.BadVersionException) {
+ // creat shortIdGenPath success or it
already created by others.
+ future.complete(null);
+ } else {
+
future.completeExceptionally(throwable);
+ }
+ });
+ return future.thenCompose(__ ->
internalGenerateShortLedgerId());
}
});
}
@@ -187,8 +198,8 @@ public class PulsarLedgerIdGenerator implements
LedgerIdGenerator {
CompletableFuture<Long> future = new CompletableFuture<>();
store.put(ledgerPrefix + formatHalfId(hob), new byte[0],
Optional.empty())
.whenComplete((__, ex) -> {
- if (ex != null && !(ex.getCause()
- .getCause() instanceof
MetadataStoreException.BadVersionException)) {
+ ex = FutureUtil.unwrapCompletionException(ex);
+ if (ex != null && !(ex instanceof
MetadataStoreException.BadVersionException)) {
// BadVersion is OK here because we can have multiple
threads (or nodes) trying to create the
// new HOB path
future.completeExceptionally(ex);
diff --git
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 0a64a33..cd20295 100644
---
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -386,7 +386,7 @@ public class ZKMetadataStore extends
AbstractBatchedMetadataStore
put(opPut.getPath(), opPut.getData(),
Optional.of(-1L)).thenAccept(
s -> future.complete(s))
.exceptionally(ex -> {
- future.completeExceptionally(new
MetadataStoreException(ex.getCause()));
+
future.completeExceptionally(MetadataStoreException.wrap(ex.getCause()));
return null;
});
}