This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 66e18626a8ae1b9d118dbd6bc5f74b9ec6f8bbe9 Author: Kai Wang <[email protected]> AuthorDate: Wed Jul 13 11:06:00 2022 +0800 [fix][broker] Retry when DistributedIdGenerator has BadVersion error (#16491) (cherry picked from commit 1fcf9eddc0f65d926e929b48cd196d6f975c1554) --- .../api/coordination/CoordinationService.java | 4 ++ .../coordination/impl/CoordinationServiceImpl.java | 32 +++++++++++ .../org/apache/pulsar/metadata/CounterTest.java | 64 +++++++++++++++++++++- 3 files changed, 99 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java index 7b42f34cd8e..52e2bd98b33 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/CoordinationService.java @@ -48,7 +48,10 @@ public interface CoordinationService extends AutoCloseable { * Increment a counter identified by the specified path and return the current value. * * The counter value will be guaranteed to be unique within the context of the path. + * It will retry when {@link org.apache.pulsar.metadata.api.MetadataStoreException} happens. * + * If the maximum number of retries is reached and the operation still fails, + * the future will complete exceptionally with {@link org.apache.pulsar.metadata.api.MetadataStoreException}. 
* @param path * the path that identifies a particular counter * @return a future that will track the completion of the operation @@ -56,4 +59,5 @@ public interface CoordinationService extends AutoCloseable { * if there's a failure in incrementing the counter */ CompletableFuture<Long> getNextCounterValue(String path); + } \ No newline at end of file diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java index 2b7e38b6c44..c8ce40e37c7 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -42,10 +43,14 @@ import org.apache.pulsar.metadata.api.coordination.LockManager; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +@Slf4j @SuppressWarnings("unchecked") public class CoordinationServiceImpl implements CoordinationService { private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10); + + private static final int GET_NEXT_COUNTER_VALUE_RETRY_COUNT = 5; + private final MetadataStoreExtended store; private final Map<Object, LockManager<?>> lockManagers = new ConcurrentHashMap<>(); @@ -94,6 +99,12 @@ public class CoordinationServiceImpl implements CoordinationService { @Override public CompletableFuture<Long> getNextCounterValue(String path) { + 
CompletableFuture<Long> future = new CompletableFuture<>(); + internalGetNextCounterValueWithRetry(path, future, GET_NEXT_COUNTER_VALUE_RETRY_COUNT); + return future; + } + + private CompletableFuture<Long> internalGetNextCounterValue(String path) { return store.exists(path) .thenCompose(exists -> { if (exists) { @@ -106,6 +117,27 @@ public class CoordinationServiceImpl implements CoordinationService { }); } + private void internalGetNextCounterValueWithRetry(String path, CompletableFuture<Long> future, int count) { + if (count == 0) { + log.error("The number of retries has exhausted when get next counter value from path {}", path); + future.completeExceptionally(new MetadataStoreException("The number of retries has exhausted")); + return; + } + this.internalGetNextCounterValue(path) + .thenAccept(future::complete) + .exceptionally(ex -> { + if (ex.getCause() instanceof MetadataStoreException.BadVersionException) { + log.warn("Failed to get next counter value because of bad version. " + + "Retry to get next counter value from path {}", path); + internalGetNextCounterValueWithRetry(path, future, count - 1); + } else { + log.error("Failed to get next counter value from path {}", path, ex); + future.completeExceptionally(ex); + } + return null; + }); + } + private CompletableFuture<Long> incrementCounter(String path) { String counterBasePath = path + "/-"; return store diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java index 9a15b2ba2dd..2ce855a5dea 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/CounterTest.java @@ -18,10 +18,22 @@ */ package org.apache.pulsar.metadata; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; +import static 
org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.fail; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import lombok.Cleanup; import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; @@ -48,7 +60,7 @@ public class CounterTest extends BaseMetadataStoreTest { @Cleanup CoordinationService cs2 = new CoordinationServiceImpl(store); - long l4 = cs1.getNextCounterValue("/my/path").join(); + long l4 = cs2.getNextCounterValue("/my/path").join(); assertNotEquals(l3, l4); } @@ -85,4 +97,54 @@ public class CounterTest extends BaseMetadataStoreTest { assertNotEquals(l2, l4); assertNotEquals(l3, l4); } + + @Test(dataProvider = "impl") + public void testGetNextCounterRetry(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().build()); + + MetadataStoreExtended spy = spy(store); + + @Cleanup + CoordinationService cs1 = new CoordinationServiceImpl(spy); + + AtomicInteger count = new AtomicInteger(0); + CompletableFuture<Stat> future = new CompletableFuture<>(); + future.completeExceptionally(new MetadataStoreException.BadVersionException("")); + when(spy.put(eq("/my/path"), eq(new byte[0]), eq(Optional.empty()))) + .thenAnswer(__ -> { + // Retry three times, then it will return success. 
+ if (count.incrementAndGet() <= 3) { + return future; + } + reset(spy); + return CompletableFuture.completedFuture(null); + }); + + long l1 = cs1.getNextCounterValue("/my/path").join(); + long l2 = cs1.getNextCounterValue("/my/path").join(); + long l3 = cs1.getNextCounterValue("/my/path").join(); + + assertNotEquals(l1, l2); + assertNotEquals(l2, l3); + + when(spy.put(eq("/my/path1"), eq(new byte[0]), eq(Optional.empty()))) + .thenReturn(future); + + try { + cs1.getNextCounterValue("/my/path1").join(); + fail("Should fail with MetadataStoreException."); + } catch (Exception ex) { + assertEquals(ex.getCause().getMessage(), "The number of retries has exhausted"); + } + + reset(spy); + + @Cleanup + CoordinationService cs2 = new CoordinationServiceImpl(store); + + long l4 = cs2.getNextCounterValue("/my/path").join(); + assertNotEquals(l3, l4); + } }
