This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7535a31962b181803c7051294bc17e16b3a08a89 Author: atomchen <[email protected]> AuthorDate: Tue Jul 30 15:11:05 2024 +0800 [fix][client] TransactionCoordinatorClient support retry (#23081) (cherry picked from commit 6bbaec1f6b1cc09de42f14dccca1afd932c547d5) --- .../TransactionCoordinatorClientTest.java | 24 ++++++++++++++++++++ .../client/impl/TransactionMetaStoreHandler.java | 26 ++++++++++++++++++---- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java index c442c3a9014..36bc0e522c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java @@ -24,14 +24,18 @@ import java.lang.reflect.Field; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -107,4 +111,24 @@ public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBa instanceof TransactionCoordinatorClientException.InvalidTxnStatusException); } } + + @Test + public void testClientStartWithRetry() throws Exception{ + String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl(); + String invalidBrokerServiceUrl = "localhost:0"; + String brokerServiceUrl = validBrokerServiceUrl + "," + invalidBrokerServiceUrl; + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build(); + @Cleanup + TransactionCoordinatorClient transactionCoordinatorClient = new TransactionCoordinatorClientImpl(pulsarClient); + + try { + transactionCoordinatorClient.start(); + }catch (TransactionCoordinatorClientException e) { + Assert.fail("Shouldn't have exception at here", e); + } + + Assert.assertEquals(transactionCoordinatorClient.getState(), State.READY); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 2a43ca20beb..e45d5397115 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -88,6 +89,10 @@ public class TransactionMetaStoreHandler extends HandlerState private Timeout requestTimeout; private final CompletableFuture<Void> connectFuture; + private final long lookupDeadline; + private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>(); + + public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic, CompletableFuture<Void> connectFuture) { @@ -109,6 +114,7 @@ public class TransactionMetaStoreHandler extends HandlerState this.connectFuture = connectFuture; this.internalPinnedExecutor = pulsarClient.getInternalExecutorService(); this.timer = pulsarClient.timer(); + this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs(); } public void start() { @@ -117,10 +123,22 @@ public class TransactionMetaStoreHandler extends HandlerState @Override public void connectionFailed(PulsarClientException exception) { - LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", - transactionCoordinatorId, exception); - if (!this.connectFuture.isDone()) { - this.connectFuture.completeExceptionally(exception); + boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); + boolean timeout = System.currentTimeMillis() > lookupDeadline; + if (nonRetriableError || timeout) { + exception.setPreviousExceptions(previousExceptions); + if (connectFuture.completeExceptionally(exception)) { + if (nonRetriableError) { + LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", + transactionCoordinatorId, exception); + } else { + LOG.error("Transaction meta handler with transaction coordinator id {} connection failed after " + + "timeout", transactionCoordinatorId, exception); + } + setState(State.Failed); + } + } else { + previousExceptions.add(exception); } }
