This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 794f229  [Issue 12774][metadata] Fix race condition in 
PulsarLedgerIdGenerator#generateShortLedgerId (#14118)
794f229 is described below

commit 794f22949cffeb16a18ca69758322fafb5018a2d
Author: JiangHaiting <[email protected]>
AuthorDate: Sun Feb 6 01:43:39 2022 +0800

    [Issue 12774][metadata] Fix race condition in 
PulsarLedgerIdGenerator#generateShortLedgerId (#14118)
    
    * Fix race condition in 
org.apache.pulsar.metadata.bookkeeper.PulsarLedgerIdGenerator#generateShortLedgerId
    
    * restore testDoNotReplicateSystemTopic
    
    * fix BadVersionException check in createHOBPathAndGenerateId
---
 .../apache/pulsar/broker/service/ReplicatorTest.java  |  1 -
 .../metadata/bookkeeper/PulsarLedgerIdGenerator.java  | 19 +++++++++++++++----
 .../apache/pulsar/metadata/impl/ZKMetadataStore.java  |  2 +-
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 0c6e7e7..c34b641 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -1317,7 +1317,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         });
         cleanup();
         setup();
-
     }
 
     private void initTransaction(int coordinatorSize, PulsarAdmin admin, 
String ServiceUrl,
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
index 625385f..29a65b8 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerIdGenerator.java
@@ -80,8 +80,19 @@ public class PulsarLedgerIdGenerator implements 
LedgerIdGenerator {
                         // Proceed
                         return internalGenerateShortLedgerId();
                     } else {
-                        return store.put(shortIdGenPath, new byte[0], 
Optional.empty())
-                                .thenCompose(__ -> 
internalGenerateShortLedgerId());
+                        CompletableFuture<Void> future = new 
CompletableFuture<>();
+                        store.put(shortIdGenPath, new byte[0], 
Optional.of(-1L))
+                                .whenComplete((stat, throwable) -> {
+                                    Throwable cause = 
FutureUtil.unwrapCompletionException(throwable);
+                                    if (cause == null
+                                            || cause instanceof 
MetadataStoreException.BadVersionException) {
+                                        // creat shortIdGenPath success or it 
already created by others.
+                                        future.complete(null);
+                                    } else {
+                                        
future.completeExceptionally(throwable);
+                                    }
+                                });
+                        return future.thenCompose(__ -> 
internalGenerateShortLedgerId());
                     }
                 });
     }
@@ -187,8 +198,8 @@ public class PulsarLedgerIdGenerator implements 
LedgerIdGenerator {
         CompletableFuture<Long> future = new CompletableFuture<>();
         store.put(ledgerPrefix + formatHalfId(hob), new byte[0], 
Optional.empty())
                 .whenComplete((__, ex) -> {
-                    if (ex != null && !(ex.getCause()
-                            .getCause() instanceof 
MetadataStoreException.BadVersionException)) {
+                    ex = FutureUtil.unwrapCompletionException(ex);
+                    if (ex != null && !(ex instanceof 
MetadataStoreException.BadVersionException)) {
                         // BadVersion is OK here because we can have multiple 
threads (or nodes) trying to create the
                         // new HOB path
                         future.completeExceptionally(ex);
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 0a64a33..cd20295 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -386,7 +386,7 @@ public class ZKMetadataStore extends 
AbstractBatchedMetadataStore
                                 put(opPut.getPath(), opPut.getData(), 
Optional.of(-1L)).thenAccept(
                                                 s -> future.complete(s))
                                         .exceptionally(ex -> {
-                                            future.completeExceptionally(new 
MetadataStoreException(ex.getCause()));
+                                            
future.completeExceptionally(MetadataStoreException.wrap(ex.getCause()));
                                             return null;
                                         });
                             }

Reply via email to