This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6bbaec1f6b1 [fix][client] TransactionCoordinatorClient support retry (#23081)
6bbaec1f6b1 is described below
commit 6bbaec1f6b1cc09de42f14dccca1afd932c547d5
Author: atomchen <[email protected]>
AuthorDate: Tue Jul 30 15:11:05 2024 +0800
[fix][client] TransactionCoordinatorClient support retry (#23081)
---
.../TransactionCoordinatorClientTest.java | 24 ++++++++++++++++++++
.../client/impl/TransactionMetaStoreHandler.java | 26 ++++++++++++++++++----
.../TransactionCoordinatorClientImpl.java | 4 ++--
3 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index c442c3a9014..36bc0e522c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -24,14 +24,18 @@ import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import lombok.Cleanup;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -107,4 +111,24 @@ public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBa
instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}
}
+
+ @Test
+ public void testClientStartWithRetry() throws Exception{
+ String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl();
+ String invalidBrokerServiceUrl = "localhost:0";
+ String brokerServiceUrl = validBrokerServiceUrl + "," + invalidBrokerServiceUrl;
+
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build();
+ @Cleanup
+ TransactionCoordinatorClient transactionCoordinatorClient = new TransactionCoordinatorClientImpl(pulsarClient);
+
+ try {
+ transactionCoordinatorClient.start();
+ }catch (TransactionCoordinatorClientException e) {
+ Assert.fail("Shouldn't have exception at here", e);
+ }
+
+ Assert.assertEquals(transactionCoordinatorClient.getState(), State.READY);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 2a43ca20beb..e45d5397115 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -88,6 +89,10 @@ public class TransactionMetaStoreHandler extends HandlerState
private Timeout requestTimeout;
private final CompletableFuture<Void> connectFuture;
+ private final long lookupDeadline;
+ private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>();
+
+
public TransactionMetaStoreHandler(long transactionCoordinatorId,
PulsarClientImpl pulsarClient, String topic,
CompletableFuture<Void> connectFuture) {
@@ -109,6 +114,7 @@ public class TransactionMetaStoreHandler extends HandlerState
this.connectFuture = connectFuture;
this.internalPinnedExecutor = pulsarClient.getInternalExecutorService();
this.timer = pulsarClient.timer();
+ this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
}
public void start() {
@@ -117,10 +123,22 @@ public class TransactionMetaStoreHandler extends HandlerState
@Override
public void connectionFailed(PulsarClientException exception) {
- LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.",
- transactionCoordinatorId, exception);
- if (!this.connectFuture.isDone()) {
- this.connectFuture.completeExceptionally(exception);
+ boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
+ boolean timeout = System.currentTimeMillis() > lookupDeadline;
+ if (nonRetriableError || timeout) {
+ exception.setPreviousExceptions(previousExceptions);
+ if (connectFuture.completeExceptionally(exception)) {
+ if (nonRetriableError) {
+ LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.",
+ transactionCoordinatorId, exception);
+ } else {
+ LOG.error("Transaction meta handler with transaction coordinator id {} connection failed after "
+ + "timeout", transactionCoordinatorId, exception);
+ }
+ setState(State.Failed);
+ }
+ } else {
+ previousExceptions.add(exception);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 499627f9c73..45a3ad4f978 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -79,8 +79,8 @@ public class TransactionCoordinatorClientImpl implements TransactionCoordinatorC
@Override
public CompletableFuture<Void> startAsync() {
if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
- return pulsarClient.getLookup()
- .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, true)
+ return pulsarClient.getPartitionedTopicMetadata(
+ SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true)
.thenCompose(partitionMeta -> {
List<CompletableFuture<Void>> connectFutureList = new ArrayList<>();
if (LOG.isDebugEnabled()) {