This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 676c38878aa27a87f53b78a756660f767947edb8 Author: atomchen <[email protected]> AuthorDate: Tue Jul 30 15:11:05 2024 +0800 [fix][client] TransactionCoordinatorClient support retry (#23081) (cherry picked from commit 6bbaec1f6b1cc09de42f14dccca1afd932c547d5) --- .../TransactionCoordinatorClientTest.java | 24 ++++++++++++++++++++ .../client/impl/TransactionMetaStoreHandler.java | 26 ++++++++++++++++++---- .../TransactionCoordinatorClientImpl.java | 4 ++-- 3 files changed, 48 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java index c442c3a9014..36bc0e522c2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java @@ -24,14 +24,18 @@ import java.lang.reflect.Field; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -107,4 +111,24 @@ public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBa instanceof TransactionCoordinatorClientException.InvalidTxnStatusException); } } + + @Test + public void testClientStartWithRetry() throws Exception{ + String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl(); + String invalidBrokerServiceUrl = "localhost:0"; + String brokerServiceUrl = validBrokerServiceUrl + "," + invalidBrokerServiceUrl; + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build(); + @Cleanup + TransactionCoordinatorClient transactionCoordinatorClient = new TransactionCoordinatorClientImpl(pulsarClient); + + try { + transactionCoordinatorClient.start(); + }catch (TransactionCoordinatorClientException e) { + Assert.fail("Shouldn't have exception at here", e); + } + + Assert.assertEquals(transactionCoordinatorClient.getState(), State.READY); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 2a43ca20beb..e45d5397115 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -88,6 +89,10 @@ public class TransactionMetaStoreHandler extends HandlerState private Timeout requestTimeout; private final CompletableFuture<Void> connectFuture; + private final long lookupDeadline; + private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>(); + + public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic, CompletableFuture<Void> connectFuture) { @@ -109,6 +114,7 @@ public class TransactionMetaStoreHandler extends HandlerState this.connectFuture = connectFuture; this.internalPinnedExecutor = pulsarClient.getInternalExecutorService(); this.timer = pulsarClient.timer(); + this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs(); } public void start() { @@ -117,10 +123,22 @@ public class TransactionMetaStoreHandler extends HandlerState @Override public void connectionFailed(PulsarClientException exception) { - LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", - transactionCoordinatorId, exception); - if (!this.connectFuture.isDone()) { - this.connectFuture.completeExceptionally(exception); + boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); + boolean timeout = System.currentTimeMillis() > lookupDeadline; + if (nonRetriableError || timeout) { + exception.setPreviousExceptions(previousExceptions); + if (connectFuture.completeExceptionally(exception)) { + if (nonRetriableError) { + LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", + transactionCoordinatorId, exception); + } else { + LOG.error("Transaction meta handler with transaction coordinator id {} connection failed after " + + "timeout", transactionCoordinatorId, exception); + } + setState(State.Failed); + } + } else { + previousExceptions.add(exception); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 499627f9c73..45a3ad4f978 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -79,8 +79,8 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC @Override public CompletableFuture<Void> startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { - return pulsarClient.getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, true) + return pulsarClient.getPartitionedTopicMetadata( + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true) .thenCompose(partitionMeta -> { List<CompletableFuture<Void>> connectFutureList = new ArrayList<>(); if (LOG.isDebugEnabled()) {
