This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c14d079 [Transaction] Transaction coordinator fence mechanism. (#11357)
c14d079 is described below
commit c14d079fa992d308e5ae04f7e7d0b9ada4f28cac
Author: congbo <[email protected]>
AuthorDate: Fri Oct 8 14:57:57 2021 +0800
[Transaction] Transaction coordinator fence mechanism. (#11357)
---
.../org/apache/pulsar/broker/PulsarService.java | 1 -
.../broker/TransactionMetadataStoreService.java | 220 +++++++++++++++------
.../broker/admin/impl/PersistentTopicsBase.java | 31 ++-
.../pulsar/broker/service/BrokerService.java | 13 ++
.../apache/pulsar/broker/service/ServerCnx.java | 140 ++++++++++---
.../stats/prometheus/TransactionAggregator.java | 13 +-
.../broker/admin/v3/AdminApiTransactionTest.java | 2 +
.../TransactionMetadataStoreServiceTest.java | 36 ++--
.../broker/stats/TransactionMetricsTest.java | 36 ++--
.../TransactionClientReconnectTest.java | 180 ++++++++++++++---
.../broker/transaction/TransactionTestBase.java | 2 +-
.../TransactionMetaStoreAssignmentTest.java | 133 ++++++++++---
pulsar-client-cpp/lib/Commands.cc | 3 +
.../client/impl/TransactionMetaStoreHandler.java | 97 ++++++---
.../pulsar/common/events/EventsTopicNames.java | 4 +
.../apache/pulsar/common/protocol/Commands.java | 13 +-
.../pulsar/common/protocol/PulsarDecoder.java | 11 +-
pulsar-common/src/main/proto/PulsarApi.proto | 7 +
18 files changed, 733 insertions(+), 209 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 10b2e31..5fc8d77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -713,7 +713,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
transactionMetadataStoreService = new
TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
transactionBufferClient, transactionTimer);
- transactionMetadataStoreService.start();
transactionBufferProvider = TransactionBufferProvider
.newProvider(config.getTransactionBufferProviderClassName());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 899a465..31a8714 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -25,13 +25,17 @@ import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
+import
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import
org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import
org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import
org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
@@ -47,6 +51,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
@@ -74,6 +79,13 @@ public class TransactionMetadataStoreService {
private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
private static final long endTransactionRetryIntervalTime = 1000;
private final Timer transactionOpRetryTimer;
+ // this semaphore for loading one transaction coordinator with the same tc
id on the same time
+ private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;
+ // one connect request open the transactionMetaStore the other request
will add to the queue, when the open op
+ // finished the request will be poll and complete the future
+ private final
ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>>
pendingConnectRequests;
+
+ private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
public TransactionMetadataStoreService(TransactionMetadataStoreProvider
transactionMetadataStoreProvider,
PulsarService pulsarService,
TransactionBufferClient tbClient,
@@ -84,48 +96,53 @@ public class TransactionMetadataStoreService {
this.tbClient = tbClient;
this.timeoutTrackerFactory = new
TransactionTimeoutTrackerFactoryImpl(this, timer);
this.transactionOpRetryTimer = timer;
+ this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
+ this.pendingConnectRequests = new ConcurrentLongHashMap<>();
}
+ @Deprecated
public void start() {
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new
NamespaceBundleOwnershipListener() {
+
@Override
public void onLoad(NamespaceBundle bundle) {
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
-
addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
+ .whenComplete((topics, ex) -> {
+ if (ex == null) {
+ for (String topic : topics) {
+ TopicName name = TopicName.get(topic);
+ if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
+
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
+ && name.isPartitioned()) {
+
handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
+ }
}
+ } else {
+ LOG.error("Failed to get owned topic list when
triggering on-loading bundle {}.",
+ bundle, ex);
}
- } else {
- LOG.error("Failed to get owned topic list when
triggering on-loading bundle {}.",
- bundle, ex);
- }
- });
+ });
}
+
@Override
public void unLoad(NamespaceBundle bundle) {
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
- removeTransactionMetadataStore(
-
TransactionCoordinatorID.get(name.getPartitionIndex()));
+ .whenComplete((topics, ex) -> {
+ if (ex == null) {
+ for (String topic : topics) {
+ TopicName name = TopicName.get(topic);
+ if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
+
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
+ && name.isPartitioned()) {
+ removeTransactionMetadataStore(
+
TransactionCoordinatorID.get(name.getPartitionIndex()));
+ }
}
+ } else {
+ LOG.error("Failed to get owned topic list
error when triggering un-loading bundle {}.",
+ bundle, ex);
}
- } else {
- LOG.error("Failed to get owned topic list error
when triggering un-loading bundle {}.",
- bundle, ex);
- }
- });
+ });
}
@Override
public boolean test(NamespaceBundle namespaceBundle) {
@@ -134,44 +151,120 @@ public class TransactionMetadataStoreService {
});
}
- public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
- pulsarService.getBrokerService()
-
.getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX
+ tcId))
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.error("Add transaction metadata store with id {}
error", tcId.getId(), e);
- } else {
- TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
- TransactionRecoverTracker recoverTracker =
- new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
- timeoutTracker, tcId.getId());
- transactionMetadataStoreProvider.openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
- timeoutTracker, recoverTracker)
- .whenComplete((store, ex) -> {
- if (ex != null) {
- LOG.error("Add transaction metadata
store with id {} error", tcId.getId(), ex);
- } else {
- stores.put(tcId, store);
- LOG.info("Added new transaction meta
store {}", tcId);
- }
- });
+ public CompletableFuture<Void>
handleTcClientConnect(TransactionCoordinatorID tcId) {
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+ .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int)
tcId.getId()).toString()).thenCompose(v -> {
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+ .computeIfAbsent(tcId.getId(), (id) -> new
Semaphore(1));
+ Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+ .computeIfAbsent(tcId.getId(), (id) -> new
ConcurrentLinkedDeque<>());
+ if (tcLoadSemaphore.tryAcquire()) {
+ // when tcLoadSemaphore.release(), this command will
acquire semaphore, so we should jude the store
+ // exist again.
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
}
- });
- }
- public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
- TransactionMetadataStore metadataStore = stores.remove(tcId);
- if (metadataStore != null) {
- metadataStore.closeAsync().whenComplete((v, ex) -> {
- if (ex != null) {
- LOG.error("Close transaction metadata store with id " +
tcId, ex);
+ openTransactionMetadataStore(tcId).thenAccept((store) -> {
+ stores.put(tcId, store);
+ LOG.info("Added new transaction meta store {}", tcId);
+ long endTime = System.currentTimeMillis() +
HANDLE_PENDING_CONNECT_TIME_OUT;
+ while (true) {
+ // prevent thread in a busy loop.
+ if (System.currentTimeMillis() < endTime) {
+ CompletableFuture<Void> future = deque.poll();
+ if (future != null) {
+ // complete queue request future
+ future.complete(null);
+ } else {
+ break;
+ }
+ } else {
+ deque.clear();
+ break;
+ }
+ }
+
+ completableFuture.complete(null);
+ tcLoadSemaphore.release();
+ }).exceptionally(e -> {
+ completableFuture.completeExceptionally(e.getCause());
+ // release before handle request queue, in order to
client reconnect infinite loop
+ tcLoadSemaphore.release();
+ long endTime = System.currentTimeMillis() +
HANDLE_PENDING_CONNECT_TIME_OUT;
+ while (true) {
+ // prevent thread in a busy loop.
+ if (System.currentTimeMillis() < endTime) {
+ CompletableFuture<Void> future = deque.poll();
+ if (future != null) {
+ // this means that this tc client
connection connect fail
+ future.completeExceptionally(e);
+ } else {
+ break;
+ }
+ } else {
+ deque.clear();
+ break;
+ }
+ }
+ LOG.error("Add transaction metadata store with id {}
error", tcId.getId(), e);
+ return null;
+ });
} else {
- LOG.info("Removed and closed transaction meta store {}",
tcId);
+ // only one command can open transaction metadata store,
+ // other will be added to the deque, when the op of
openTransactionMetadataStore finished
+ // then handle the requests witch in the queue
+ deque.add(completableFuture);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handle tc client connect added into pending
queue! tcId : {}", tcId.toString());
+ }
}
+ return completableFuture;
});
}
}
+ public CompletableFuture<TransactionMetadataStore>
openTransactionMetadataStore(TransactionCoordinatorID tcId) {
+ return pulsarService.getBrokerService()
+ .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl
+ .TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> {
+ TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
+ TransactionRecoverTracker recoverTracker =
+ new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+ timeoutTracker, tcId.getId());
+ return transactionMetadataStoreProvider
+ .openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
+ timeoutTracker, recoverTracker);
+ });
+ }
+
+ public CompletableFuture<Void>
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
+ final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+ .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
+ if (tcLoadSemaphore.tryAcquire()) {
+ TransactionMetadataStore metadataStore = stores.remove(tcId);
+ if (metadataStore != null) {
+ metadataStore.closeAsync().whenComplete((v, ex) -> {
+ if (ex != null) {
+ LOG.error("Close transaction metadata store with id "
+ tcId, ex);
+ } else {
+ LOG.info("Removed and closed transaction meta store
{}", tcId);
+ }
+ });
+ }
+ tcLoadSemaphore.release();
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return FutureUtil.failedFuture(
+ new ServiceUnitNotReadyException("Could not remove "
+ + "TransactionMetadataStore, it is doing other
operations!"));
+ }
+ }
+
public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID
tcId, long timeoutInMills) {
TransactionMetadataStore store = stores.get(tcId);
if (store == null) {
@@ -323,6 +416,14 @@ public class TransactionMetadataStoreService {
return completableFuture;
}
+ // when managedLedger fence will remove this tc and reload
+ public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) {
+ if (e.getCause() instanceof
ManagedLedgerException.ManagedLedgerFencedException
+ || e instanceof
ManagedLedgerException.ManagedLedgerFencedException) {
+ removeTransactionMetadataStore(tcId);
+ }
+ }
+
public void endTransactionForTimeout(TxnID txnID) {
getTxnMeta(txnID).thenCompose(txnMeta -> {
if (txnMeta.status() == TxnStatus.OPEN) {
@@ -400,13 +501,14 @@ public class TransactionMetadataStoreService {
}
private static boolean isRetryableException(Throwable e) {
- return e instanceof TransactionMetadataStoreStateException
+ return (e instanceof TransactionMetadataStoreStateException
|| e instanceof RequestTimeoutException
|| e instanceof ManagedLedgerException
|| e instanceof BrokerPersistenceException
|| e instanceof LookupException
|| e instanceof ReachMaxPendingOpsException
- || e instanceof ConnectException;
+ || e instanceof ConnectException)
+ && !(e instanceof
ManagedLedgerException.ManagedLedgerFencedException);
}
private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID
txnID, int txnAction) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index aa40644..0271abd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.admin.impl;
import static
org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
+import static
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.Lists;
@@ -130,6 +131,7 @@ import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -675,7 +677,11 @@ public class PersistentTopicsBase extends AdminResource {
}
// If the topic name is a partition name, no need to get partition
topic metadata again
if (topicName.isPartitioned()) {
- internalUnloadNonPartitionedTopic(asyncResponse, authoritative);
+ if (checkTopicIsTransactionCoordinatorAssign(topicName)) {
+ internalUnloadTransactionCoordinator(asyncResponse,
authoritative);
+ } else {
+ internalUnloadNonPartitionedTopic(asyncResponse,
authoritative);
+ }
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(meta -> {
@@ -927,6 +933,29 @@ public class PersistentTopicsBase extends AdminResource {
});
}
+ private void internalUnloadTransactionCoordinator(AsyncResponse
asyncResponse, boolean authoritative) {
+ try {
+ validateTopicOperation(topicName, TopicOperation.UNLOAD);
+ } catch (Exception e) {
+ log.error("[{}] Failed to unload tc {},{}", clientAppId(),
topicName.getPartitionIndex(), e.getMessage());
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(v -> pulsar()
+ .getTransactionMetadataStoreService()
+
.removeTransactionMetadataStore(TransactionCoordinatorID.get(topicName.getPartitionIndex())))
+ .thenRun(() -> {
+ log.info("[{}] Successfully unloaded tc {}",
clientAppId(), topicName.getPartitionIndex());
+ asyncResponse.resume(Response.noContent().build());
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to unload tc {}, {}",
clientAppId(), topicName.getPartitionIndex(),
+ ex.getMessage());
+ asyncResponse.resume(ex.getCause());
+ return null;
+ });
+ }
+
protected void internalDeleteTopic(boolean authoritative, boolean force,
boolean deleteSchema) {
if (force) {
internalDeleteTopicForcefully(authoritative, deleteSchema);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 58b3f43..bd3c1e9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -91,6 +91,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.BundlesQuotas;
@@ -160,6 +161,7 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1675,6 +1677,17 @@ public class BrokerService implements Closeable {
: CompletableFuture.completedFuture(null)));
}
});
+ if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
+ &&
serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
+ TransactionMetadataStoreService metadataStoreService =
+ this.getPulsar().getTransactionMetadataStoreService();
+ // if the store belongs to this bundle, remove and close the store
+
this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store
->
+
serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN
+ .getPartition((int)
(store.getTransactionCoordinatorID().getId()))))
+ .map(TransactionMetadataStore::getTransactionCoordinatorID)
+ .forEach(tcId ->
closeFutures.add(metadataStoreService.removeTransactionMetadataStore(tcId)));
+ }
return FutureUtil.waitForAll(closeFutures).thenApply(v ->
closeFutures.size());
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 3e07eec..539fcd6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.CommandTcClientConnect;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
@@ -1933,23 +1934,62 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
}
@Override
- protected void handleNewTxn(CommandNewTxn command) {
+ protected void handleTcClientConnect(CommandTcClientConnect command) {
final long requestId = command.getRequestId();
final TransactionCoordinatorID tcId =
TransactionCoordinatorID.get(command.getTcId());
if (log.isDebugEnabled()) {
- log.debug("Receive new txn request {} to transaction meta store {}
from {}.",
+ log.debug("Receive tc client connect request {} to transaction
meta store {} from {}.",
requestId, tcId, remoteAddress);
}
+
+ if (!checkTransactionEnableAndSenError(requestId)) {
+ return;
+ }
+
TransactionMetadataStoreService transactionMetadataStoreService =
service.pulsar().getTransactionMetadataStoreService();
- if (transactionMetadataStoreService == null) {
- CoordinatorException.CoordinatorNotFoundException ex =
- new CoordinatorException.CoordinatorNotFoundException(
- "Transaction manager is not
started or not enabled");
- ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
- BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
+
+
transactionMetadataStoreService.handleTcClientConnect(tcId).thenAccept(connection
-> {
+ if (log.isDebugEnabled()) {
+ log.debug("Handle tc client connect request {} to transaction
meta store {} from {} success.",
+ requestId, tcId, remoteAddress);
+ }
+ commandSender.sendSuccessResponse(requestId);
+ }).exceptionally(e -> {
+ log.error("Handle tc client connect request {} to transaction meta
store {} from {} fail.",
+ requestId, tcId, remoteAddress, e.getCause());
+ commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(e), e.getMessage());
+ return null;
+ });
+ }
+
+ private boolean checkTransactionEnableAndSenError(long requestId) {
+ if
(!service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
+ BrokerServiceException.NotAllowedException ex =
+ new BrokerServiceException.NotAllowedException(
+ "Transaction manager is not not enabled");
+ commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ protected void handleNewTxn(CommandNewTxn command) {
+ final long requestId = command.getRequestId();
+ final TransactionCoordinatorID tcId =
TransactionCoordinatorID.get(command.getTcId());
+ if (log.isDebugEnabled()) {
+ log.debug("Receive new txn request {} to transaction meta store {}
from {}.",
+ requestId, tcId, remoteAddress);
+ }
+
+ if (!checkTransactionEnableAndSenError(requestId)) {
return;
}
+
+ TransactionMetadataStoreService transactionMetadataStoreService =
+ service.pulsar().getTransactionMetadataStoreService();
transactionMetadataStoreService.newTransaction(tcId,
command.getTxnTtlSeconds())
.whenComplete(((txnID, ex) -> {
if (ex == null) {
@@ -1962,8 +2002,10 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (log.isDebugEnabled()) {
log.debug("Send response error for new txn request
{}", requestId, ex);
}
+
ctx.writeAndFlush(Commands.newTxnResponse(requestId,
tcId.getId(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
+ transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
}
@@ -1971,12 +2013,20 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
@Override
protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
+ final TransactionCoordinatorID tcId =
TransactionCoordinatorID.get(command.getTxnidMostBits());
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
command.getPartitionsList().forEach(partion ->
log.debug("Receive add published partition to txn request
{} "
+ "from {} with txnId {}, topic: [{}]", requestId,
remoteAddress, txnID, partion));
}
+
+ if (!checkTransactionEnableAndSenError(requestId)) {
+ return;
+ }
+
+ TransactionMetadataStoreService transactionMetadataStoreService =
+ service.pulsar().getTransactionMetadataStoreService();
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
command.getPartitionsList())
.whenComplete(((v, ex) -> {
@@ -1990,10 +2040,18 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published
partition to txn request {}", requestId,
ex);
+ }
+
+ if (ex instanceof
CoordinatorException.CoordinatorNotFoundException) {
+
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getMostSigBits(),
+
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
+ } else {
+
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getMostSigBits(),
+
BrokerServiceException.getClientErrorCode(ex.getCause()),
+ ex.getCause().getMessage()));
+ }
+ transactionMetadataStoreService.handleOpFail(ex, tcId);
}
-
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getMostSigBits(),
- BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
- }
}));
}
@@ -2002,16 +2060,35 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
final long requestId = command.getRequestId();
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
+ final TransactionCoordinatorID tcId =
TransactionCoordinatorID.get(command.getTxnidMostBits());
+
+ if (!checkTransactionEnableAndSenError(requestId)) {
+ return;
+ }
- service.pulsar().getTransactionMetadataStoreService()
+ TransactionMetadataStoreService transactionMetadataStoreService =
+ service.pulsar().getTransactionMetadataStoreService();
+
+ transactionMetadataStoreService
.endTransaction(txnID, txnAction, false)
- .thenRun(() ->
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
- txnID.getLeastSigBits(), txnID.getMostSigBits())))
- .exceptionally(throwable -> {
- log.error("Send response error for end txn request.",
throwable);
- ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getMostSigBits(),
-
BrokerServiceException.getClientErrorCode(throwable.getCause()),
throwable.getMessage()));
- return null; });
+ .whenComplete((v, ex) -> {
+ if (ex == null) {
+ ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
+ txnID.getLeastSigBits(),
txnID.getMostSigBits()));
+ } else {
+ log.error("Send response error for end txn request.",
ex);
+
+ if (ex instanceof
CoordinatorException.CoordinatorNotFoundException) {
+
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
+ } else {
+
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
+
BrokerServiceException.getClientErrorCode(ex.getCause()),
+ ex.getCause().getMessage()));
+ }
+ transactionMetadataStoreService.handleOpFail(ex, tcId);
+ }
+ });
}
@Override
@@ -2198,7 +2275,16 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
requestId, remoteAddress, txnID);
}
-
service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
+ final TransactionCoordinatorID tcId =
TransactionCoordinatorID.get(command.getTxnidMostBits());
+
+ if (!checkTransactionEnableAndSenError(requestId)) {
+ return;
+ }
+
+ TransactionMetadataStoreService transactionMetadataStoreService =
+ service.pulsar().getTransactionMetadataStoreService();
+
+ transactionMetadataStoreService.addAckedPartitionToTxn(txnID,
MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList()))
.whenComplete(((v, ex) -> {
if (ex == null) {
@@ -2214,9 +2300,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.debug("Send response error for add published
partition to txn request {}",
requestId, ex);
}
-
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
- txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
- ex.getMessage()));
+
+ if (ex instanceof
CoordinatorException.CoordinatorNotFoundException) {
+
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+ txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
+ ex.getMessage()));
+ } else {
+
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
+ txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex.getCause()),
+ ex.getCause().getMessage()));
+ }
+ transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
index 5ea2bea..142ec48 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats.prometheus;
+import static
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -26,6 +27,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
@@ -62,10 +64,13 @@ public class TransactionAggregator {
topic.getSubscriptions().values().forEach(subscription -> {
try {
localManageLedgerStats.get().reset();
- ManagedLedger managedLedger =
- ((PersistentSubscription)
subscription).getPendingAckManageLedger().get();
- generateManageLedgerStats(managedLedger,
- stream, cluster, namespace, name,
subscription.getName());
+ if
(!checkTopicIsEventsNames(TopicName.get(subscription.getTopic().getName()))) {
+ ManagedLedger managedLedger =
+ ((PersistentSubscription)
subscription)
+
.getPendingAckManageLedger().get();
+
generateManageLedgerStats(managedLedger,
+ stream, cluster, namespace,
name, subscription.getName());
+ }
} catch (Exception e) {
log.warn("Transaction pending ack generate
managedLedgerStats fail!", e);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 45b11de..6f9a189 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -396,6 +396,8 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
private void initTransaction(int coordinatorSize) throws Exception {
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
coordinatorSize);
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+ pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
+ pulsarClient.close();
Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== coordinatorSize);
pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 215dffe..983f0d1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -28,12 +28,17 @@ import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
@@ -57,6 +62,10 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
ServiceConfiguration configuration = getDefaultConf();
configuration.setTransactionCoordinatorEnabled(true);
super.baseSetup(configuration);
+ admin.tenants().createTenant("pulsar", new
TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
+
admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}
@AfterMethod(alwaysRun = true)
@@ -66,11 +75,12 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
}
@Test
- public void testAddAndRemoveTransactionMetadataStore() {
+ public void testAddAndRemoveTransactionMetadataStore() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService =
pulsar.getTransactionMetadataStoreService();
Assert.assertNotNull(transactionMetadataStoreService);
-
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
+
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 1);
@@ -82,9 +92,9 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test
public void testNewTransaction() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService =
pulsar.getTransactionMetadataStoreService();
-
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
-
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
+
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
+
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(1));
+
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(2));
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 3);
checkTransactionMetadataStoreReady((MLTransactionMetadataStore)
pulsar.getTransactionMetadataStoreService()
@@ -108,7 +118,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test
public void testAddProducedPartitionToTxn() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService =
pulsar.getTransactionMetadataStoreService();
-
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 1);
@@ -132,7 +142,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test
public void testAddAckedPartitionToTxn() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService =
pulsar.getTransactionMetadataStoreService();
-
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0)).get();
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 1);
@@ -154,7 +164,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test
public void testTimeoutTracker() throws Exception {
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) !=
null);
@@ -183,7 +193,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test
public void testTimeoutTrackerExpired() throws Exception {
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) !=
null);
MLTransactionMetadataStore transactionMetadataStore =
@@ -213,7 +223,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test
public void testTimeoutTrackerMultiThreading() throws Exception {
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) !=
null);
@@ -284,7 +294,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test
public void transactionTimeoutRecoverTest() throws Exception {
int timeout = 2000;
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) !=
null);
@@ -298,7 +308,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
pulsar.getTransactionMetadataStoreService()
.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) !=
null);
@@ -324,7 +334,7 @@ public class TransactionMetadataStoreServiceTest extends
BrokerTestBase {
@Test(dataProvider = "txnStatus")
public void testEndTransactionOpRetry(TxnStatus txnStatus) throws
Exception {
int timeOut = 3000;
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) !=
null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
index de7a12d2..a209d89 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
-import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
@@ -61,6 +60,12 @@ public class TransactionMetricsTest extends BrokerTestBase {
ServiceConfiguration serviceConfiguration = getDefaultConf();
serviceConfiguration.setTransactionCoordinatorEnabled(true);
super.baseSetup(serviceConfiguration);
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ TenantInfo.builder()
+ .adminRoles(Sets.newHashSet("appid1"))
+ .allowedClusters(Sets.newHashSet("test"))
+ .build());
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
}
@AfterMethod(alwaysRun = true)
@@ -72,12 +77,14 @@ public class TransactionMetricsTest extends BrokerTestBase {
@Test
public void testTransactionCoordinatorMetrics() throws Exception{
long timeout = 10000;
- TransactionCoordinatorID transactionCoordinatorIDOne =
TransactionCoordinatorID.get(1);
- TransactionCoordinatorID transactionCoordinatorIDTwo =
TransactionCoordinatorID.get(2);
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDTwo);
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
2);
+
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+ TransactionCoordinatorID transactionCoordinatorIDOne =
TransactionCoordinatorID.get(0);
+ TransactionCoordinatorID transactionCoordinatorIDTwo =
TransactionCoordinatorID.get(1);
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDTwo);
- Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
+ Awaitility.await().until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== 2);
pulsar.getTransactionMetadataStoreService().getStores()
.get(transactionCoordinatorIDOne).newTransaction(timeout).get();
@@ -92,7 +99,7 @@ public class TransactionMetricsTest extends BrokerTestBase {
Collection<PrometheusMetricsTest.Metric> metric =
metrics.get("pulsar_txn_active_count");
assertEquals(metric.size(), 2);
metric.forEach(item -> {
- if ("1".equals(item.tags.get("coordinator_id"))) {
+ if ("0".equals(item.tags.get("coordinator_id"))) {
assertEquals(item.value, 1);
} else {
assertEquals(item.value, 2);
@@ -108,8 +115,11 @@ public class TransactionMetricsTest extends BrokerTestBase
{
admin.namespaces().createNamespace(ns1);
String topic = "persistent://" + ns1 + "/test_coordinator_metrics";
String subName = "test_coordinator_metrics";
+
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
TransactionCoordinatorID transactionCoordinatorIDOne =
TransactionCoordinatorID.get(1);
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne);
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createSubscription(topic, subName, MessageId.earliest);
Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
@@ -196,16 +206,12 @@ public class TransactionMetricsTest extends
BrokerTestBase {
String topic = "persistent://" + ns1 + "/test_managed_ledger_metrics";
String subName = "test_managed_ledger_metrics";
admin.topics().createNonPartitionedTopic(topic);
-
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
- TenantInfo.builder()
- .adminRoles(Sets.newHashSet("appid1"))
- .allowedClusters(Sets.newHashSet("test"))
- .build());
-
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
1);
+
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
TransactionCoordinatorID transactionCoordinatorIDOne =
TransactionCoordinatorID.get(0);
-
pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(transactionCoordinatorIDOne);
+
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get();
admin.topics().createSubscription(topic, subName, MessageId.earliest);
+
Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() ->
pulsar.getTransactionMetadataStoreService().getStores().size()
== 1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
index 8df470d..1f5ab15 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
@@ -19,27 +19,31 @@
package org.apache.pulsar.broker.transaction;
import com.google.common.collect.Sets;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.FileAssert.fail;
@@ -80,67 +84,183 @@ public class TransactionClientReconnectTest extends
TransactionTestBase {
}
@Test
- public void testTransactionClientReconnectTest() throws
PulsarClientException, ExecutionException, InterruptedException {
+ public void testTransactionNewReconnect() throws Exception {
+ start();
- ((PulsarClientImpl) pulsarClient).getLookup()
-
.getPartitionedTopicMetadata(TopicName.TRANSACTION_COORDINATOR_ASSIGN).get();
+ // when throw CoordinatorNotFoundException client will reconnect tc
+ try {
+ pulsarClient.newTransaction()
+ .withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+ }
+ reconnect();
- Awaitility.await().until(() -> {
+
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+
+ // tc fence will remove this tc and reopen
+ try {
pulsarClient.newTransaction()
.withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
- return true;
- });
+ fail();
+ } catch (ExecutionException e) {
+ assertEquals(e.getCause().getMessage(),
+
"org.apache.bookkeeper.mledger.ManagedLedgerException$ManagedLedgerFencedException:
" +
+ "java.lang.Exception: Attempted to use a fenced
managed ledger");
+ }
- TransactionImpl transaction = (TransactionImpl)
pulsarClient.newTransaction()
- .withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
+ reconnect();
+ }
- TransactionMetadataStoreService transactionMetadataStoreService =
-
getPulsarServiceList().get(0).getTransactionMetadataStoreService();
+ @Test
+ public void testTransactionAddSubscriptionToTxnAsyncReconnect() throws
Exception {
+ TransactionCoordinatorClientImpl transactionCoordinatorClient =
((PulsarClientImpl) pulsarClient).getTcClient();
+ start();
-
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
+ try {
+ transactionCoordinatorClient.addSubscriptionToTxnAsync(new
TxnID(0, 0), "test", "test").get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+ }
- // transaction client will reconnect
+ reconnect();
+
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
try {
- pulsarClient.newTransaction()
- .withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
+ transactionCoordinatorClient.addSubscriptionToTxnAsync(new
TxnID(0, 0), "test", "test").get();
+ fail();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof
TransactionCoordinatorClientException.TransactionNotFoundException) {
+ assertEquals(e.getCause().getMessage(), "The transaction with
this txdID `(0,0)`not found ");
+ } else {
+ assertEquals(e.getCause().getMessage(), "java.lang.Exception:
Attempted to use a fenced managed ledger");
+ }
+ }
+ reconnect();
+ }
+
+ @Test
+ public void testTransactionAbortToTxnAsyncReconnect() throws Exception {
+ TransactionCoordinatorClientImpl transactionCoordinatorClient =
((PulsarClientImpl) pulsarClient).getTcClient();
+ start();
+
+ try {
+ transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.CoordinatorNotFoundException);
}
+ reconnect();
+
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
try {
- transaction.registerProducedTopic(RECONNECT_TOPIC).get();
+ transactionCoordinatorClient.abortAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+ if (e.getCause() instanceof
TransactionCoordinatorClientException.TransactionNotFoundException) {
+ assertEquals(e.getCause().getMessage(), "The transaction with
this txdID `(0,0)`not found ");
+ } else {
+ assertEquals(e.getCause().getMessage(), "java.lang.Exception:
Attempted to use a fenced managed ledger");
+ }
}
+ reconnect();
+ }
+
+ @Test
+ public void testTransactionCommitToTxnAsyncReconnect() throws Exception {
+ TransactionCoordinatorClientImpl transactionCoordinatorClient =
((PulsarClientImpl) pulsarClient).getTcClient();
+ start();
try {
- transaction.registerAckedTopic(RECONNECT_TOPIC, "test").get();
+ transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+ assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.CoordinatorNotFoundException);
}
+ reconnect();
+
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
try {
- transaction.commit().get();
+ transactionCoordinatorClient.commitAsync(new TxnID(0, 0)).get();
fail();
} catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+ if (e.getCause() instanceof
TransactionCoordinatorClientException.TransactionNotFoundException) {
+ assertEquals(e.getCause().getMessage(), "The transaction with
this txdID `(0,0)`not found ");
+ } else {
+ assertEquals(e.getCause().getMessage(), "java.lang.Exception:
Attempted to use a fenced managed ledger");
+ }
}
+ reconnect();
+ }
-
transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+ @Test
+ public void testTransactionAddPublishPartitionToTxnReconnect() throws
Exception {
+ TransactionCoordinatorClientImpl transactionCoordinatorClient =
((PulsarClientImpl) pulsarClient).getTcClient();
+ start();
+ try {
+ transactionCoordinatorClient.addPublishPartitionToTxnAsync(new
TxnID(0, 0),
+ Collections.singletonList("test")).get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException.CoordinatorNotFoundException);
+ }
+
+ reconnect();
+
fence(getPulsarServiceList().get(0).getTransactionMetadataStoreService());
+ try {
+ transactionCoordinatorClient.addPublishPartitionToTxnAsync(new
TxnID(0, 0),
+ Collections.singletonList("test")).get();
+ fail();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof
TransactionCoordinatorClientException.TransactionNotFoundException) {
+ assertEquals(e.getCause().getMessage(), "The transaction with
this txdID `(0,0)`not found ");
+ } else {
+ assertEquals(e.getCause().getMessage(), "java.lang.Exception:
Attempted to use a fenced managed ledger");
+ }
+ }
+ reconnect();
+ }
+
+ public void start() throws Exception {
// wait transaction coordinator init success
Awaitility.await().until(() -> {
- pulsarClient.newTransaction()
- .withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
+ try {
+ pulsarClient.newTransaction()
+ .withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
+ } catch (Exception e) {
+ return false;
+ }
return true;
});
- transaction = (TransactionImpl) pulsarClient.newTransaction()
+ pulsarClient.newTransaction()
.withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
- transaction.registerProducedTopic(RECONNECT_TOPIC).get();
- transaction.registerAckedTopic(RECONNECT_TOPIC, "test").get();
- transaction.commit().get();
+
+ TransactionMetadataStoreService transactionMetadataStoreService =
+
getPulsarServiceList().get(0).getTransactionMetadataStoreService();
+ // remove transaction metadata store
+
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0)).get();
+
+ }
+
+ public void fence(TransactionMetadataStoreService
transactionMetadataStoreService) throws Exception {
+ Field field = ManagedLedgerImpl.class.getDeclaredField("state");
+ field.setAccessible(true);
+ field.set(((MLTransactionMetadataStore)
transactionMetadataStoreService.getStores()
+ .get(TransactionCoordinatorID.get(0))).getManagedLedger(),
ManagedLedgerImpl.State.Fenced);
+ }
+
+ public void reconnect() {
+ //reconnect
+ Awaitility.await().until(() -> {
+ try {
+ pulsarClient.newTransaction()
+ .withTransactionTimeout(200,
TimeUnit.MILLISECONDS).build().get();
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ });
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index f40f80f..622421b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -74,7 +74,7 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
@Getter
private final List<ServiceConfiguration> serviceConfigurationList = new
ArrayList<>();
@Getter
- private final List<PulsarService> pulsarServiceList = new ArrayList<>();
+ protected final List<PulsarService> pulsarServiceList = new ArrayList<>();
protected PulsarAdmin admin;
protected PulsarClient pulsarClient;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
index ce173ba..1725305 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreAssignmentTest.java
@@ -18,57 +18,128 @@
*/
package org.apache.pulsar.broker.transaction.coordinator;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Sets;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.ServiceUrlProvider;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-@Test(groups = "broker")
-public class TransactionMetaStoreAssignmentTest extends
TransactionMetaStoreTestBase {
+public class TransactionMetaStoreAssignmentTest extends TransactionTestBase {
- @Test(groups = "broker")
- public void testTransactionMetaStoreAssignAndFailover() throws IOException
{
+ @Override
+ @BeforeMethod(alwaysRun = true)
+ protected void setup() throws Exception {
+ setBrokerCount(3);
+ super.internalSetup();
+ String[] brokerServiceUrlArr =
getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+ String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length
-1];
+ admin.clusters().createCluster(CLUSTER_NAME,
ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet(CLUSTER_NAME)));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(),
16);
+ pulsarClient.close();
+ }
- Awaitility.await()
- .untilAsserted(() -> {
- int transactionMetaStoreCount =
Arrays.stream(pulsarServices)
- .mapToInt(pulsarService ->
pulsarService.getTransactionMetadataStoreService().getStores().size())
- .sum();
- Assert.assertEquals(transactionMetaStoreCount, 16);
- });
+ @Test
+ public void testTransactionMetaStoreAssignAndFailover() throws Exception {
+
+ pulsarClient = buildClient();
+ checkTransactionCoordinatorNum(16);
+
+ pulsarClient.close();
PulsarService crashedMetaStore = null;
- for (int i = pulsarServices.length - 1; i >= 0; i--) {
- if
(pulsarServices[i].getTransactionMetadataStoreService().getStores().size() > 0)
{
- crashedMetaStore = pulsarServices[i];
+ for (int i = pulsarServiceList.size() - 1; i >= 0; i--) {
+ if
(pulsarServiceList.get(i).getTransactionMetadataStoreService().getStores().size()
> 0) {
+ crashedMetaStore = pulsarServiceList.get(i);
break;
}
}
Assert.assertNotNull(crashedMetaStore);
- List<PulsarService> services = new ArrayList<>(pulsarServices.length -
1);
- for (PulsarService pulsarService : pulsarServices) {
- if (pulsarService != crashedMetaStore) {
- services.add(pulsarService);
- }
- }
- pulsarServices = new PulsarService[pulsarServices.length - 1];
- for (int i = 0; i < services.size(); i++) {
- pulsarServices[i] = services.get(i);
- }
+ pulsarServiceList.remove(crashedMetaStore);
crashedMetaStore.close();
- Awaitility.await()
+ pulsarClient = buildClient();
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
- int transactionMetaStoreCount2 =
Arrays.stream(pulsarServices)
+ int transactionMetaStoreCount2 = pulsarServiceList.stream()
.mapToInt(pulsarService ->
pulsarService.getTransactionMetadataStoreService().getStores().size())
.sum();
Assert.assertEquals(transactionMetaStoreCount2, 16);
});
- transactionCoordinatorClient.close();
+ pulsarClient.close();
+ }
+
+ @Test
+ public void testTransactionMetaStoreUnload() throws Exception {
+
+ pulsarClient = buildClient();
+ checkTransactionCoordinatorNum(16);
+
+ // close pulsar client will not init tc again
+ pulsarClient.close();
+
+
admin.topics().unload(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
+
+ for (int i = 0; i < 16; i++) {
+ final int f = i;
+ pulsarServiceList.forEach((pulsarService) -> pulsarService
+ .getTransactionMetadataStoreService()
+
.removeTransactionMetadataStore(TransactionCoordinatorID.get(f)));
+ }
+ checkTransactionCoordinatorNum(0);
+ buildClient();
+ checkTransactionCoordinatorNum(16);
+
+ pulsarClient.close();
+
+ }
+
+ private void checkTransactionCoordinatorNum(int number) {
+ Awaitility.await()
+ .untilAsserted(() -> {
+ int transactionMetaStoreCount = pulsarServiceList.stream()
+ .mapToInt(pulsarService ->
pulsarService.getTransactionMetadataStoreService().getStores().size())
+ .sum();
+ Assert.assertEquals(transactionMetaStoreCount, number);
+ });
+ }
+
+ private PulsarClient buildClient() throws Exception {
+ return PulsarClient.builder()
+ .serviceUrlProvider(new ServiceUrlProvider() {
+ final AtomicInteger atomicInteger = new AtomicInteger();
+ @Override
+ public void initialize(PulsarClient client) {
+
+ }
+
+ @Override
+ public String getServiceUrl() {
+ return
pulsarServiceList.get(atomicInteger.getAndIncrement() %
pulsarServiceList.size()).getBrokerServiceUrl();
+ }
+ })
+ .statsInterval(0, TimeUnit.SECONDS)
+ .enableTransaction(true)
+ .build();
+ }
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
}
}
diff --git a/pulsar-client-cpp/lib/Commands.cc
b/pulsar-client-cpp/lib/Commands.cc
index 52821ede..4d079d3 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -644,6 +644,9 @@ std::string Commands::messageType(BaseCommand_Type type) {
case BaseCommand::END_TXN_ON_SUBSCRIPTION_RESPONSE:
return "END_TXN_ON_SUBSCRIPTION_RESPONSE";
break;
+ case BaseCommand::TC_CLIENT_CONNECT:
+ return "TC_CLIENT_CONNECT";
+ break;
};
BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration
value"));
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 05a5cae..cdee3d0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -36,10 +36,8 @@ import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -94,28 +92,70 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
.setMandatoryStop(100, TimeUnit.MILLISECONDS)
.create(),
this);
- this.connectionHandler.grabCnx();
this.connectFuture = connectFuture;
+ this.connectionHandler.grabCnx();
}
+ /**
+ * Logs the connection failure and fails {@code connectFuture} with the
+ * given exception if that future has not been completed yet.
+ */
@Override
public void connectionFailed(PulsarClientException exception) {
LOG.error("Transaction meta handler with transaction coordinator id {}
connection failed.",
transactionCoordinatorId, exception);
- setState(State.Failed);
- this.connectFuture.completeExceptionally(exception);
+ // The handler state is no longer forced to Failed here, and the future
+ // is only touched while still pending.
+ // NOTE(review): presumably this callback can fire again on reconnect - confirm.
+ if (!this.connectFuture.isDone()) {
+ this.connectFuture.completeExceptionally(exception);
+ }
}
+ /**
+ * Called when a connection is available. Unless the handler is already
+ * closing/closed, registers this handler on the connection and sends a
+ * TC_CLIENT_CONNECT request; the handler only moves to the ready state
+ * once that request succeeds.
+ */
@Override
public void connectionOpened(ClientCnx cnx) {
LOG.info("Transaction meta handler with transaction coordinator id {}
connection opened.",
transactionCoordinatorId);
+
+ // Handler was closed in the meantime: fail whatever is still pending
+ // and do not use the new connection.
+ if (getState() == State.Closing || getState() == State.Closed) {
+ setState(State.Closed);
+ failPendingRequest();
+ this.pendingRequests.clear();
+ return;
+ }
+
connectionHandler.setClientCnx(cnx);
cnx.registerTransactionMetaStoreHandler(transactionCoordinatorId,
this);
- if (!changeToReadyState()) {
- cnx.channel().close();
- }
- this.connectFuture.complete(null);
+ // Readiness is now deferred until the TC_CLIENT_CONNECT request below
+ // has completed, instead of being set as soon as the channel is open.
+ long requestId = client.newRequestId();
+ ByteBuf request =
Commands.newTcClientConnect(transactionCoordinatorId, requestId);
+
+ cnx.sendRequestWithId(request, requestId).thenRun(() -> {
+ LOG.info("Transaction coordinator client connect success! tcId :
{}", transactionCoordinatorId);
+ if (!changeToReadyState()) {
+ setState(State.Closed);
+ cnx.channel().close();
+ }
+
+ // NOTE(review): connectFuture is completed successfully and the
+ // backoff is reset even when changeToReadyState() returned false
+ // and the channel was just closed - confirm this is intended.
+ if (!this.connectFuture.isDone()) {
+ this.connectFuture.complete(null);
+ }
+ this.connectionHandler.resetBackoff();
+ }).exceptionally((e) -> {
+ LOG.error("Transaction coordinator client connect fail! tcId :
{}", transactionCoordinatorId, e.getCause());
+ // A closing/closed handler or a NotAllowedException ends the
+ // handler; any other failure schedules a reconnect.
+ if (getState() == State.Closing || getState() == State.Closed
+ || e.getCause() instanceof
PulsarClientException.NotAllowedException) {
+ setState(State.Closed);
+ cnx.channel().close();
+ } else {
+ connectionHandler.reconnectLater(e.getCause());
+ }
+ return null;
+ });
+ }
+
+    /**
+     * Removes every entry from {@code pendingRequests}. Each removed
+     * operation whose callback is still open is failed with an
+     * {@code AlreadyClosedException} and then handed to {@code onResponse}.
+     */
+    private void failPendingRequest() {
+        for (Long requestId : pendingRequests.keys()) {
+            OpBase<?> pending = pendingRequests.remove(requestId);
+            if (pending == null || pending.callback.isDone()) {
+                continue;
+            }
+            pending.callback.completeExceptionally(new PulsarClientException.AlreadyClosedException(
+                    "Could not get response from transaction meta store when "
+                            + "the transaction meta store has already close."));
+            onResponse(pending);
+        }
+    }
public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit
unit) {
@@ -146,6 +186,7 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
}
return;
}
+
if (!response.hasError()) {
TxnID txnID = new TxnID(response.getTxnidMostBits(),
response.getTxnidLeastBits());
if (LOG.isDebugEnabled()) {
@@ -153,12 +194,8 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
}
op.callback.complete(txnID);
} else {
- if (response.getError() ==
ServerError.TransactionCoordinatorNotFound) {
- connectionHandler.reconnectLater(
- new
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
- }
LOG.error("Got new txn for request {} error {}",
response.getRequestId(), response.getError());
-
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
response.getMessage()));
+ handleTransactionFailOp(response.getError(),
response.getMessage(), op);
}
onResponse(op);
@@ -193,18 +230,15 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
}
return;
}
+
if (!response.hasError()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Add publish partition for request {} success.",
response.getRequestId());
}
op.callback.complete(null);
} else {
- if (response.getError() ==
ServerError.TransactionCoordinatorNotFound) {
- connectionHandler.reconnectLater(
- new
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
- }
LOG.error("Add publish partition for request {} error {}.",
response.getRequestId(), response.getError());
-
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
response.getMessage()));
+ handleTransactionFailOp(response.getError(),
response.getMessage(), op);
}
onResponse(op);
@@ -239,19 +273,16 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
}
return;
}
+
if (!response.hasError()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Add subscription to txn success for request {}.",
response.getRequestId());
}
op.callback.complete(null);
} else {
- if (response.getError() ==
ServerError.TransactionCoordinatorNotFound) {
- connectionHandler.reconnectLater(
- new
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
- }
LOG.error("Add subscription to txn failed for request {} error
{}.",
response.getRequestId(), response.getError());
-
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
response.getMessage()));
+ handleTransactionFailOp(response.getError(),
response.getMessage(), op);
}
onResponse(op);
}
@@ -285,23 +316,31 @@ public class TransactionMetaStoreHandler extends
HandlerState implements Connect
}
return;
}
+
if (!response.hasError()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got end txn response success for request {}",
response.getRequestId());
}
op.callback.complete(null);
} else {
- if (response.getError() ==
ServerError.TransactionCoordinatorNotFound) {
- connectionHandler.reconnectLater(
- new
TransactionCoordinatorClientException.CoordinatorNotFoundException(response.getMessage()));
- }
LOG.error("Got end txn response for request {} error {}",
response.getRequestId(), response.getError());
-
op.callback.completeExceptionally(getExceptionByServerError(response.getError(),
response.getMessage()));
+ handleTransactionFailOp(response.getError(),
response.getMessage(), op);
}
onResponse(op);
}
+    /**
+     * Shared failure path for transaction responses carrying an error.
+     * Schedules a reconnect when the coordinator was not found and the
+     * handler is not already connecting, then fails the operation's callback
+     * with the exception mapped from the server error.
+     *
+     * @param error   error reported by the broker
+     * @param message error message reported by the broker
+     * @param op      operation to fail; ignored when {@code null}
+     */
+    private void handleTransactionFailOp(ServerError error, String message, OpBase<?> op) {
+        final boolean coordinatorMissing = error == ServerError.TransactionCoordinatorNotFound;
+        if (coordinatorMissing && getState() != State.Connecting) {
+            connectionHandler.reconnectLater(
+                    new TransactionCoordinatorClientException.CoordinatorNotFoundException(message));
+        }
+
+        if (op == null) {
+            return;
+        }
+        op.callback.completeExceptionally(getExceptionByServerError(error, message));
+    }
+
private static abstract class OpBase<T> {
protected ByteBuf cmd;
protected CompletableFuture<T> callback;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
index 5656404..2aa9e12 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
@@ -47,4 +47,8 @@ public class EventsTopicNames {
public static boolean checkTopicIsEventsNames(TopicName topicName) {
return
EVENTS_TOPIC_NAMES.contains(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
}
+
+    /**
+     * Tells whether {@code topicName} has the same string form as
+     * {@code TopicName.TRANSACTION_COORDINATOR_ASSIGN}.
+     *
+     * @param topicName topic to test; must not be {@code null}
+     * @return {@code true} when the names are equal
+     */
+    public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) {
+        final String assignTopic = TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString();
+        return assignTopic.equals(topicName.toString());
+    }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2453eb8..e2d2c99 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -593,6 +594,12 @@ public class Commands {
return serializeWithSize(cmd);
}
+    /**
+     * Builds and serializes a {@code TC_CLIENT_CONNECT} command.
+     *
+     * @param tcId      id of the transaction coordinator to connect to
+     * @param requestId id used to correlate the response
+     * @return the size-prefixed serialized command
+     */
+    public static ByteBuf newTcClientConnect(long tcId, long requestId) {
+        BaseCommand command = localCmd(Type.TC_CLIENT_CONNECT);
+        command.setTcClientConnect()
+                .setRequestId(requestId)
+                .setTcId(tcId);
+        return serializeWithSize(command);
+    }
+
private static KeySharedMode
convertKeySharedMode(org.apache.pulsar.client.api.KeySharedMode mode) {
switch (mode) {
@@ -1226,10 +1233,14 @@ public class Commands {
+ /**
+ * Builds and serializes an ADD_PARTITION_TO_TXN_RESPONSE command carrying
+ * the request id, the error, the txn id most bits and, when provided, the
+ * error message.
+ */
public static ByteBuf newAddPartitionToTxnResponse(long requestId, long
txnIdMostBits, ServerError error,
String errorMsg) {
BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN_RESPONSE);
- cmd.setAddPartitionToTxnResponse()
+ CommandAddPartitionToTxnResponse response =
cmd.setAddPartitionToTxnResponse()
.setRequestId(requestId)
.setError(error)
.setTxnidMostBits(txnIdMostBits);
+
+ // The message field is only set when a message was supplied.
+ // NOTE(review): presumably setMessage does not accept null - confirm.
+ if (errorMsg != null) {
+ response.setMessage(errorMsg);
+ }
return serializeWithSize(cmd);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 792746b..700a13e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.haproxy.HAProxyMessage;
-
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAckResponse;
@@ -75,6 +74,7 @@ import org.apache.pulsar.common.api.proto.CommandSendError;
import org.apache.pulsar.common.api.proto.CommandSendReceipt;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSuccess;
+import org.apache.pulsar.common.api.proto.CommandTcClientConnect;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.intercept.InterceptException;
@@ -361,6 +361,11 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
handleAuthResponse(cmd.getAuthResponse());
break;
+ case TC_CLIENT_CONNECT:
+ checkArgument(cmd.hasTcClientConnect());
+ handleTcClientConnect(cmd.getTcClientConnect());
+ break;
+
case NEW_TXN:
checkArgument(cmd.hasNewTxn());
handleNewTxn(cmd.getNewTxn());
@@ -602,6 +607,10 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
+ /**
+ * Handles a TC_CLIENT_CONNECT command dispatched by the decoder switch.
+ * This default implementation always throws
+ * {@link UnsupportedOperationException}; presumably subclasses that
+ * support the command override it.
+ */
+ protected void handleTcClientConnect(CommandTcClientConnect
tcClientConnect) {
+ throw new UnsupportedOperationException();
+ }
+
protected void handleNewTxn(CommandNewTxn commandNewTxn) {
throw new UnsupportedOperationException();
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 320c51c..d1a9697 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -762,6 +762,11 @@ enum TxnAction {
ABORT = 1;
}
+// Request sent by a transaction client to connect to the transaction
+// coordinator identified by tc_id; request_id correlates the response.
+// NOTE(review): tc_id is `required` yet declares a default - confirm intent.
+message CommandTcClientConnect {
+ required uint64 request_id = 1;
+ required uint64 tc_id = 2 [default = 0];
+}
+
message CommandNewTxn {
required uint64 request_id = 1;
optional uint64 txn_ttl_seconds = 2 [default = 0];
@@ -941,6 +946,7 @@ message BaseCommand {
END_TXN_ON_SUBSCRIPTION = 60;
END_TXN_ON_SUBSCRIPTION_RESPONSE = 61;
+ TC_CLIENT_CONNECT = 62;
}
@@ -1016,4 +1022,5 @@ message BaseCommand {
optional CommandEndTxnOnPartitionResponse endTxnOnPartitionResponse = 59;
optional CommandEndTxnOnSubscription endTxnOnSubscription = 60;
optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse
= 61;
+ optional CommandTcClientConnect tcClientConnect = 62;
}