This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f1be956092c [fix] [broker] Let Pending ack handler can retry to init when encounters a metadata store error (#23153)
f1be956092c is described below

commit f1be956092c2f5d5744fabf69643743751eb88dd
Author: fengyubiao <[email protected]>
AuthorDate: Mon Aug 12 23:10:19 2024 +0800

    [fix] [broker] Let Pending ack handler can retry to init when encounters a metadata store error (#23153)
    
    (cherry picked from commit 2dde4032127e228c34ac2c3729191b76220aec29)
---
 .../pendingack/impl/PendingAckHandleImpl.java      |  6 +++-
 .../pendingack/PendingAckPersistentTest.java       | 37 ++++++++++++++++------
 2 files changed, 33 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 9d07af4d26c..3481c6e39d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.Position;
@@ -67,6 +68,7 @@ import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RecoverTimeRecord;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 
 /**
@@ -990,7 +992,9 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 && !(realCause instanceof ManagedLedgerException.NonRecoverableLedgerException))
                 || realCause instanceof PulsarClientException.BrokerPersistenceException
                 || realCause instanceof PulsarClientException.LookupException
-                || realCause instanceof PulsarClientException.ConnectException;
+                || realCause instanceof PulsarClientException.ConnectException
+                || realCause instanceof MetadataStoreException
+                || realCause instanceof BKException;
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 7f6750a0e36..8591514f1b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -43,12 +43,14 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Multimap;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.BKException;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.map.LinkedMap;
-import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.PrometheusMetricsTestUtil;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -77,10 +79,12 @@ import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -104,16 +108,26 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         super.internalCleanup();
     }
 
+
+    @DataProvider(name = "retryableErrors")
+    public Object[][] retryableErrors() {
+        return new Object[][] {
+            {new ManagedLedgerException("mock retryable error")},
+            {new MetadataStoreException("mock retryable error")},
+            {new BKException(-1)},
+        };
+    }
+
     /**
      * Test consumer can be built successfully with retryable exception
      * and get correct error with no-retryable exception.
      * @throws Exception
      */
-    @Test(timeOut = 60000)
-    public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception {
+    @Test(timeOut = 60000, dataProvider = "retryableErrors")
+    public void testBuildConsumerEncounterPendingAckInitFailure(Exception retryableError) throws Exception {
         // 1. Prepare and make sure the consumer can be built successfully.
-        String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck";
-        @Cleanup
+        String topic = BrokerTestUtil.newUniqueName(NAMESPACE1 + "/tp");
+        admin.topics().createNonPartitionedTopic(topic);
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
                 .subscriptionName("subName1")
                 .topic(topic)
@@ -131,11 +145,10 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         // The consumer will be built successfully after one time retry.
         when(mockProvider.checkInitializedBefore(any()))
                 // First, the method checkInitializedBefore will fail with a retryable exception.
-                .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize")))
+                .thenReturn(FutureUtil.failedFuture(retryableError))
                 // Then, the method will be executed successfully.
                 .thenReturn(CompletableFuture.completedFuture(false));
         transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
-        @Cleanup
         Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
                 .subscriptionName("subName2")
                 .topic(topic)
@@ -152,7 +165,6 @@ public class PendingAckPersistentTest extends TransactionTestBase {
                 // Then, the method will be executed successfully.
                 .thenCallRealMethod();
         transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider);
-        @Cleanup
         Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
                 .subscriptionName("subName3")
                 .topic(topic)
@@ -165,7 +177,7 @@ public class PendingAckPersistentTest extends TransactionTestBase {
                 .thenReturn(FutureUtil.failedFuture(new ManagedLedgerException
                         .NonRecoverableLedgerException("mock fail")))
                 .thenReturn(CompletableFuture.completedFuture(false));
-        @Cleanup PulsarClient pulsarClient = PulsarClient.builder()
+        PulsarClient pulsarClient = PulsarClient.builder()
                 .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
                 .operationTimeout(3, TimeUnit.SECONDS)
                 .build();
@@ -179,6 +191,13 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         } catch (Exception exception) {
             assertTrue(exception.getMessage().contains("Failed to init transaction pending ack."));
         }
+
+        // cleanup.
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        pulsarClient.close();
+        admin.topics().delete(topic, false);
     }
 
     @Test

Reply via email to