This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7535a31962b181803c7051294bc17e16b3a08a89
Author: atomchen <[email protected]>
AuthorDate: Tue Jul 30 15:11:05 2024 +0800

    [fix][client] TransactionCoordinatorClient support retry (#23081)
    
    (cherry picked from commit 6bbaec1f6b1cc09de42f14dccca1afd932c547d5)
---
 .../TransactionCoordinatorClientTest.java          | 24 ++++++++++++++++++++
 .../client/impl/TransactionMetaStoreHandler.java   | 26 ++++++++++++++++++----
 2 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index c442c3a9014..36bc0e522c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -24,14 +24,18 @@ import java.lang.reflect.Field;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -107,4 +111,24 @@ public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBa
                     instanceof 
TransactionCoordinatorClientException.InvalidTxnStatusException);
         }
     }
+
+    @Test
+    public void testClientStartWithRetry() throws Exception{
+        String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl();
+        String invalidBrokerServiceUrl = "localhost:0";
+        String brokerServiceUrl = validBrokerServiceUrl + "," + 
invalidBrokerServiceUrl;
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(brokerServiceUrl).build();
+        @Cleanup
+        TransactionCoordinatorClient transactionCoordinatorClient = new 
TransactionCoordinatorClientImpl(pulsarClient);
+
+        try {
+            transactionCoordinatorClient.start();
+        }catch (TransactionCoordinatorClientException e) {
+            Assert.fail("Shouldn't have exception at here", e);
+        }
+
+        Assert.assertEquals(transactionCoordinatorClient.getState(), 
State.READY);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 2a43ca20beb..e45d5397115 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -88,6 +89,10 @@ public class TransactionMetaStoreHandler extends HandlerState
     private Timeout requestTimeout;
 
     private final CompletableFuture<Void> connectFuture;
+    private final long lookupDeadline;
+    private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<>();
+
+
 
     public TransactionMetaStoreHandler(long transactionCoordinatorId, 
PulsarClientImpl pulsarClient, String topic,
                                        CompletableFuture<Void> connectFuture) {
@@ -109,6 +114,7 @@ public class TransactionMetaStoreHandler extends HandlerState
         this.connectFuture = connectFuture;
         this.internalPinnedExecutor = 
pulsarClient.getInternalExecutorService();
         this.timer = pulsarClient.timer();
+        this.lookupDeadline = System.currentTimeMillis() + 
client.getConfiguration().getLookupTimeoutMs();
     }
 
     public void start() {
@@ -117,10 +123,22 @@ public class TransactionMetaStoreHandler extends HandlerState
 
     @Override
     public void connectionFailed(PulsarClientException exception) {
-        LOG.error("Transaction meta handler with transaction coordinator id {} 
connection failed.",
-            transactionCoordinatorId, exception);
-        if (!this.connectFuture.isDone()) {
-            this.connectFuture.completeExceptionally(exception);
+        boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
+        boolean timeout = System.currentTimeMillis() > lookupDeadline;
+        if (nonRetriableError || timeout) {
+            exception.setPreviousExceptions(previousExceptions);
+            if (connectFuture.completeExceptionally(exception)) {
+                if (nonRetriableError) {
+                    LOG.error("Transaction meta handler with transaction 
coordinator id {} connection failed.",
+                            transactionCoordinatorId, exception);
+                } else {
+                    LOG.error("Transaction meta handler with transaction 
coordinator id {} connection failed after "
+                            + "timeout", transactionCoordinatorId, exception);
+                }
+                setState(State.Failed);
+            }
+        } else {
+            previousExceptions.add(exception);
         }
     }
 

Reply via email to