This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 66e18626a8ae1b9d118dbd6bc5f74b9ec6f8bbe9
Author: Kai Wang <[email protected]>
AuthorDate: Wed Jul 13 11:06:00 2022 +0800

    [fix][broker] Retry when DistributedIdGenerator has BadVersion error 
(#16491)
    
    (cherry picked from commit 1fcf9eddc0f65d926e929b48cd196d6f975c1554)
---
 .../api/coordination/CoordinationService.java      |  4 ++
 .../coordination/impl/CoordinationServiceImpl.java | 32 +++++++++++
 .../org/apache/pulsar/metadata/CounterTest.java    | 64 +++++++++++++++++++++-
 3 files changed, 99 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java
index 7b42f34cd8e..52e2bd98b33 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java
@@ -48,7 +48,10 @@ public interface CoordinationService extends AutoCloseable {
      * Increment a counter identified by the specified path and return the 
current value.
      *
      * The counter value will be guaranteed to be unique within the context of 
the path.
+     * It will retry when a {@link 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException} occurs.
      *
+     * If the operation still fails after the maximum number of retries is reached,
+     * the future will complete exceptionally with {@link 
org.apache.pulsar.metadata.api.MetadataStoreException}.
      * @param path
      *            the path that identifies a particular counter
      * @return a future that will track the completion of the operation
@@ -56,4 +59,5 @@ public interface CoordinationService extends AutoCloseable {
      *             if there's a failure in incrementing the counter
      */
     CompletableFuture<Long> getNextCounterValue(String path);
+
 }
\ No newline at end of file
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index 2b7e38b6c44..c8ce40e37c7 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -42,10 +43,14 @@ import 
org.apache.pulsar.metadata.api.coordination.LockManager;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
+@Slf4j
 @SuppressWarnings("unchecked")
 public class CoordinationServiceImpl implements CoordinationService {
 
     private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
+
+    private static final int GET_NEXT_COUNTER_VALUE_RETRY_COUNT = 5;
+
     private final MetadataStoreExtended store;
 
     private final Map<Object, LockManager<?>> lockManagers = new 
ConcurrentHashMap<>();
@@ -94,6 +99,12 @@ public class CoordinationServiceImpl implements 
CoordinationService {
 
     @Override
     public CompletableFuture<Long> getNextCounterValue(String path) {
+        CompletableFuture<Long> future = new CompletableFuture<>();
+        internalGetNextCounterValueWithRetry(path, future, 
GET_NEXT_COUNTER_VALUE_RETRY_COUNT);
+        return future;
+    }
+
+    private CompletableFuture<Long> internalGetNextCounterValue(String path) {
         return store.exists(path)
                 .thenCompose(exists -> {
                     if (exists) {
@@ -106,6 +117,27 @@ public class CoordinationServiceImpl implements 
CoordinationService {
                 });
     }
 
+    private void internalGetNextCounterValueWithRetry(String path, 
CompletableFuture<Long> future, int count) {
+        if (count == 0) {
+            log.error("The number of retries has exhausted when get next 
counter value from path {}", path);
+            future.completeExceptionally(new MetadataStoreException("The 
number of retries has exhausted"));
+            return;
+        }
+        this.internalGetNextCounterValue(path)
+                .thenAccept(future::complete)
+                .exceptionally(ex -> {
+                    if (ex.getCause() instanceof 
MetadataStoreException.BadVersionException) {
+                        log.warn("Failed to get next counter value because of 
bad version. "
+                                + "Retry to get next counter value from path 
{}", path);
+                        internalGetNextCounterValueWithRetry(path, future, 
count - 1);
+                    } else {
+                        log.error("Failed to get next counter value from path 
{}", path, ex);
+                        future.completeExceptionally(ex);
+                    }
+                    return null;
+                });
+    }
+
     private CompletableFuture<Long> incrementCounter(String path) {
         String counterBasePath = path + "/-";
         return store
diff --git 
a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java 
b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
index 9a15b2ba2dd..2ce855a5dea 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java
@@ -18,10 +18,22 @@
  */
 package org.apache.pulsar.metadata;
 
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.fail;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 import lombok.Cleanup;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
@@ -48,7 +60,7 @@ public class CounterTest extends BaseMetadataStoreTest {
         @Cleanup
         CoordinationService cs2 = new CoordinationServiceImpl(store);
 
-        long l4 = cs1.getNextCounterValue("/my/path").join();
+        long l4 = cs2.getNextCounterValue("/my/path").join();
         assertNotEquals(l3, l4);
     }
 
@@ -85,4 +97,54 @@ public class CounterTest extends BaseMetadataStoreTest {
         assertNotEquals(l2, l4);
         assertNotEquals(l3, l4);
     }
+
+    @Test(dataProvider = "impl")
+    public void testGetNextCounterRetry(String provider, Supplier<String> 
urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStoreExtended store = 
MetadataStoreExtended.create(urlSupplier.get(),
+                MetadataStoreConfig.builder().build());
+
+        MetadataStoreExtended spy = spy(store);
+
+        @Cleanup
+        CoordinationService cs1 = new CoordinationServiceImpl(spy);
+
+        AtomicInteger count = new AtomicInteger(0);
+        CompletableFuture<Stat> future = new CompletableFuture<>();
+        future.completeExceptionally(new 
MetadataStoreException.BadVersionException(""));
+        when(spy.put(eq("/my/path"), eq(new byte[0]), eq(Optional.empty())))
+                .thenAnswer(__ -> {
+                    // Retry three times, then it will return success.
+                    if (count.incrementAndGet() <= 3) {
+                        return future;
+                    }
+                    reset(spy);
+                    return CompletableFuture.completedFuture(null);
+                });
+
+        long l1 = cs1.getNextCounterValue("/my/path").join();
+        long l2 = cs1.getNextCounterValue("/my/path").join();
+        long l3 = cs1.getNextCounterValue("/my/path").join();
+
+        assertNotEquals(l1, l2);
+        assertNotEquals(l2, l3);
+
+        when(spy.put(eq("/my/path1"), eq(new byte[0]), eq(Optional.empty())))
+                .thenReturn(future);
+
+        try {
+            cs1.getNextCounterValue("/my/path1").join();
+            fail("Should fail with MetadataStoreException.");
+        } catch (Exception ex) {
+            assertEquals(ex.getCause().getMessage(), "The number of retries 
has exhausted");
+        }
+
+        reset(spy);
+
+        @Cleanup
+        CoordinationService cs2 = new CoordinationServiceImpl(store);
+
+        long l4 = cs2.getNextCounterValue("/my/path").join();
+        assertNotEquals(l3, l4);
+    }
 }

Reply via email to