This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bbaec1f6b1 [fix][client] TransactionCoordinatorClient support retry 
(#23081)
6bbaec1f6b1 is described below

commit 6bbaec1f6b1cc09de42f14dccca1afd932c547d5
Author: atomchen <[email protected]>
AuthorDate: Tue Jul 30 15:11:05 2024 +0800

    [fix][client] TransactionCoordinatorClient support retry (#23081)
---
 .../TransactionCoordinatorClientTest.java          | 24 ++++++++++++++++++++
 .../client/impl/TransactionMetaStoreHandler.java   | 26 ++++++++++++++++++----
 .../TransactionCoordinatorClientImpl.java          |  4 ++--
 3 files changed, 48 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index c442c3a9014..36bc0e522c2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -24,14 +24,18 @@ import java.lang.reflect.Field;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -107,4 +111,24 @@ public class TransactionCoordinatorClientTest extends 
TransactionMetaStoreTestBa
                     instanceof 
TransactionCoordinatorClientException.InvalidTxnStatusException);
         }
     }
+
+    @Test
+    public void testClientStartWithRetry() throws Exception{
+        String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl();
+        String invalidBrokerServiceUrl = "localhost:0";
+        String brokerServiceUrl = validBrokerServiceUrl + "," + 
invalidBrokerServiceUrl;
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(brokerServiceUrl).build();
+        @Cleanup
+        TransactionCoordinatorClient transactionCoordinatorClient = new 
TransactionCoordinatorClientImpl(pulsarClient);
+
+        try {
+            transactionCoordinatorClient.start();
+        }catch (TransactionCoordinatorClientException e) {
+            Assert.fail("Shouldn't have exception at here", e);
+        }
+
+        Assert.assertEquals(transactionCoordinatorClient.getState(), 
State.READY);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 2a43ca20beb..e45d5397115 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -88,6 +89,10 @@ public class TransactionMetaStoreHandler extends HandlerState
     private Timeout requestTimeout;
 
     private final CompletableFuture<Void> connectFuture;
+    private final long lookupDeadline;
+    private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<>();
+
+
 
     public TransactionMetaStoreHandler(long transactionCoordinatorId, 
PulsarClientImpl pulsarClient, String topic,
                                        CompletableFuture<Void> connectFuture) {
@@ -109,6 +114,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState
         this.connectFuture = connectFuture;
         this.internalPinnedExecutor = 
pulsarClient.getInternalExecutorService();
         this.timer = pulsarClient.timer();
+        this.lookupDeadline = System.currentTimeMillis() + 
client.getConfiguration().getLookupTimeoutMs();
     }
 
     public void start() {
@@ -117,10 +123,22 @@ public class TransactionMetaStoreHandler extends 
HandlerState
 
     @Override
     public void connectionFailed(PulsarClientException exception) {
-        LOG.error("Transaction meta handler with transaction coordinator id {} 
connection failed.",
-            transactionCoordinatorId, exception);
-        if (!this.connectFuture.isDone()) {
-            this.connectFuture.completeExceptionally(exception);
+        boolean nonRetriableError = 
!PulsarClientException.isRetriableError(exception);
+        boolean timeout = System.currentTimeMillis() > lookupDeadline;
+        if (nonRetriableError || timeout) {
+            exception.setPreviousExceptions(previousExceptions);
+            if (connectFuture.completeExceptionally(exception)) {
+                if (nonRetriableError) {
+                    LOG.error("Transaction meta handler with transaction 
coordinator id {} connection failed.",
+                            transactionCoordinatorId, exception);
+                } else {
+                    LOG.error("Transaction meta handler with transaction 
coordinator id {} connection failed after "
+                            + "timeout", transactionCoordinatorId, exception);
+                }
+                setState(State.Failed);
+            }
+        } else {
+            previousExceptions.add(exception);
         }
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 499627f9c73..45a3ad4f978 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -79,8 +79,8 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
     @Override
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
-            return pulsarClient.getLookup()
-                
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 
true)
+            return pulsarClient.getPartitionedTopicMetadata(
+                            
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true)
                 .thenCompose(partitionMeta -> {
                     List<CompletableFuture<Void>> connectFutureList = new 
ArrayList<>();
                     if (LOG.isDebugEnabled()) {

Reply via email to