This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d40619360337b4355d5ff0173bb1f39cefefe128 Author: fengyubiao <[email protected]> AuthorDate: Mon Aug 12 23:10:19 2024 +0800 [fix] [broker] Let Pending ack handler can retry to init when encounters a metadata store error (#23153) (cherry picked from commit 2dde4032127e228c34ac2c3729191b76220aec29) --- .../pendingack/impl/PendingAckHandleImpl.java | 6 +++- .../pendingack/PendingAckPersistentTest.java | 35 +++++++++++++++++----- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 9d07af4d26c..3481c6e39d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; @@ -67,6 +68,7 @@ import org.apache.pulsar.common.util.Backoff; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.RecoverTimeRecord; import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.transaction.common.exception.TransactionConflictException; /** @@ -990,7 +992,9 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi && !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException)) || realCause instanceof PulsarClientException.BrokerPersistenceException || realCause instanceof PulsarClientException.LookupException - || realCause instanceof PulsarClientException.ConnectException; + || realCause instanceof PulsarClientException.ConnectException + || realCause instanceof MetadataStoreException + || realCause instanceof BKException; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 93a2f274517..6d2bb0c77b8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -45,11 +45,13 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.map.LinkedMap; import org.apache.pulsar.PrometheusMetricsTestUtil; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; @@ -78,10 +80,12 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.metadata.api.MetadataStoreException; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -105,16 +109,26 @@ public class PendingAckPersistentTest extends TransactionTestBase { super.internalCleanup(); } + + @DataProvider(name = "retryableErrors") + public Object[][] retryableErrors() { + return new Object[][] { + {new ManagedLedgerException("mock retryable error")}, + {new MetadataStoreException("mock retryable error")}, + {new BKException(-1)}, + }; + } + /** * Test consumer can be built successfully with retryable exception * and get correct error with no-retryable exception. * @throws Exception */ - @Test(timeOut = 60000) - public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception { + @Test(timeOut = 60000, dataProvider = "retryableErrors") + public void testBuildConsumerEncounterPendingAckInitFailure(Exception retryableError) throws Exception { // 1. Prepare and make sure the consumer can be built successfully. - String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck"; - @Cleanup + String topic = BrokerTestUtil.newUniqueName(NAMESPACE1 + "/tp"); + admin.topics().createNonPartitionedTopic(topic); Consumer<byte[]> consumer1 = pulsarClient.newConsumer() .subscriptionName("subName1") .topic(topic) @@ -132,11 +146,10 @@ public class PendingAckPersistentTest extends TransactionTestBase { // The consumer will be built successfully after one time retry. when(mockProvider.checkInitializedBefore(any())) // First, the method checkInitializedBefore will fail with a retryable exception. - .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize"))) + .thenReturn(FutureUtil.failedFuture(retryableError)) // Then, the method will be executed successfully. .thenReturn(CompletableFuture.completedFuture(false)); transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); - @Cleanup Consumer<byte[]> consumer2 = pulsarClient.newConsumer() .subscriptionName("subName2") .topic(topic) @@ -153,7 +166,6 @@ public class PendingAckPersistentTest extends TransactionTestBase { // Then, the method will be executed successfully. .thenCallRealMethod(); transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); - @Cleanup Consumer<byte[]> consumer3 = pulsarClient.newConsumer() .subscriptionName("subName3") .topic(topic) @@ -166,7 +178,7 @@ public class PendingAckPersistentTest extends TransactionTestBase { .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException .NonRecoverableLedgerException("mock fail"))) .thenReturn(CompletableFuture.completedFuture(false)); - @Cleanup PulsarClient pulsarClient = PulsarClient.builder() + PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) .operationTimeout(3, TimeUnit.SECONDS) .build(); @@ -180,6 +192,13 @@ public class PendingAckPersistentTest extends TransactionTestBase { } catch (Exception exception) { assertTrue(exception.getMessage().contains("Failed to init transaction pending ack.")); } + + // cleanup. + consumer1.close(); + consumer2.close(); + consumer3.close(); + pulsarClient.close(); + admin.topics().delete(topic, false); } @Test
