This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bff7916441e9ed89d20f25664088290cf856cf5a
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Dec 14 21:15:57 2021 +0800

    [Transaction]No TransactionCoordinatorNotFound, but automatic reconnect 
(#13135)
    
    ### Motivation and Modification
    We should not throw the following exceptions to the user to deal with.
    1. `TransactionCoordinatorNotFound` or `ManagedLedgerFencedException`
               --- we should retry the operation and reconnect to the TC
    2. `TransactionMetaStoreHandler` is still connecting
              ---- add the operation to `pendingRequests`, and execute the 
requests in `pendingRequests` once the connection is complete.
    3.  The complexity of concurrent operations is too high. For operations in 
a TransactionMetaStoreHandler, consider using single-threaded operations
            --- use `internalPinnedExecutor`
    
    (cherry picked from commit 56323e4a5b70c3008706515acd871ba0571ec1eb)
---
 .../broker/TransactionMetadataStoreService.java    |   8 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  68 ++-
 .../impl}/TransactionClientConnectTest.java        | 218 +++-----
 .../client/impl/TransactionMetaStoreHandler.java   | 577 ++++++++++++++-------
 4 files changed, 504 insertions(+), 367 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 240c6c9..3f3e8d8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -356,7 +356,7 @@ public class TransactionMetadataStoreService {
                                         endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
 
                             }
-                            completableFuture.completeExceptionally(e);
+                            
completableFuture.completeExceptionally(e.getCause());
                             return null;
                         })).exceptionally(e -> {
                     if (!isRetryableException(e.getCause())) {
@@ -371,7 +371,7 @@ public class TransactionMetadataStoreService {
                                 endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
 
                     }
-                    completableFuture.completeExceptionally(e);
+                    completableFuture.completeExceptionally(e.getCause());
                     return null;
                 });
             } else {
@@ -391,7 +391,7 @@ public class TransactionMetadataStoreService {
                             LOG.error("EndTxnInTransactionBuffer fail! TxnId : 
{}, "
                                     + "TxnAction : {}", txnID, txnAction, e);
                         }
-                        completableFuture.completeExceptionally(e);
+                        completableFuture.completeExceptionally(e.getCause());
                         return null;
                     });
                 } else {
@@ -409,7 +409,7 @@ public class TransactionMetadataStoreService {
                 transactionOpRetryTimer.newTimeout(timeout -> 
endTransaction(txnID, txnAction, isTimeout),
                         endTransactionRetryIntervalTime, 
TimeUnit.MILLISECONDS);
             }
-            completableFuture.completeExceptionally(e);
+            completableFuture.completeExceptionally(e.getCause());
             return null;
         });
         return completableFuture;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7762c22..30ce9c1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2003,7 +2003,26 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return true;
         }
     }
+    private Throwable handleTxnException(Throwable ex, String op, long 
requestId) {
+        if (ex instanceof CoordinatorException.CoordinatorNotFoundException || 
ex != null
+                && ex.getCause() instanceof 
CoordinatorException.CoordinatorNotFoundException) {
+            if (log.isDebugEnabled()) {
+                log.debug("The Coordinator was not found for the request {}", 
op);
+            }
+            return ex;
+        }
+        if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException 
|| ex != null
+                && ex.getCause() instanceof 
ManagedLedgerException.ManagedLedgerFencedException) {
+            if (log.isDebugEnabled()) {
+                log.debug("Throw a CoordinatorNotFoundException to client "
+                        + "with the message got from a 
ManagedLedgerFencedException for the request {}", op);
+            }
+            return new 
CoordinatorException.CoordinatorNotFoundException(ex.getMessage());
 
+        }
+        log.error("Send response error for {} request {}.", op, requestId, ex);
+        return ex;
+    }
     @Override
     protected void handleNewTxn(CommandNewTxn command) {
         final long requestId = command.getRequestId();
@@ -2028,9 +2047,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     ctx.writeAndFlush(Commands.newTxnResponse(requestId, 
txnID.getLeastSigBits(),
                             txnID.getMostSigBits()));
                 } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Send response error for new txn request 
{}", requestId, ex);
-                    }
+                    ex = handleTxnException(ex, 
BaseCommand.Type.NEW_TXN.name(), requestId);
 
                     ctx.writeAndFlush(Commands.newTxnResponse(requestId, 
tcId.getId(),
                             BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage()));
@@ -2066,19 +2083,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                     } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Send response error for add published 
partition to txn request {}", requestId,
-                                    ex);
-                        }
+                        ex = handleTxnException(ex, 
BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
 
-                        if (ex instanceof 
CoordinatorException.CoordinatorNotFoundException) {
-                            
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
-                                    
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
-                        } else {
-                            
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
-                                    
BrokerServiceException.getClientErrorCode(ex.getCause()),
-                                    ex.getCause().getMessage()));
-                        }
+                        
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
+                                BrokerServiceException.getClientErrorCode(ex),
+                                ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
             }));
@@ -2105,16 +2114,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
                                 txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                     } else {
-                        log.error("Send response error for end txn request.", 
ex);
+                        ex = handleTxnException(ex, 
BaseCommand.Type.END_TXN.name(), requestId);
+                        
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+                                BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage()));
 
-                        if (ex instanceof 
CoordinatorException.CoordinatorNotFoundException) {
-                            
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
-                                    
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
-                        } else {
-                            
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
-                                    
BrokerServiceException.getClientErrorCode(ex.getCause()),
-                                    ex.getCause().getMessage()));
-                        }
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
                 });
@@ -2325,20 +2328,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                                 txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
                         log.info("handle add partition to txn finish.");
                     } else {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Send response error for add published 
partition to txn request {}",
-                                    requestId, ex);
-                        }
+                        ex = handleTxnException(ex, 
BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
 
-                        if (ex instanceof 
CoordinatorException.CoordinatorNotFoundException) {
-                            
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
-                                    txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex),
-                                    ex.getMessage()));
-                        } else {
-                            
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
-                                    txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex.getCause()),
-                                    ex.getCause().getMessage()));
-                        }
+                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                                txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex),
+                                ex.getMessage()));
                         transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
                 }));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
similarity index 50%
rename from 
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
rename to 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
index a51eae8..7fb924f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
@@ -16,19 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction;
-
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
 import org.apache.pulsar.client.api.MessageId;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
 import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -39,10 +48,11 @@ import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.junit.Assert.assertFalse;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-import static org.testng.FileAssert.fail;
 
+@Slf4j
 public class TransactionClientConnectTest extends TransactionTestBase {
 
     private static final String RECONNECT_TOPIC = NAMESPACE1 + 
"/txn-client-reconnect-test";
@@ -60,142 +70,69 @@ public class TransactionClientConnectTest extends 
TransactionTestBase {
 
     @Test
     public void testTransactionNewReconnect() throws Exception {
-        start();
-
-        // when throw CoordinatorNotFoundException client will reconnect tc
-        try {
-            pulsarClient.newTransaction()
-                    .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-        reconnect();
-
-        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-
-        // tc fence will remove this tc and reopen
-        try {
-            pulsarClient.newTransaction()
-                    .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
-            fail();
-        } catch (ExecutionException e) {
-            assertEquals(e.getCause().getMessage(),
-                    
"org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException:
 " +
-                            "java.lang.Exception: Attempted to use a fenced 
managed ledger");
-        }
-
-        reconnect();
+        Callable<CompletableFuture<?>> callable = () -> 
pulsarClient.newTransaction()
+                .withTransactionTimeout(200, TimeUnit.MILLISECONDS).build();
+        tryCommandReconnect(callable, callable);
     }
 
     @Test
     public void testTransactionAddSubscriptionToTxnAsyncReconnect() throws 
Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
+        Callable<CompletableFuture<?>> callable = () -> 
transactionCoordinatorClient
+                .addSubscriptionToTxnAsync(new TxnID(0, 0), "test", "test");
+        tryCommandReconnect(callable, callable);
+    }
 
+    public void tryCommandReconnect(Callable<CompletableFuture<?>> callable1, 
Callable<CompletableFuture<?>> callable2)
+            throws Exception {
+        start();
         try {
-            transactionCoordinatorClient.addSubscriptionToTxnAsync(new 
TxnID(0, 0), "test", "test").get();
-            fail();
+            callable1.call().get();
         } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+            assertFalse(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+            waitToReady();
+            callable1.call().get();
         }
-
-        reconnect();
         
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+        CompletableFuture<?> completableFuture = callable2.call();
         try {
-            transactionCoordinatorClient.addSubscriptionToTxnAsync(new 
TxnID(0, 0), "test", "test").get();
-            fail();
+            completableFuture.get(3, TimeUnit.SECONDS);
+        } catch (TimeoutException ignore) {
         } catch (ExecutionException e) {
-            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
-            }
+            Assert.assertFalse(e.getCause()
+                    instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
         }
-        reconnect();
+
+        
unFence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+        completableFuture.get();
     }
 
     @Test
     public void testTransactionAbortToTxnAsyncReconnect() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
-
-        try {
-            transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-
-        reconnect();
-        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-        try {
-            transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
-            }
-        }
-        reconnect();
+        Callable<CompletableFuture<?>> callable1 = () -> 
transactionCoordinatorClient.abortAsync(new TxnID(0,
+                0));
+        Callable<CompletableFuture<?>> callable2 = () -> 
transactionCoordinatorClient.abortAsync(new TxnID(0,
+                1));
+        tryCommandReconnect(callable1, callable2);
     }
 
     @Test
     public void testTransactionCommitToTxnAsyncReconnect() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
-
-        try {
-            transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-
-        reconnect();
-        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-        try {
-            transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
-            fail();
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
-            }
-        }
-        reconnect();
+        Callable<CompletableFuture<?>> callable1 = () -> 
transactionCoordinatorClient.commitAsync(new TxnID(0,
+                0));
+        Callable<CompletableFuture<?>> callable2 = () -> 
transactionCoordinatorClient.commitAsync(new TxnID(0,
+                1));
+        tryCommandReconnect(callable1, callable2);
     }
 
     @Test
     public void testTransactionAddPublishPartitionToTxnReconnect() throws 
Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
-        start();
-
-        try {
-            transactionCoordinatorClient.addPublishPartitionToTxnAsync(new 
TxnID(0, 0),
-                    Collections.singletonList("test")).get();
-            fail();
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
-        }
-
-        reconnect();
-        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
-        try {
-            transactionCoordinatorClient.addPublishPartitionToTxnAsync(new 
TxnID(0, 0),
-                    Collections.singletonList("test")).get();
-            fail();
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
-                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
-            } else {
-                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
-            }
-        }
-        reconnect();
+        Callable<CompletableFuture<?>> callable = () -> 
transactionCoordinatorClient.addPublishPartitionToTxnAsync(new TxnID(0, 0),
+                Collections.singletonList("test"));
+        tryCommandReconnect(callable, callable);
     }
 
     @Test
@@ -209,7 +146,11 @@ public class TransactionClientConnectTest extends 
TransactionTestBase {
         for (TransactionMetaStoreHandler handler : handlers) {
             handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
         }
-        pulsarClient.close();
+        for (TransactionMetaStoreHandler handler : handlers) {
+            Field stateField = HandlerState.class.getDeclaredField("state");
+            stateField.setAccessible(true);
+            stateField.set(handler, HandlerState.State.Closed);
+        }
         for (TransactionMetaStoreHandler handler : handlers) {
             Method method = 
TransactionMetaStoreHandler.class.getMethod("getConnectHandleState");
             method.setAccessible(true);
@@ -225,21 +166,14 @@ public class TransactionClientConnectTest extends 
TransactionTestBase {
 
     public void start() throws Exception {
         // wait transaction coordinator init success
-        Awaitility.await().until(() -> {
-            try {
-                pulsarClient.newTransaction()
-                        .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
-            } catch (Exception e) {
-                return false;
-            }
-            return true;
-        });
         pulsarClient.newTransaction()
-                .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
+                .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+        pulsarClient.newTransaction()
+                .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
 
         TransactionMetadataStoreService transactionMetadataStoreService =
                 
getPulsarServiceList().get(0).getTransactionMetadataStoreService();
-        // remove transaction metadata store
+        // remove transaction metadap0-ta store
         
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)).get();
 
     }
@@ -250,15 +184,35 @@ public class TransactionClientConnectTest extends 
TransactionTestBase {
         field.set(((MLTransactionMetadataStore) 
transactionMetadataStoreService.getStores()
                 .get(TransactionCoordinatorID.get(0))).getManagedLedger(), 
ManagedLedgerImpl.State.Fenced);
     }
+    public void unFence(TransactionMetadataStoreService 
transactionMetadataStoreService) throws Exception {
+        Field field = ManagedLedgerImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(((MLTransactionMetadataStore) 
transactionMetadataStoreService.getStores()
+                .get(TransactionCoordinatorID.get(0))).getManagedLedger(), 
ManagedLedgerImpl.State.LedgerOpened);
+    }
 
-    public void reconnect() {
-        //reconnect
+    public void waitToReady() throws Exception{
+        TransactionMetadataStoreService transactionMetadataStoreService =
+                
getPulsarServiceList().get(0).getTransactionMetadataStoreService();
+        Class<TransactionMetadataStoreService> 
transactionMetadataStoreServiceClass =
+                TransactionMetadataStoreService.class;
+        Field field1 =
+                
transactionMetadataStoreServiceClass.getDeclaredField("stores");
+        field1.setAccessible(true);
+        Map<TransactionCoordinatorID, TransactionMetadataStore> stores =
+                (Map<TransactionCoordinatorID, TransactionMetadataStore>) 
field1
+                        .get(transactionMetadataStoreService);
         Awaitility.await().until(() -> {
-            try {
-                pulsarClient.newTransaction()
-                        .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
-            } catch (Exception e) {
-                return false;
+            for (TransactionMetadataStore transactionMetadataStore : 
stores.values()) {
+                Class<TransactionMetadataStoreState> 
transactionMetadataStoreStateClass =
+                        TransactionMetadataStoreState.class;
+                Field field = 
transactionMetadataStoreStateClass.getDeclaredField("state");
+                field.setAccessible(true);
+                TransactionMetadataStoreState.State state =
+                        (TransactionMetadataStoreState.State) 
field.get(transactionMetadataStore);
+                if (!state.equals(TransactionMetadataStoreState.State.Ready)) {
+                    return false;
+                }
             }
             return true;
         });
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index ba6ee50..b2b756a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -23,7 +23,9 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.Timeout;
+import io.netty.util.Timer;
 import io.netty.util.TimerTask;
+import java.util.concurrent.ExecutorService;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -61,6 +63,9 @@ public class TransactionMetaStoreHandler extends HandlerState 
implements Connect
         new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLinkedQueue<RequestTime> timeoutQueue;
 
+    protected final Timer timer;
+    private final ExecutorService internalPinnedExecutor;
+
     private static class RequestTime {
         final long creationTimeMs;
         final long requestId;
@@ -96,6 +101,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
             this);
         this.connectFuture = connectFuture;
         this.connectionHandler.grabCnx();
+        this.timer = pulsarClient.timer();
+        internalPinnedExecutor = pulsarClient.getInternalExecutorService();
     }
 
     @Override
@@ -109,64 +116,73 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
 
     @Override
     public void connectionOpened(ClientCnx cnx) {
-        LOG.info("Transaction meta handler with transaction coordinator id {} 
connection opened.",
-            transactionCoordinatorId);
-
-        if (getState() == State.Closing || getState() == State.Closed) {
-            setState(State.Closed);
-            failPendingRequest();
-            this.pendingRequests.clear();
-            return;
-        }
-
-        connectionHandler.setClientCnx(cnx);
-        cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, 
this);
-
-        // if broker protocol version < 19, don't send TcClientConnectRequest 
to broker.
-        if (cnx.getRemoteEndpointProtocolVersion() > 
ProtocolVersion.v18.getValue()) {
-            long requestId = client.newRequestId();
-            ByteBuf request = 
Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId);
+        internalPinnedExecutor.execute(() -> {
+            LOG.info("Transaction meta handler with transaction coordinator id 
{} connection opened.",
+                    transactionCoordinatorId);
+
+            if (getState() == State.Closing || getState() == State.Closed) {
+                setState(State.Closed);
+                failPendingRequest();
+                this.pendingRequests.clear();
+                return;
+            }
 
-            cnx.sendRequestWithId(request, requestId).thenRun(() -> {
-                LOG.info("Transaction coordinator client connect success! tcId 
: {}", transactionCoordinatorId);
+            connectionHandler.setClientCnx(cnx);
+            cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, 
this);
+
+            // if broker protocol version < 19, don't send 
TcClientConnectRequest to broker.
+            if (cnx.getRemoteEndpointProtocolVersion() > 
ProtocolVersion.v18.getValue()) {
+                long requestId = client.newRequestId();
+                ByteBuf request = 
Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId);
+
+                cnx.sendRequestWithId(request, requestId).thenRun(() -> {
+                    internalPinnedExecutor.execute(() -> {
+                        LOG.info("Transaction coordinator client connect 
success! tcId : {}", transactionCoordinatorId);
+                        if (!changeToReadyState()) {
+                            setState(State.Closed);
+                            cnx.channel().close();
+                        }
+
+                        if (!this.connectFuture.isDone()) {
+                            this.connectFuture.complete(null);
+                        }
+                        this.connectionHandler.resetBackoff();
+                        pendingRequests.forEach((requestID, opBase) -> 
checkStateAndSendRequest(opBase));
+                    });
+                }).exceptionally((e) -> {
+                    internalPinnedExecutor.execute(() -> {
+                        LOG.error("Transaction coordinator client connect 
fail! tcId : {}",
+                                transactionCoordinatorId, e.getCause());
+                        if (getState() == State.Closing || getState() == 
State.Closed
+                                || e.getCause() instanceof 
PulsarClientException.NotAllowedException) {
+                            setState(State.Closed);
+                            cnx.channel().close();
+                        } else {
+                            connectionHandler.reconnectLater(e.getCause());
+                        }
+                    });
+                    return null;
+                });
+            } else {
                 if (!changeToReadyState()) {
-                    setState(State.Closed);
-                    cnx.channel().close();
-                }
-
-                if (!this.connectFuture.isDone()) {
-                    this.connectFuture.complete(null);
-                }
-                this.connectionHandler.resetBackoff();
-            }).exceptionally((e) -> {
-                LOG.error("Transaction coordinator client connect fail! tcId : 
{}",
-                        transactionCoordinatorId, e.getCause());
-                if (getState() == State.Closing || getState() == State.Closed
-                        || e.getCause() instanceof 
PulsarClientException.NotAllowedException) {
-                    setState(State.Closed);
                     cnx.channel().close();
-                } else {
-                    connectionHandler.reconnectLater(e.getCause());
                 }
-                return null;
-            });
-        } else {
-            if (!changeToReadyState()) {
-                cnx.channel().close();
+                this.connectFuture.complete(null);
             }
-            this.connectFuture.complete(null);
-        }
+        });
     }
 
     private void failPendingRequest() {
-        pendingRequests.keys().forEach(k -> {
-            OpBase<?> op = pendingRequests.remove(k);
-            if (op != null && !op.callback.isDone()) {
-                op.callback.completeExceptionally(new 
PulsarClientException.AlreadyClosedException(
-                        "Could not get response from transaction meta store 
when " +
-                                "the transaction meta store has already 
close."));
-                onResponse(op);
-            }
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.keys().forEach(k -> {
+                OpBase<?> op = pendingRequests.remove(k);
+                if (op != null && !op.callback.isDone()) {
+                    op.callback.completeExceptionally(new 
PulsarClientException.AlreadyClosedException(
+                            "Could not get response from transaction meta 
store when " +
+                                    "the transaction meta store has already 
close."));
+                    onResponse(op);
+                }
+            });
         });
     }
 
@@ -175,42 +191,76 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout));
         }
         CompletableFuture<TxnID> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout));
-        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
         return callback;
     }
 
     void handleNewTxnResponse(CommandNewTxnResponse response) {
-        OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got new txn response for timeout {} - {}", response.getTxnidMostBits(),
-                    response.getTxnidLeastBits());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+             error = response.getError();
+             message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForTxnIdCallBack op = (OpForTxnIdCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got new txn response for timeout {} - {}", txnID.getMostSigBits(),
+                            txnID.getLeastSigBits());
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got new txn response {} for request {}", txnID, response.getRequestId());
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got new txn response {} for request {}", txnID, requestId);
+                }
+                op.callback.complete(txnID);
+            } else {
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for the {}  request {} error "
+                                        + "TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.NEW_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("Got {} for request {} error {}", BaseCommand.Type.NEW_TXN.name(),
+                        requestId, error);
             }
-            op.callback.complete(txnID);
-        } else {
-            LOG.error("Got new txn for request {} error {}", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
 
-        onResponse(op);
+            onResponse(op);
+        });
     }
 
     public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, List<String> partitions) {
@@ -218,42 +268,80 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             LOG.debug("Add publish partition {} to txn {}", partitions, txnID);
         }
         CompletableFuture<Void> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddPartitionToTxn(
                 requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions);
-        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        OpForVoidCallBack op = OpForVoidCallBack
+                .create(cmd, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
+
         return callback;
     }
 
     void handleAddPublishPartitionToTxnResponse(CommandAddPartitionToTxnResponse response) {
-        OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got add publish partition to txn response for timeout {} - {}", response.getTxnidMostBits(),
-                        response.getTxnidLeastBits());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+            error = response.getError();
+            message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got add publish partition to txn response for timeout {} - {}", txnID.getMostSigBits(),
+                            txnID.getLeastSigBits());
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Add publish partition for request {} success.", response.getRequestId());
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Add publish partition for request {} success.", requestId);
+                }
+                op.callback.complete(null);
+            } else {
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for the {} request {} "
+                                        + " error TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("{} for request {} error {} with txnID {}.", BaseCommand.Type.ADD_PARTITION_TO_TXN.name(),
+                        requestId, error, txnID);
+
             }
-            op.callback.complete(null);
-        } else {
-            LOG.error("Add publish partition for request {} error {}.", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
 
-        onResponse(op);
+            onResponse(op);
+        });
     }
 
     public CompletableFuture<Void> addSubscriptionToTxn(TxnID txnID, List<Subscription> subscriptionList) {
@@ -262,41 +350,76 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         }
 
         CompletableFuture<Void> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddSubscriptionToTxn(
                 requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList);
-        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        cmd.retain();
-        cnx().ctx().writeAndFlush(cmd, cnx().ctx().voidPromise());
+        OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
         return callback;
     }
 
     public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) {
-        OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Add subscription to txn timeout for request {}.", response.getRequestId());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+            error = response.getError();
+            message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Add subscription to txn timeout for request {}.", requestId);
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Add subscription to txn success for request {}.", response.getRequestId());
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Add subscription to txn success for request {}.", requestId);
+                }
+                op.callback.complete(null);
+            } else {
+                LOG.error("Add subscription to txn failed for request {} error {}.",
+                        requestId, error);
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for {} request {} error TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("{} failed for request {} error {}.", BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(),
+                       requestId, error);
+
             }
-            op.callback.complete(null);
-        } else {
-            LOG.error("Add subscription to txn failed for request {} error {}.",
-                    response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
-        onResponse(op);
+            onResponse(op);
+        });
     }
 
     public CompletableFuture<Void> endTxnAsync(TxnID txnID, TxnAction action) {
@@ -304,68 +427,115 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
             LOG.debug("End txn {}, action {}", txnID, action);
         }
         CompletableFuture<Void> callback = new CompletableFuture<>();
-
         if (!canSendRequest(callback)) {
             return callback;
         }
         long requestId = client.newRequestId();
         BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
         ByteBuf buf = Commands.serializeWithSize(cmd);
-        OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback);
-        pendingRequests.put(requestId, op);
-        timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
-        buf.retain();
-        cnx().ctx().writeAndFlush(buf, cnx().ctx().voidPromise());
+        OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client);
+        internalPinnedExecutor.execute(() -> {
+            pendingRequests.put(requestId, op);
+            timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
+            checkStateAndSendRequest(op);
+        });
         return callback;
     }
 
     void handleEndTxnResponse(CommandEndTxnResponse response) {
-        OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(response.getRequestId());
-        if (op == null) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got end txn response for timeout {} - {}", response.getTxnidMostBits(),
-                        response.getTxnidLeastBits());
+        boolean hasError = response.hasError();
+        ServerError error;
+        String message;
+        if (hasError) {
+            error = response.getError();
+            message = response.getMessage();
+        } else {
+            error = null;
+            message = null;
+        }
+        TxnID txnID = new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits());
+        long requestId = response.getRequestId();
+        internalPinnedExecutor.execute(() -> {
+            OpForVoidCallBack op = (OpForVoidCallBack) pendingRequests.remove(requestId);
+            if (op == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got end txn response for timeout {} - {}", txnID.getMostSigBits(),
+                            txnID.getLeastSigBits());
+                }
+                return;
             }
-            return;
-        }
 
-        if (!response.hasError()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Got end txn response success for request {}", response.getRequestId());
-            }
-            op.callback.complete(null);
-        } else {
-            LOG.error("Got end txn response for request {} error {}", response.getRequestId(), response.getError());
-            handleTransactionFailOp(response.getError(), response.getMessage(), op);
-        }
+            if (!hasError) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got end txn response success for request {}", requestId);
+                }
+                op.callback.complete(null);
+            } else {
+                if (checkIfNeedRetryByError(error, message, op)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Get a response for the {} request {} error "
+                                        + "TransactionCoordinatorNotFound and try it again",
+                                BaseCommand.Type.END_TXN.name(), requestId);
+                    }
+                    pendingRequests.put(requestId, op);
+                    timer.newTimeout(timeout -> {
+                                internalPinnedExecutor.execute(() -> {
+                                    if (!pendingRequests.containsKey(requestId)) {
+                                        if (LOG.isDebugEnabled()) {
+                                            LOG.debug("The request {} already timeout", requestId);
+                                        }
+                                        return;
+                                    }
+                                    checkStateAndSendRequest(op);
+                                });
+                            }
+                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                    return;
+                }
+                LOG.error("Got {} response for request {} error {}", BaseCommand.Type.END_TXN.name(),
+                        requestId, error);
 
-        onResponse(op);
+            }
+            onResponse(op);
+        });
     }
 
-    private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
-        if (error == ServerError.TransactionCoordinatorNotFound && getState() != State.Connecting) {
-            connectionHandler.reconnectLater(new TransactionCoordinatorClientException
-                    .CoordinatorNotFoundException(message));
+
+    private boolean checkIfNeedRetryByError(ServerError error, String message, OpBase<?> op) {
+        if (error == ServerError.TransactionCoordinatorNotFound) {
+            if (getState() != State.Connecting) {
+                connectionHandler.reconnectLater(new TransactionCoordinatorClientException
+                        .CoordinatorNotFoundException(message));
+            }
+            return true;
         }
 
         if (op != null) {
             op.callback.completeExceptionally(getExceptionByServerError(error, message));
         }
+        return false;
     }
 
     private static abstract class OpBase<T> {
         protected ByteBuf cmd;
         protected CompletableFuture<T> callback;
+        protected Backoff backoff;
 
         abstract void recycle();
     }
 
     private static class OpForTxnIdCallBack extends OpBase<TxnID> {
 
-        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback) {
+        static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client) {
             OpForTxnIdCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
+            op.backoff = new BackoffBuilder()
+                    .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+                            TimeUnit.NANOSECONDS)
+                    .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
+                    .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                    .create();
             return op;
         }
 
@@ -375,6 +545,9 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
 
         @Override
         void recycle() {
+            this.backoff = null;
+            this.cmd = null;
+            this.callback = null;
             recyclerHandle.recycle(this);
         }
 
@@ -389,18 +562,29 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
 
     private static class OpForVoidCallBack extends OpBase<Void> {
 
-        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback) {
+
+        static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client) {
             OpForVoidCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
+            op.backoff = new BackoffBuilder()
+                    .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
+                            TimeUnit.NANOSECONDS)
+                    .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
+                    .setMandatoryStop(0, TimeUnit.MILLISECONDS)
+                    .create();
             return op;
         }
+
         private OpForVoidCallBack(Recycler.Handle<OpForVoidCallBack> recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
         @Override
         void recycle() {
+            this.backoff = null;
+            this.cmd = null;
+            this.callback = null;
             recyclerHandle.recycle(this);
         }
 
@@ -433,9 +617,6 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     }
 
     private boolean canSendRequest(CompletableFuture<?> callback) {
-        if (!isValidHandlerState(callback)) {
-            return false;
-        }
         try {
             if (blockIfReachMaxPendingOps) {
                 semaphore.acquire();
@@ -453,81 +634,89 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
         return true;
     }
 
-    private boolean isValidHandlerState(CompletableFuture<?> callback) {
+    private void checkStateAndSendRequest(OpBase<?> op) {
         switch (getState()) {
             case Ready:
-                return true;
+                ClientCnx cnx = cnx();
+                if (cnx != null) {
+                    op.cmd.retain();
+                    cnx.ctx().writeAndFlush(op.cmd, cnx().ctx().voidPromise());
+                } else {
+                    LOG.error("The cnx was null when the TC handler was ready", new NullPointerException());
+                }
+                break;
             case Connecting:
-                callback.completeExceptionally(
-                        new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
-                                "Transaction meta store handler for tcId "
-                                + transactionCoordinatorId
-                                + " is connecting now, please try later."));
-                return false;
+                break;
             case Closing:
             case Closed:
-                callback.completeExceptionally(
+                op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 "Transaction meta store handler for tcId "
                                         + transactionCoordinatorId
                                         + " is closing or closed."));
-                return false;
+                onResponse(op);
+                break;
             case Failed:
             case Uninitialized:
-                callback.completeExceptionally(
+                op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 "Transaction meta store handler for tcId "
                                         + transactionCoordinatorId
                                         + " not connected."));
-                return false;
+                onResponse(op);
+                break;
             default:
-                callback.completeExceptionally(
+                op.callback.completeExceptionally(
                         new TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 transactionCoordinatorId));
-                return false;
+                onResponse(op);
+                break;
         }
     }
 
     @Override
     public void run(Timeout timeout) throws Exception {
-        if (timeout.isCancelled()) {
-            return;
-        }
-        long timeToWaitMs;
-        if (getState() == State.Closing || getState() == State.Closed) {
-            return;
-        }
-        RequestTime peeked = timeoutQueue.peek();
-        while (peeked != null && peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()
-                - System.currentTimeMillis() <= 0) {
-            RequestTime lastPolled = timeoutQueue.poll();
-            if (lastPolled != null) {
-                OpBase<?> op = pendingRequests.remove(lastPolled.requestId);
-                if (op != null && !op.callback.isDone()) {
-                    op.callback.completeExceptionally(new PulsarClientException.TimeoutException(
-                            "Could not get response from transaction meta store within given timeout."));
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId);
+        internalPinnedExecutor.execute(() -> {
+            if (timeout.isCancelled()) {
+                return;
+            }
+            long timeToWaitMs;
+            if (getState() == State.Closing || getState() == State.Closed) {
+                return;
+            }
+            RequestTime peeked = timeoutQueue.peek();
+            while (peeked != null && peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()
+                    - System.currentTimeMillis() <= 0) {
+                RequestTime lastPolled = timeoutQueue.poll();
+                if (lastPolled != null) {
+                    OpBase<?> op = pendingRequests.remove(lastPolled.requestId);
+                    if (op != null && !op.callback.isDone()) {
+                        op.callback.completeExceptionally(new PulsarClientException.TimeoutException(
+                                "Could not get response from transaction meta store within given timeout."));
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId);
+                        }
+                        onResponse(op);
                     }
-                    onResponse(op);
+                } else {
+                    break;
                 }
-            } else {
-                break;
+                peeked = timeoutQueue.peek();
             }
-            peeked = timeoutQueue.peek();
-        }
 
-        if (peeked == null) {
-            timeToWaitMs = client.getConfiguration().getOperationTimeoutMs();
-        } else {
-            long diff = (peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs()) - System.currentTimeMillis();
-            if (diff <= 0) {
+            if (peeked == null) {
                 timeToWaitMs = client.getConfiguration().getOperationTimeoutMs();
             } else {
-                timeToWaitMs = diff;
+                long diff = (peeked.creationTimeMs + client.getConfiguration().getOperationTimeoutMs())
+                        - System.currentTimeMillis();
+                if (diff <= 0) {
+                    timeToWaitMs = client.getConfiguration().getOperationTimeoutMs();
+                } else {
+                    timeToWaitMs = diff;
+                }
             }
-        }
-        requestTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+            requestTimeout = client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
+        });
     }
 
     private ClientCnx cnx() {

Reply via email to