This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c14d079  [Transaction] Transaction coordinator fence mechanism. 
(#11357)
c14d079 is described below

commit c14d079fa992d308e5ae04f7e7d0b9ada4f28cac
Author: congbo <[email protected]>
AuthorDate: Fri Oct 8 14:57:57 2021 +0800

    [Transaction] Transaction coordinator fence mechanism. (#11357)
---
 .../org/apache/pulsar/broker/PulsarService.java    |   1 -
 .../broker/TransactionMetadataStoreService.java    | 220 +++++++++++++++------
 .../broker/admin/impl/PersistentTopicsBase.java    |  31 ++-
 .../pulsar/broker/service/BrokerService.java       |  13 ++
 .../apache/pulsar/broker/service/ServerCnx.java    | 140 ++++++++++---
 .../stats/prometheus/TransactionAggregator.java    |  13 +-
 .../broker/admin/v3/AdminApiTransactionTest.java   |   2 +
 .../TransactionMetadataStoreServiceTest.java       |  36 ++--
 .../broker/stats/TransactionMetricsTest.java       |  36 ++--
 .../TransactionClientReconnectTest.java            | 180 ++++++++++++++---
 .../broker/transaction/TransactionTestBase.java    |   2 +-
 .../TransactionMetaStoreAssignmentTest.java        | 133 ++++++++++---
 pulsar-client-cpp/lib/Commands.cc                  |   3 +
 .../client/impl/TransactionMetaStoreHandler.java   |  97 ++++++---
 .../pulsar/common/events/EventsTopicNames.java     |   4 +
 .../apache/pulsar/common/protocol/Commands.java    |  13 +-
 .../pulsar/common/protocol/PulsarDecoder.java      |  11 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   7 +
 18 files changed, 733 insertions(+), 209 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 10b2e31..5fc8d77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -713,7 +713,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
                 transactionMetadataStoreService = new 
TransactionMetadataStoreService(TransactionMetadataStoreProvider
                         
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
                         transactionBufferClient, transactionTimer);
-                transactionMetadataStoreService.start();
 
                 transactionBufferProvider = TransactionBufferProvider
                         
.newProvider(config.getTransactionBufferProviderClassName());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 899a465..31a8714 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -25,13 +25,17 @@ import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import 
org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
 import 
org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
 import 
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
@@ -47,6 +51,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
@@ -74,6 +79,13 @@ public class TransactionMetadataStoreService {
     private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
     private static final long endTransactionRetryIntervalTime = 1000;
     private final Timer transactionOpRetryTimer;
+    // this semaphore ensures that only one transaction coordinator with the 
same tc id is loaded at a time
+    private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;
+    // one connect request opens the transactionMetaStore, the other requests 
are added to the queue; when the open op
+    // finishes, the queued requests are polled and their futures are completed
+    private final 
ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> 
pendingConnectRequests;
+
+    private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
 
     public TransactionMetadataStoreService(TransactionMetadataStoreProvider 
transactionMetadataStoreProvider,
                                            PulsarService pulsarService, 
TransactionBufferClient tbClient,
@@ -84,48 +96,53 @@ public class TransactionMetadataStoreService {
         this.tbClient = tbClient;
         this.timeoutTrackerFactory = new 
TransactionTimeoutTrackerFactoryImpl(this, timer);
         this.transactionOpRetryTimer = timer;
+        this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
+        this.pendingConnectRequests = new ConcurrentLongHashMap<>();
     }
 
+    @Deprecated
     public void start() {
         
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new 
NamespaceBundleOwnershipListener() {
+
             @Override
             public void onLoad(NamespaceBundle bundle) {
                 
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
-                    .whenComplete((topics, ex) -> {
-                        if (ex == null) {
-                            for (String topic : topics) {
-                                TopicName name = TopicName.get(topic);
-                                if 
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-                                        
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
-                                        && name.isPartitioned()) {
-                                    
addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
+                        .whenComplete((topics, ex) -> {
+                            if (ex == null) {
+                                for (String topic : topics) {
+                                    TopicName name = TopicName.get(topic);
+                                    if 
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
+                                            
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
+                                            && name.isPartitioned()) {
+                                        
handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
+                                    }
                                 }
+                            } else {
+                                LOG.error("Failed to get owned topic list when 
triggering on-loading bundle {}.",
+                                        bundle, ex);
                             }
-                        } else {
-                            LOG.error("Failed to get owned topic list when 
triggering on-loading bundle {}.",
-                                    bundle, ex);
-                        }
-                    });
+                        });
             }
+
             @Override
             public void unLoad(NamespaceBundle bundle) {
                 
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
-                    .whenComplete((topics, ex) -> {
-                        if (ex == null) {
-                            for (String topic : topics) {
-                                TopicName name = TopicName.get(topic);
-                                if 
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-                                        
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
-                                        && name.isPartitioned()) {
-                                    removeTransactionMetadataStore(
-                                            
TransactionCoordinatorID.get(name.getPartitionIndex()));
+                        .whenComplete((topics, ex) -> {
+                            if (ex == null) {
+                                for (String topic : topics) {
+                                    TopicName name = TopicName.get(topic);
+                                    if 
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
+                                            
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
+                                            && name.isPartitioned()) {
+                                        removeTransactionMetadataStore(
+                                                
TransactionCoordinatorID.get(name.getPartitionIndex()));
+                                    }
                                 }
+                            } else {
+                                LOG.error("Failed to get owned topic list 
error when triggering un-loading bundle {}.",
+                                        bundle, ex);
                             }
-                        } else {
-                            LOG.error("Failed to get owned topic list error 
when triggering un-loading bundle {}.",
-                                    bundle, ex);
-                        }
-                     });
+                        });
             }
             @Override
             public boolean test(NamespaceBundle namespaceBundle) {
@@ -134,44 +151,120 @@ public class TransactionMetadataStoreService {
         });
     }
 
-    public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        pulsarService.getBrokerService()
-                
.getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX
 + tcId))
-                .whenComplete((v, e) -> {
-                    if (e != null) {
-                        LOG.error("Add transaction metadata store with id {} 
error", tcId.getId(), e);
-                    } else {
-                        TransactionTimeoutTracker timeoutTracker = 
timeoutTrackerFactory.newTracker(tcId);
-                        TransactionRecoverTracker recoverTracker =
-                                new 
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
-                                        timeoutTracker, tcId.getId());
-                        transactionMetadataStoreProvider.openStore(tcId, 
pulsarService.getManagedLedgerFactory(), v,
-                                timeoutTracker, recoverTracker)
-                                .whenComplete((store, ex) -> {
-                                    if (ex != null) {
-                                        LOG.error("Add transaction metadata 
store with id {} error", tcId.getId(), ex);
-                                    } else {
-                                        stores.put(tcId, store);
-                                        LOG.info("Added new transaction meta 
store {}", tcId);
-                                    }
-                                });
+    public CompletableFuture<Void> 
handleTcClientConnect(TransactionCoordinatorID tcId) {
+        if (stores.get(tcId) != null) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return 
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+                    .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) 
tcId.getId()).toString()).thenCompose(v -> {
+                        CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
+                final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+                        .computeIfAbsent(tcId.getId(), (id) -> new 
Semaphore(1));
+                Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+                        .computeIfAbsent(tcId.getId(), (id) -> new 
ConcurrentLinkedDeque<>());
+                if (tcLoadSemaphore.tryAcquire()) {
+                    // once tcLoadSemaphore is released, this command may 
acquire it, so we should check whether the
+                    // store exists again.
+                    if (stores.get(tcId) != null) {
+                        return CompletableFuture.completedFuture(null);
                     }
-        });
-    }
 
-    public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        TransactionMetadataStore metadataStore = stores.remove(tcId);
-        if (metadataStore != null) {
-            metadataStore.closeAsync().whenComplete((v, ex) -> {
-                if (ex != null) {
-                    LOG.error("Close transaction metadata store with id " + 
tcId, ex);
+                    openTransactionMetadataStore(tcId).thenAccept((store) -> {
+                        stores.put(tcId, store);
+                        LOG.info("Added new transaction meta store {}", tcId);
+                        long endTime = System.currentTimeMillis() + 
HANDLE_PENDING_CONNECT_TIME_OUT;
+                        while (true) {
+                            // prevent thread in a busy loop.
+                            if (System.currentTimeMillis() < endTime) {
+                                CompletableFuture<Void> future = deque.poll();
+                                if (future != null) {
+                                    // complete queue request future
+                                    future.complete(null);
+                                } else {
+                                    break;
+                                }
+                            } else {
+                                deque.clear();
+                                break;
+                            }
+                        }
+
+                        completableFuture.complete(null);
+                        tcLoadSemaphore.release();
+                    }).exceptionally(e -> {
+                        completableFuture.completeExceptionally(e.getCause());
+                        // release before handling the request queue, in order 
to avoid a client reconnect infinite loop
+                        tcLoadSemaphore.release();
+                        long endTime = System.currentTimeMillis() + 
HANDLE_PENDING_CONNECT_TIME_OUT;
+                        while (true) {
+                            // prevent thread in a busy loop.
+                            if (System.currentTimeMillis() < endTime) {
+                                CompletableFuture<Void> future = deque.poll();
+                                if (future != null) {
+                                    // this means that this tc client 
connection connect fail
+                                    future.completeExceptionally(e);
+                                } else {
+                                    break;
+                                }
+                            } else {
+                                deque.clear();
+                                break;
+                            }
+                        }
+                        LOG.error("Add transaction metadata store with id {} 
error", tcId.getId(), e);
+                        return null;
+                    });
                 } else {
-                    LOG.info("Removed and closed transaction meta store {}", 
tcId);
+                    // only one command can open the transaction metadata store,
+                    // the others are added to the deque; when the op of 
openTransactionMetadataStore finishes,
+                    // the requests which are in the queue are handled
+                    deque.add(completableFuture);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Handle tc client connect added into pending 
queue! tcId : {}", tcId.toString());
+                    }
                 }
+                return completableFuture;
             });
         }
     }
 
+    public CompletableFuture<TransactionMetadataStore> 
openTransactionMetadataStore(TransactionCoordinatorID tcId) {
+        return pulsarService.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl
+                        .TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> {
+                            TransactionTimeoutTracker timeoutTracker = 
timeoutTrackerFactory.newTracker(tcId);
+                            TransactionRecoverTracker recoverTracker =
+                                    new 
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+                                    timeoutTracker, tcId.getId());
+                            return transactionMetadataStoreProvider
+                                    .openStore(tcId, 
pulsarService.getManagedLedgerFactory(), v,
+                                            timeoutTracker, recoverTracker);
+                });
+    }
+
+    public CompletableFuture<Void> 
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
+        final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+                .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
+        if (tcLoadSemaphore.tryAcquire()) {
+            TransactionMetadataStore metadataStore = stores.remove(tcId);
+            if (metadataStore != null) {
+                metadataStore.closeAsync().whenComplete((v, ex) -> {
+                    if (ex != null) {
+                        LOG.error("Close transaction metadata store with id " 
+ tcId, ex);
+                    } else {
+                        LOG.info("Removed and closed transaction meta store 
{}", tcId);
+                    }
+                });
+            }
+            tcLoadSemaphore.release();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return FutureUtil.failedFuture(
+                    new ServiceUnitNotReadyException("Could not remove "
+                            + "TransactionMetadataStore, it is doing other 
operations!"));
+        }
+    }
+
     public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID 
tcId, long timeoutInMills) {
         TransactionMetadataStore store = stores.get(tcId);
         if (store == null) {
@@ -323,6 +416,14 @@ public class TransactionMetadataStoreService {
         return completableFuture;
     }
 
+    // when the managedLedger is fenced, remove this tc so that it can be reloaded
+    public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) {
+        if (e.getCause() instanceof 
ManagedLedgerException.ManagedLedgerFencedException
+                || e instanceof 
ManagedLedgerException.ManagedLedgerFencedException) {
+            removeTransactionMetadataStore(tcId);
+        }
+    }
+
     public void endTransactionForTimeout(TxnID txnID) {
         getTxnMeta(txnID).thenCompose(txnMeta -> {
             if (txnMeta.status() == TxnStatus.OPEN) {
@@ -400,13 +501,14 @@ public class TransactionMetadataStoreService {
     }
 
     private static boolean isRetryableException(Throwable e) {
-        return e instanceof TransactionMetadataStoreStateException
+        return (e instanceof TransactionMetadataStoreStateException
                 || e instanceof RequestTimeoutException
                 || e instanceof ManagedLedgerException
                 || e instanceof BrokerPersistenceException
                 || e instanceof LookupException
                 || e instanceof ReachMaxPendingOpsException
-                || e instanceof ConnectException;
+                || e instanceof ConnectException)
+                && !(e instanceof 
ManagedLedgerException.ManagedLedgerFencedException);
     }
 
     private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID 
txnID, int txnAction) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aa40644..0271abd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.admin.impl;
 
 import static 
org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
+import static 
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.zafarkhaja.semver.Version;
 import com.google.common.collect.Lists;
@@ -130,6 +131,7 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -675,7 +677,11 @@ public class PersistentTopicsBase extends AdminResource {
         }
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
-            internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
+            if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+                internalUnloadTransactionCoordinator(asyncResponse, 
authoritative);
+            } else {
+                internalUnloadNonPartitionedTopic(asyncResponse, 
authoritative);
+            }
         } else {
             getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                     .thenAccept(meta -> {
@@ -927,6 +933,29 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
+    private void internalUnloadTransactionCoordinator(AsyncResponse 
asyncResponse, boolean authoritative) {
+        try {
+            validateTopicOperation(topicName, TopicOperation.UNLOAD);
+        } catch (Exception e) {
+            log.error("[{}] Failed to unload tc {},{}", clientAppId(), 
topicName.getPartitionIndex(), e.getMessage());
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(v -> pulsar()
+                .getTransactionMetadataStoreService()
+                
.removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex())))
+                .thenRun(() -> {
+                    log.info("[{}] Successfully unloaded tc {}", 
clientAppId(), topicName.getPartitionIndex());
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(ex -> {
+                    log.error("[{}] Failed to unload tc {}, {}", 
clientAppId(), topicName.getPartitionIndex(),
+                    ex.getMessage());
+            asyncResponse.resume(ex.getCause());
+            return null;
+        });
+    }
+
     protected void internalDeleteTopic(boolean authoritative, boolean force, 
boolean deleteSchema) {
         if (force) {
             internalDeleteTopicForcefully(authoritative, deleteSchema);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 58b3f43..bd3c1e9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -91,6 +91,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.BundlesQuotas;
@@ -160,6 +161,7 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1675,6 +1677,17 @@ public class BrokerService implements Closeable {
                                 : CompletableFuture.completedFuture(null)));
             }
         });
+        if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
+                && 
serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
+            TransactionMetadataStoreService metadataStoreService =
+                    this.getPulsar().getTransactionMetadataStoreService();
+            // if the store belongs to this bundle, remove and close the store
+            
this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store
 ->
+                    
serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN
+                            .getPartition((int) 
(store.getTransactionCoordinatorID().getId()))))
+                    .map(TransactionMetadataStore::getTransactionCoordinatorID)
+                    .forEach(tcId -> 
closeFutures.add(metadataStoreService.removeTransactionMetadataStore(tcId)));
+        }
 
         return FutureUtil.waitForAll(closeFutures).thenApply(v -> 
closeFutures.size());
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3e07eec..539fcd6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.common.api.proto.CommandSend;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.CommandTcClientConnect;
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
@@ -1933,23 +1934,62 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     @Override
-    protected void handleNewTxn(CommandNewTxn command) {
+    protected void handleTcClientConnect(CommandTcClientConnect command) {
         final long requestId = command.getRequestId();
         final TransactionCoordinatorID tcId = 
TransactionCoordinatorID.get(command.getTcId());
         if (log.isDebugEnabled()) {
-            log.debug("Receive new txn request {} to transaction meta store {} 
from {}.",
+            log.debug("Receive tc client connect request {} to transaction 
meta store {} from {}.",
                     requestId, tcId, remoteAddress);
         }
+
+        if (!checkTransactionEnableAndSenError(requestId)) {
+            return;
+        }
+
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
-        if (transactionMetadataStoreService == null) {
-            CoordinatorException.CoordinatorNotFoundException ex =
-                    new CoordinatorException.CoordinatorNotFoundException(
-                                               "Transaction manager is not 
started or not enabled");
-            ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
-                    BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage()));
+
+        
transactionMetadataStoreService.handleTcClientConnect(tcId).thenAccept(connection
 -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Handle tc client connect request {} to transaction 
meta store {} from {} success.",
+                        requestId, tcId, remoteAddress);
+            }
+            commandSender.sendSuccessResponse(requestId);
+        }).exceptionally(e -> {
+            log.error("Handle tc client connect request {} to transaction meta 
store {} from {} fail.",
+                    requestId, tcId, remoteAddress, e.getCause());
+            commandSender.sendErrorResponse(requestId, 
BrokerServiceException.getClientErrorCode(e), e.getMessage());
+            return null;
+        });
+    }
+
+    private boolean checkTransactionEnableAndSenError(long requestId) {
+        if 
(!service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
+            BrokerServiceException.NotAllowedException ex =
+                    new BrokerServiceException.NotAllowedException(
+                            "Transaction manager is not not enabled");
+            commandSender.sendErrorResponse(requestId, 
BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    @Override
+    protected void handleNewTxn(CommandNewTxn command) {
+        final long requestId = command.getRequestId();
+        final TransactionCoordinatorID tcId = 
TransactionCoordinatorID.get(command.getTcId());
+        if (log.isDebugEnabled()) {
+            log.debug("Receive new txn request {} to transaction meta store {} 
from {}.",
+                    requestId, tcId, remoteAddress);
+        }
+
+        if (!checkTransactionEnableAndSenError(requestId)) {
             return;
         }
+
+        TransactionMetadataStoreService transactionMetadataStoreService =
+                service.pulsar().getTransactionMetadataStoreService();
         transactionMetadataStoreService.newTransaction(tcId, 
command.getTxnTtlSeconds())
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
@@ -1962,8 +2002,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     if (log.isDebugEnabled()) {
                         log.debug("Send response error for new txn request 
{}", requestId, ex);
                     }
+
                     ctx.writeAndFlush(Commands.newTxnResponse(requestId, 
tcId.getId(),
                             BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage()));
+                    transactionMetadataStoreService.handleOpFail(ex, tcId);
                 }
             }));
     }
@@ -1971,12 +2013,20 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     @Override
     protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
         final TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final TransactionCoordinatorID tcId = 
TransactionCoordinatorID.get(command.getTxnidMostBits());
         final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
             command.getPartitionsList().forEach(partion ->
                     log.debug("Receive add published partition to txn request 
{} "
                             + "from {} with txnId {}, topic: [{}]", requestId, 
remoteAddress, txnID, partion));
         }
+
+        if (!checkTransactionEnableAndSenError(requestId)) {
+            return;
+        }
+
+        TransactionMetadataStoreService transactionMetadataStoreService =
+                service.pulsar().getTransactionMetadataStoreService();
         
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
                 command.getPartitionsList())
                 .whenComplete(((v, ex) -> {
@@ -1990,10 +2040,18 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response error for add published 
partition to txn request {}", requestId,
                                     ex);
+                        }
+
+                        if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
+                            
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
+                                    
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
+                        } else {
+                            
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
+                                    
BrokerServiceException.getClientErrorCode(ex.getCause()),
+                                    ex.getCause().getMessage()));
+                        }
+                        transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
-                    
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getMostSigBits(),
-                            BrokerServiceException.getClientErrorCode(ex), 
ex.getMessage()));
-                }
             }));
     }
 
@@ -2002,16 +2060,35 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         final long requestId = command.getRequestId();
         final int txnAction = command.getTxnAction().getValue();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), 
command.getTxnidLeastBits());
+        final TransactionCoordinatorID tcId = 
TransactionCoordinatorID.get(command.getTxnidMostBits());
+
+        if (!checkTransactionEnableAndSenError(requestId)) {
+            return;
+        }
 
-        service.pulsar().getTransactionMetadataStoreService()
+        TransactionMetadataStoreService transactionMetadataStoreService =
+                service.pulsar().getTransactionMetadataStoreService();
+
+        transactionMetadataStoreService
                 .endTransaction(txnID, txnAction, false)
-                .thenRun(() -> 
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
-                        txnID.getLeastSigBits(), txnID.getMostSigBits())))
-                .exceptionally(throwable -> {
-                    log.error("Send response error for end txn request.", 
throwable);
-                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, 
txnID.getMostSigBits(),
-                            
BrokerServiceException.getClientErrorCode(throwable.getCause()), 
throwable.getMessage()));
-                    return null; });
+                .whenComplete((v, ex) -> {
+                    if (ex == null) {
+                        ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
+                                txnID.getLeastSigBits(), 
txnID.getMostSigBits()));
+                    } else {
+                        log.error("Send response error for end txn request.", 
ex);
+
+                        if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
+                            
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+                                    
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
+                        } else {
+                            
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+                                    
BrokerServiceException.getClientErrorCode(ex.getCause()),
+                                    ex.getCause().getMessage()));
+                        }
+                        transactionMetadataStoreService.handleOpFail(ex, tcId);
+                    }
+                });
     }
 
     @Override
@@ -2198,7 +2275,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     requestId, remoteAddress, txnID);
         }
 
-        
service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
+        final TransactionCoordinatorID tcId = 
TransactionCoordinatorID.get(command.getTxnidMostBits());
+
+        if (!checkTransactionEnableAndSenError(requestId)) {
+            return;
+        }
+
+        TransactionMetadataStoreService transactionMetadataStoreService =
+                service.pulsar().getTransactionMetadataStoreService();
+
+        transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
                 
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
                 .whenComplete(((v, ex) -> {
                     if (ex == null) {
@@ -2214,9 +2300,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                             log.debug("Send response error for add published 
partition to txn request {}",
                                     requestId, ex);
                         }
-                        
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
-                                txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex),
-                                ex.getMessage()));
+
+                        if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
+                            
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                                    txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex),
+                                    ex.getMessage()));
+                        } else {
+                            
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+                                    txnID.getMostSigBits(), 
BrokerServiceException.getClientErrorCode(ex.getCause()),
+                                    ex.getCause().getMessage()));
+                        }
+                        transactionMetadataStoreService.handleOpFail(ex, tcId);
                     }
                 }));
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 5ea2bea..142ec48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import io.netty.util.concurrent.FastThreadLocal;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -26,6 +27,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
@@ -62,10 +64,13 @@ public class TransactionAggregator {
                             
topic.getSubscriptions().values().forEach(subscription -> {
                                 try {
                                     localManageLedgerStats.get().reset();
-                                    ManagedLedger managedLedger =
-                                            ((PersistentSubscription) 
subscription).getPendingAckManageLedger().get();
-                                    generateManageLedgerStats(managedLedger,
-                                            stream, cluster, namespace, name, 
subscription.getName());
+                                    if 
(!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))) {
+                                        ManagedLedger managedLedger =
+                                                ((PersistentSubscription) 
subscription)
+                                                        
.getPendingAckManageLedger().get();
+                                        
generateManageLedgerStats(managedLedger,
+                                                stream, cluster, namespace, 
name, subscription.getName());
+                                    }
                                 } catch (Exception e) {
                                     log.warn("Transaction pending ack generate 
managedLedgerStats fail!", e);
                                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 45b11de..6f9a189 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -396,6 +396,8 @@ public class AdminApiTransactionTest extends 
MockedPulsarServiceBaseTest {
     private void initTransaction(int coordinatorSize) throws Exception {
         
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 coordinatorSize);
         
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+        pulsarClient.close();
         Awaitility.await().until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== coordinatorSize);
         pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 215dffe..983f0d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -28,12 +28,17 @@ import java.util.List;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
@@ -57,6 +62,10 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         ServiceConfiguration configuration = getDefaultConf();
         configuration.setTransactionCoordinatorEnabled(true);
         super.baseSetup(configuration);
+        admin.tenants().createTenant("pulsar", new 
TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
+        
admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
     }
 
     @AfterMethod(alwaysRun = true)
@@ -66,11 +75,12 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     }
 
     @Test
-    public void testAddAndRemoveTransactionMetadataStore() {
+    public void testAddAndRemoveTransactionMetadataStore() throws Exception {
         TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
         Assert.assertNotNull(transactionMetadataStoreService);
 
-        
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
+        
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await().until(() ->
                 transactionMetadataStoreService.getStores().size() == 1);
 
@@ -82,9 +92,9 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     @Test
     public void testNewTransaction() throws Exception {
         TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
-        
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
-        
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
+        
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
+        
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(1));
+        
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(2));
         Awaitility.await().until(() ->
                 transactionMetadataStoreService.getStores().size() == 3);
         checkTransactionMetadataStoreReady((MLTransactionMetadataStore) 
pulsar.getTransactionMetadataStoreService()
@@ -108,7 +118,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     @Test
     public void testAddProducedPartitionToTxn() throws Exception {
         TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
-        
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await().until(() ->
                 transactionMetadataStoreService.getStores().size() == 1);
 
@@ -132,7 +142,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     @Test
     public void testAddAckedPartitionToTxn() throws Exception {
         TransactionMetadataStoreService transactionMetadataStoreService = 
pulsar.getTransactionMetadataStoreService();
-        
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0)).get();
         Awaitility.await().until(() ->
                 transactionMetadataStoreService.getStores().size() == 1);
 
@@ -154,7 +164,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
 
     @Test
     public void testTimeoutTracker() throws Exception {
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await()
                 .until(() -> pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
@@ -183,7 +193,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
 
     @Test
     public void testTimeoutTrackerExpired() throws Exception {
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await().until(() -> 
pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
         MLTransactionMetadataStore transactionMetadataStore =
@@ -213,7 +223,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
 
     @Test
     public void testTimeoutTrackerMultiThreading() throws Exception {
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await()
                 .until(() -> pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
@@ -284,7 +294,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     @Test
     public void transactionTimeoutRecoverTest() throws Exception {
         int timeout = 2000;
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await()
                 .until(() -> pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
@@ -298,7 +308,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
         pulsar.getTransactionMetadataStoreService()
                 
.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
 
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await()
                 .until(() -> pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
@@ -324,7 +334,7 @@ public class TransactionMetadataStoreServiceTest extends 
BrokerTestBase {
     @Test(dataProvider = "txnStatus")
     public void testEndTransactionOpRetry(TxnStatus txnStatus) throws 
Exception {
         int timeOut = 3000;
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
         Awaitility.await()
                 .until(() -> pulsar.getTransactionMetadataStoreService()
                         .getStores().get(TransactionCoordinatorID.get(0)) != 
null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index de7a12d2..a209d89 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
-import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
@@ -61,6 +60,12 @@ public class TransactionMetricsTest extends BrokerTestBase {
         ServiceConfiguration serviceConfiguration = getDefaultConf();
         serviceConfiguration.setTransactionCoordinatorEnabled(true);
         super.baseSetup(serviceConfiguration);
+        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                TenantInfo.builder()
+                        .adminRoles(Sets.newHashSet("appid1"))
+                        .allowedClusters(Sets.newHashSet("test"))
+                        .build());
+        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
     }
 
     @AfterMethod(alwaysRun = true)
@@ -72,12 +77,14 @@ public class TransactionMetricsTest extends BrokerTestBase {
     @Test
     public void testTransactionCoordinatorMetrics() throws Exception{
         long timeout = 10000;
-        TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(1);
-        TransactionCoordinatorID transactionCoordinatorIDTwo = 
TransactionCoordinatorID.get(2);
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDTwo);
+        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 2);
+        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+        TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
+        TransactionCoordinatorID transactionCoordinatorIDTwo = 
TransactionCoordinatorID.get(1);
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDTwo);
 
-        Awaitility.await().atMost(2000,  TimeUnit.MILLISECONDS).until(() ->
+        Awaitility.await().until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== 2);
         pulsar.getTransactionMetadataStoreService().getStores()
                 
.get(transactionCoordinatorIDOne).newTransaction(timeout).get();
@@ -92,7 +99,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
         Collection<PrometheusMetricsTest.Metric> metric = 
metrics.get("pulsar_txn_active_count");
         assertEquals(metric.size(), 2);
         metric.forEach(item -> {
-            if ("1".equals(item.tags.get("coordinator_id"))) {
+            if ("0".equals(item.tags.get("coordinator_id"))) {
                 assertEquals(item.value, 1);
             } else {
                 assertEquals(item.value, 2);
@@ -108,8 +115,11 @@ public class TransactionMetricsTest extends BrokerTestBase 
{
         admin.namespaces().createNamespace(ns1);
         String topic = "persistent://" + ns1 + "/test_coordinator_metrics";
         String subName = "test_coordinator_metrics";
+
+        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(1);
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
         admin.topics().createNonPartitionedTopic(topic);
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
         Awaitility.await().atMost(2000,  TimeUnit.MILLISECONDS).until(() ->
@@ -196,16 +206,12 @@ public class TransactionMetricsTest extends 
BrokerTestBase {
         String topic = "persistent://" + ns1 + "/test_managed_ledger_metrics";
         String subName = "test_managed_ledger_metrics";
         admin.topics().createNonPartitionedTopic(topic);
-        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
-                TenantInfo.builder()
-                        .adminRoles(Sets.newHashSet("appid1"))
-                        .allowedClusters(Sets.newHashSet("test"))
-                        .build());
-        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 1);
+        
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
         TransactionCoordinatorID transactionCoordinatorIDOne = 
TransactionCoordinatorID.get(0);
-        
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
+        
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
         admin.topics().createSubscription(topic, subName, MessageId.earliest);
+
         Awaitility.await().atMost(2000,  TimeUnit.MILLISECONDS).until(() ->
                 pulsar.getTransactionMetadataStoreService().getStores().size() 
== 1);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
index 8df470d..1f5ab15 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
@@ -19,27 +19,31 @@
 package org.apache.pulsar.broker.transaction;
 
 import com.google.common.collect.Sets;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.TransactionMetadataStoreService;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.FileAssert.fail;
 
@@ -80,67 +84,183 @@ public class TransactionClientReconnectTest extends 
TransactionTestBase {
     }
 
     @Test
-    public void testTransactionClientReconnectTest() throws 
PulsarClientException, ExecutionException, InterruptedException {
+    public void testTransactionNewReconnect() throws Exception {
+        start();
 
-        ((PulsarClientImpl) pulsarClient).getLookup()
-                
.getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
+        // when throw CoordinatorNotFoundException client will reconnect tc
+        try {
+            pulsarClient.newTransaction()
+                    .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+        }
+        reconnect();
 
-        Awaitility.await().until(() -> {
+        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+
+        // tc fence will remove this tc and reopen
+        try {
             pulsarClient.newTransaction()
                     .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
-            return true;
-        });
+            fail();
+        } catch (ExecutionException e) {
+            assertEquals(e.getCause().getMessage(),
+                    
"org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException:
 " +
+                            "java.lang.Exception: Attempted to use a fenced 
managed ledger");
+        }
 
-        TransactionImpl transaction = (TransactionImpl) 
pulsarClient.newTransaction()
-                .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
+        reconnect();
+    }
 
-        TransactionMetadataStoreService transactionMetadataStoreService =
-                
getPulsarServiceList().get(0).getTransactionMetadataStoreService();
+    @Test
+    public void testTransactionAddSubscriptionToTxnAsyncReconnect() throws 
Exception {
+        TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
+        start();
 
-        
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        try {
+            transactionCoordinatorClient.addSubscriptionToTxnAsync(new 
TxnID(0, 0), "test", "test").get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+        }
 
-        // transaction client will reconnect
+        reconnect();
+        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
         try {
-            pulsarClient.newTransaction()
-                    .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
+            transactionCoordinatorClient.addSubscriptionToTxnAsync(new 
TxnID(0, 0), "test", "test").get();
+            fail();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
+                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
+            } else {
+                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
+            }
+        }
+        reconnect();
+    }
+
+    @Test
+    public void testTransactionAbortToTxnAsyncReconnect() throws Exception {
+        TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
+        start();
+
+        try {
+            transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
             fail();
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
         }
 
+        reconnect();
+        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
         try {
-            transaction.registerProducedTopic(RECONNECT_TOPIC).get();
+            transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
             fail();
         } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
+                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
+            } else {
+                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
+            }
         }
+        reconnect();
+    }
+
+    @Test
+    public void testTransactionCommitToTxnAsyncReconnect() throws Exception {
+        TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
+        start();
 
         try {
-            transaction.registerAckedTopic(RECONNECT_TOPIC, "test").get();
+            transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
             fail();
         } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
         }
 
+        reconnect();
+        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
         try {
-            transaction.commit().get();
+            transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
             fail();
         } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
+                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
+            } else {
+                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
+            }
         }
+        reconnect();
+    }
 
-        
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+    @Test
+    public void testTransactionAddPublishPartitionToTxnReconnect() throws 
Exception {
+        TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
+        start();
 
+        try {
+            transactionCoordinatorClient.addPublishPartitionToTxnAsync(new 
TxnID(0, 0),
+                    Collections.singletonList("test")).get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+        }
+
+        reconnect();
+        
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+        try {
+            transactionCoordinatorClient.addPublishPartitionToTxnAsync(new 
TxnID(0, 0),
+                    Collections.singletonList("test")).get();
+            fail();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof 
TransactionCoordinatorClientException.TransactionNotFoundException) {
+                assertEquals(e.getCause().getMessage(), "The transaction with 
this txdID `(0,0)`not found ");
+            } else {
+                assertEquals(e.getCause().getMessage(), "java.lang.Exception: 
Attempted to use a fenced managed ledger");
+            }
+        }
+        reconnect();
+    }
+
+    public void start() throws Exception {
         // wait transaction coordinator init success
         Awaitility.await().until(() -> {
-            pulsarClient.newTransaction()
-                    .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
+            try {
+                pulsarClient.newTransaction()
+                        .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
+            } catch (Exception e) {
+                return false;
+            }
             return true;
         });
-        transaction = (TransactionImpl) pulsarClient.newTransaction()
+        pulsarClient.newTransaction()
                 .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
-        transaction.registerProducedTopic(RECONNECT_TOPIC).get();
-        transaction.registerAckedTopic(RECONNECT_TOPIC, "test").get();
-        transaction.commit().get();
+
+        TransactionMetadataStoreService transactionMetadataStoreService =
+                
getPulsarServiceList().get(0).getTransactionMetadataStoreService();
+        // remove transaction metadata store
+        
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)).get();
+
+    }
+
+    public void fence(TransactionMetadataStoreService 
transactionMetadataStoreService) throws Exception {
+        Field field = ManagedLedgerImpl.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(((MLTransactionMetadataStore) 
transactionMetadataStoreService.getStores()
+                .get(TransactionCoordinatorID.get(0))).getManagedLedger(), 
ManagedLedgerImpl.State.Fenced);
+    }
+
+    public void reconnect() {
+        //reconnect
+        Awaitility.await().until(() -> {
+            try {
+                pulsarClient.newTransaction()
+                        .withTransactionTimeout(200, 
TimeUnit.MILLISECONDS).build().get();
+            } catch (Exception e) {
+                return false;
+            }
+            return true;
+        });
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index f40f80f..622421b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -74,7 +74,7 @@ public abstract class TransactionTestBase extends 
TestRetrySupport {
     @Getter
     private final List<ServiceConfiguration> serviceConfigurationList = new 
ArrayList<>();
     @Getter
-    private final List<PulsarService> pulsarServiceList = new ArrayList<>();
+    protected final List<PulsarService> pulsarServiceList = new ArrayList<>();
 
     protected PulsarAdmin admin;
     protected PulsarClient pulsarClient;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index ce173ba..1725305 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -18,57 +18,128 @@
  */
 package org.apache.pulsar.broker.transaction.coordinator;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-@Test(groups = "broker")
-public class TransactionMetaStoreAssignmentTest extends 
TransactionMetaStoreTestBase {
+public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
 
-    @Test(groups = "broker")
-    public void testTransactionMetaStoreAssignAndFailover() throws IOException 
{
+    @Override
+    @BeforeMethod(alwaysRun = true)
+    protected void setup() throws Exception {
+        setBrokerCount(3);
+        super.internalSetup();
+        String[] brokerServiceUrlArr = 
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length 
-1];
+        admin.clusters().createCluster(CLUSTER_NAME, 
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
+        
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), 
Sets.newHashSet(CLUSTER_NAME)));
+        
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
 16);
+        pulsarClient.close();
+    }
 
-        Awaitility.await()
-                .untilAsserted(() -> {
-                    int transactionMetaStoreCount = 
Arrays.stream(pulsarServices)
-                            .mapToInt(pulsarService -> 
pulsarService.getTransactionMetadataStoreService().getStores().size())
-                            .sum();
-                    Assert.assertEquals(transactionMetaStoreCount, 16);
-                });
+    @Test
+    public void testTransactionMetaStoreAssignAndFailover() throws Exception {
+
+        pulsarClient = buildClient();
 
+        checkTransactionCoordinatorNum(16);
+
+        pulsarClient.close();
         PulsarService crashedMetaStore = null;
-        for (int i = pulsarServices.length - 1; i >= 0; i--) {
-            if 
(pulsarServices[i].getTransactionMetadataStoreService().getStores().size() > 0) 
{
-                crashedMetaStore = pulsarServices[i];
+        for (int i = pulsarServiceList.size() - 1; i >= 0; i--) {
+            if 
(pulsarServiceList.get(i).getTransactionMetadataStoreService().getStores().size()
 > 0) {
+                crashedMetaStore = pulsarServiceList.get(i);
                 break;
             }
         }
 
         Assert.assertNotNull(crashedMetaStore);
-        List<PulsarService> services = new ArrayList<>(pulsarServices.length - 
1);
-        for (PulsarService pulsarService : pulsarServices) {
-            if (pulsarService != crashedMetaStore) {
-                services.add(pulsarService);
-            }
-        }
-        pulsarServices = new PulsarService[pulsarServices.length - 1];
-        for (int i = 0; i < services.size(); i++) {
-            pulsarServices[i] = services.get(i);
-        }
+        pulsarServiceList.remove(crashedMetaStore);
         crashedMetaStore.close();
 
-        Awaitility.await()
+        pulsarClient = buildClient();
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                 .untilAsserted(() -> {
-                    int transactionMetaStoreCount2 = 
Arrays.stream(pulsarServices)
+                    int transactionMetaStoreCount2 = pulsarServiceList.stream()
                             .mapToInt(pulsarService -> 
pulsarService.getTransactionMetadataStoreService().getStores().size())
                             .sum();
                     Assert.assertEquals(transactionMetaStoreCount2, 16);
                 });
-        transactionCoordinatorClient.close();
+        pulsarClient.close();
+    }
+
+    @Test
+    public void testTransactionMetaStoreUnload() throws Exception {
+
+        pulsarClient = buildClient();
+        checkTransactionCoordinatorNum(16);
+
+        // close pulsar client will not init tc again
+        pulsarClient.close();
+
+        
admin.topics().unload(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+
+        for (int i = 0; i < 16; i++) {
+            final int f = i;
+            pulsarServiceList.forEach((pulsarService) -> pulsarService
+                    .getTransactionMetadataStoreService()
+                    
.removeTransactionMetadataStore(TransactionCoordinatorID.get(f)));
+        }
+        checkTransactionCoordinatorNum(0);
+        buildClient();
+        checkTransactionCoordinatorNum(16);
+
+        pulsarClient.close();
+
+    }
+
+    private void checkTransactionCoordinatorNum(int number) {
+        Awaitility.await()
+                .untilAsserted(() -> {
+                    int transactionMetaStoreCount = pulsarServiceList.stream()
+                            .mapToInt(pulsarService -> 
pulsarService.getTransactionMetadataStoreService().getStores().size())
+                            .sum();
+                    Assert.assertEquals(transactionMetaStoreCount, number);
+                });
+    }
+
+    private PulsarClient buildClient() throws Exception {
+        return PulsarClient.builder()
+                .serviceUrlProvider(new ServiceUrlProvider() {
+                    final AtomicInteger atomicInteger = new AtomicInteger();
+                    @Override
+                    public void initialize(PulsarClient client) {
+
+                    }
+
+                    @Override
+                    public String getServiceUrl() {
+                        return 
pulsarServiceList.get(atomicInteger.getAndIncrement() % 
pulsarServiceList.size()).getBrokerServiceUrl();
+                    }
+                })
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+    }
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
     }
 }
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 52821ede..4d079d3 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -644,6 +644,9 @@ std::string Commands::messageType(BaseCommand_Type type) {
         case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
             return "END_TXN_ON_SUBSCRIPTION_RESPONSE";
             break;
+        case BaseCommand::TC_CLIENT_CONNECT:
+            return "TC_CLIENT_CONNECT";
+            break;
     };
     BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration 
value"));
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 05a5cae..cdee3d0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -36,10 +36,8 @@ import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
@@ -94,28 +92,70 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
                 .setMandatoryStop(100, TimeUnit.MILLISECONDS)
                 .create(),
             this);
-        this.connectionHandler.grabCnx();
         this.connectFuture = connectFuture;
+        this.connectionHandler.grabCnx();
     }
 
     @Override
     public void connectionFailed(PulsarClientException exception) {
         LOG.error("Transaction meta handler with transaction coordinator id {} 
connection failed.",
             transactionCoordinatorId, exception);
-        setState(State.Failed);
-        this.connectFuture.completeExceptionally(exception);
+        if (!this.connectFuture.isDone()) {
+            this.connectFuture.completeExceptionally(exception);
+        }
     }
 
     @Override
     public void connectionOpened(ClientCnx cnx) {
         LOG.info("Transaction meta handler with transaction coordinator id {} 
connection opened.",
             transactionCoordinatorId);
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            setState(State.Closed);
+            failPendingRequest();
+            this.pendingRequests.clear();
+            return;
+        }
+
         connectionHandler.setClientCnx(cnx);
         cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId, 
this);
-        if (!changeToReadyState()) {
-            cnx.channel().close();
-        }
-        this.connectFuture.complete(null);
+        long requestId = client.newRequestId();
+        ByteBuf request = 
Commands.newTcClientConnect(transactionCoordinatorId, requestId);
+
+        cnx.sendRequestWithId(request, requestId).thenRun(() -> {
+            LOG.info("Transaction coordinator client connect success! tcId : 
{}", transactionCoordinatorId);
+            if (!changeToReadyState()) {
+                setState(State.Closed);
+                cnx.channel().close();
+            }
+
+            if (!this.connectFuture.isDone()) {
+                this.connectFuture.complete(null);
+            }
+            this.connectionHandler.resetBackoff();
+        }).exceptionally((e) -> {
+            LOG.error("Transaction coordinator client connect fail! tcId : 
{}", transactionCoordinatorId, e.getCause());
+            if (getState() == State.Closing || getState() == State.Closed
+                    || e.getCause() instanceof 
PulsarClientException.NotAllowedException) {
+                setState(State.Closed);
+                cnx.channel().close();
+            } else {
+                connectionHandler.reconnectLater(e.getCause());
+            }
+            return null;
+        });
+    }
+
+    private void failPendingRequest() {
+        pendingRequests.keys().forEach(k -> {
+            OpBase<?> op = pendingRequests.remove(k);
+            if (op != null && !op.callback.isDone()) {
+                op.callback.completeExceptionally(new 
PulsarClientException.AlreadyClosedException(
+                        "Could not get response from transaction meta store 
when " +
+                                "the transaction meta store has already 
close."));
+                onResponse(op);
+            }
+        });
     }
 
     public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit 
unit) {
@@ -146,6 +186,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
             }
             return;
         }
+
         if (!response.hasError()) {
             TxnID txnID = new TxnID(response.getTxnidMostBits(), 
response.getTxnidLeastBits());
             if (LOG.isDebugEnabled()) {
@@ -153,12 +194,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
             }
             op.callback.complete(txnID);
         } else {
-            if (response.getError() == 
ServerError.TransactionCoordinatorNotFound) {
-                connectionHandler.reconnectLater(
-                        new 
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
-            }
             LOG.error("Got new txn for request {} error {}", 
response.getRequestId(), response.getError());
-            
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
 response.getMessage()));
+            handleTransactionFailOp(response.getError(), 
response.getMessage(), op);
         }
 
         onResponse(op);
@@ -193,18 +230,15 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
             }
             return;
         }
+
         if (!response.hasError()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Add publish partition for request {} success.", 
response.getRequestId());
             }
             op.callback.complete(null);
         } else {
-            if (response.getError() == 
ServerError.TransactionCoordinatorNotFound) {
-                connectionHandler.reconnectLater(
-                        new 
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
-            }
             LOG.error("Add publish partition for request {} error {}.", 
response.getRequestId(), response.getError());
-            
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
 response.getMessage()));
+            handleTransactionFailOp(response.getError(), 
response.getMessage(), op);
         }
 
         onResponse(op);
@@ -239,19 +273,16 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
             }
             return;
         }
+
         if (!response.hasError()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Add subscription to txn success for request {}.", 
response.getRequestId());
             }
             op.callback.complete(null);
         } else {
-            if (response.getError() == 
ServerError.TransactionCoordinatorNotFound) {
-                connectionHandler.reconnectLater(
-                        new 
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
-            }
             LOG.error("Add subscription to txn failed for request {} error 
{}.",
                     response.getRequestId(), response.getError());
-            
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
 response.getMessage()));
+            handleTransactionFailOp(response.getError(), 
response.getMessage(), op);
         }
         onResponse(op);
     }
@@ -285,23 +316,31 @@ public class TransactionMetaStoreHandler extends 
HandlerState implements Connect
             }
             return;
         }
+
         if (!response.hasError()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Got end txn response success for request {}", 
response.getRequestId());
             }
             op.callback.complete(null);
         } else {
-            if (response.getError() == 
ServerError.TransactionCoordinatorNotFound) {
-                connectionHandler.reconnectLater(
-                        new 
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
-            }
             LOG.error("Got end txn response for request {} error {}", 
response.getRequestId(), response.getError());
-            
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
 response.getMessage()));
+            handleTransactionFailOp(response.getError(), 
response.getMessage(), op);
         }
 
         onResponse(op);
     }
 
+    private void handleTransactionFailOp(ServerError error, String message, 
OpBase<?> op) {
+        if (error == ServerError.TransactionCoordinatorNotFound && getState() 
!= State.Connecting) {
+            connectionHandler.reconnectLater(new 
TransactionCoordinatorClientException
+                    .CoordinatorNotFoundException(message));
+        }
+
+        if (op != null) {
+            op.callback.completeExceptionally(getExceptionByServerError(error, 
message));
+        }
+    }
+
     private static abstract class OpBase<T> {
         protected ByteBuf cmd;
         protected CompletableFuture<T> callback;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
index 5656404..2aa9e12 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
@@ -47,4 +47,8 @@ public class EventsTopicNames {
     public static boolean checkTopicIsEventsNames(TopicName topicName) {
         return 
EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
     }
+
+    public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName 
topicName) {
+        return 
TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString());
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2453eb8..e2d2c99 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.api.proto.AuthMethod;
 import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -593,6 +594,12 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
+    public static ByteBuf newTcClientConnect(long tcId, long requestId) {
+        BaseCommand cmd = localCmd(Type.TC_CLIENT_CONNECT);
+        cmd.setTcClientConnect().setTcId(tcId).setRequestId(requestId);
+        return serializeWithSize(cmd);
+    }
+
 
     private static KeySharedMode 
convertKeySharedMode(org.apache.pulsar.client.api.KeySharedMode mode) {
         switch (mode) {
@@ -1226,10 +1233,14 @@ public class Commands {
     public static ByteBuf newAddPartitionToTxnResponse(long requestId, long 
txnIdMostBits, ServerError error,
            String errorMsg) {
         BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN_RESPONSE);
-        cmd.setAddPartitionToTxnResponse()
+        CommandAddPartitionToTxnResponse response = 
cmd.setAddPartitionToTxnResponse()
                 .setRequestId(requestId)
                 .setError(error)
                 .setTxnidMostBits(txnIdMostBits);
+
+        if (errorMsg != null) {
+            response.setMessage(errorMsg);
+        }
         return serializeWithSize(cmd);
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 792746b..700a13e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
-
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandAckResponse;
@@ -75,6 +74,7 @@ import org.apache.pulsar.common.api.proto.CommandSendError;
 import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSuccess;
+import org.apache.pulsar.common.api.proto.CommandTcClientConnect;
 import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.intercept.InterceptException;
@@ -361,6 +361,11 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 handleAuthResponse(cmd.getAuthResponse());
                 break;
 
+            case TC_CLIENT_CONNECT:
+                checkArgument(cmd.hasTcClientConnect());
+                handleTcClientConnect(cmd.getTcClientConnect());
+                break;
+
             case NEW_TXN:
                 checkArgument(cmd.hasNewTxn());
                 handleNewTxn(cmd.getNewTxn());
@@ -602,6 +607,10 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleTcClientConnect(CommandTcClientConnect 
tcClientConnect) {
+        throw new UnsupportedOperationException();
+    }
+
     protected void handleNewTxn(CommandNewTxn commandNewTxn) {
         throw new UnsupportedOperationException();
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 320c51c..d1a9697 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -762,6 +762,11 @@ enum TxnAction {
     ABORT = 1;
 }
 
+message CommandTcClientConnect {
+    required uint64 request_id = 1;
+    required uint64 tc_id = 2 [default = 0];
+}
+
 message CommandNewTxn {
     required uint64 request_id = 1;
     optional uint64 txn_ttl_seconds = 2 [default = 0];
@@ -941,6 +946,7 @@ message BaseCommand {
 
         END_TXN_ON_SUBSCRIPTION = 60;
         END_TXN_ON_SUBSCRIPTION_RESPONSE = 61;
+        TC_CLIENT_CONNECT = 62;
 
     }
 
@@ -1016,4 +1022,5 @@ message BaseCommand {
     optional CommandEndTxnOnPartitionResponse endTxnOnPartitionResponse = 59;
     optional CommandEndTxnOnSubscription endTxnOnSubscription = 60;
     optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse 
= 61;
+    optional CommandTcClientConnect tcClientConnect = 62;
 }

Reply via email to