eolivelli commented on a change in pull request #11357:
URL: https://github.com/apache/pulsar/pull/11357#discussion_r672005966
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -84,94 +91,111 @@ public
TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.tbClient = tbClient;
this.timeoutTrackerFactory = new
TransactionTimeoutTrackerFactoryImpl(this, timer);
this.transactionOpRetryTimer = timer;
+ this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
+ this.pendingConnectRequests = new ConcurrentLongHashMap<>();
}
- public void start() {
-
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new
NamespaceBundleOwnershipListener() {
- @Override
- public void onLoad(NamespaceBundle bundle) {
-
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
-
addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
- }
+ public CompletableFuture<Void>
handleTcClientConnect(TransactionCoordinatorID tcId) {
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+ .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int)
tcId.getId()).toString()).thenCompose(v -> {
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+ .computeIfAbsent(tcId.getId(), (id) -> new
Semaphore(1));
+ Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+ .computeIfAbsent(tcId.getId(), (id) -> new
ConcurrentLinkedDeque<>());
+ if (tcLoadSemaphore.tryAcquire()) {
+ // when tcLoadSemaphore.release(), this command will
acquire semaphore, so we should jude the store
+ // exist again.
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ openTransactionMetadataStore(tcId).thenAccept((store) -> {
+ stores.put(tcId, store);
+ LOG.info("Added new transaction meta store {}", tcId);
+ while (true) {
+ CompletableFuture<Void> future = deque.poll();
Review comment:
This is a busy loop.
Can we add a timeout to the poll? That way the thread can be suspended.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1867,21 +1903,34 @@ protected void handleNewTxn(CommandNewTxn command) {
if (log.isDebugEnabled()) {
log.debug("Send response error for new txn request
{}", requestId, ex);
}
+
ctx.writeAndFlush(Commands.newTxnResponse(requestId,
tcId.getId(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
+ transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
}
@Override
protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(),
command.getTxnidLeastBits());
+ final TransactionCoordinatorID tcId =
TransactionCoordinatorID.get(command.getTxnidMostBits());
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
command.getPartitionsList().forEach(partion ->
log.debug("Receive add published partition to txn request
{} "
+ "from {} with txnId {}, topic: [{}]", requestId,
remoteAddress, txnID, partion));
}
+
+ if
(!service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
+ BrokerServiceException.NotAllowedException ex =
Review comment:
Can we extract a method for this code, since it is repeated several times?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -400,13 +432,14 @@ public void endTransactionForTimeout(TxnID txnID) {
}
private static boolean isRetryableException(Throwable e) {
- return e instanceof TransactionMetadataStoreStateException
+ return (e instanceof TransactionMetadataStoreStateException
|| e instanceof RequestTimeoutException
|| e instanceof ManagedLedgerException
|| e instanceof BrokerPersistenceException
|| e instanceof LookupException
|| e instanceof ReachMaxPendingOpsException
- || e instanceof ConnectException;
+ || e instanceof ConnectException)
+ && !(e instanceof
ManagedLedgerException.ManagedLedgerFencedException);
Review comment:
How is it possible for an object to be an instance of one of the classes
listed above and also an instance of this class?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1675,6 +1677,17 @@ public boolean isTopicNsOwnedByBroker(TopicName
topicName) {
: CompletableFuture.completedFuture(null)));
}
});
+ if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
+ &&
serviceUnit.getNamespaceObject().toString().equals(NamespaceName.SYSTEM_NAMESPACE.toString()))
{
Review comment:
It is better not to compare the results of toString.
It may lead to unpredictable results.
It is better to add an explicit method to verify the equality.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -84,94 +91,111 @@ public
TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.tbClient = tbClient;
this.timeoutTrackerFactory = new
TransactionTimeoutTrackerFactoryImpl(this, timer);
this.transactionOpRetryTimer = timer;
+ this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
+ this.pendingConnectRequests = new ConcurrentLongHashMap<>();
}
- public void start() {
-
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new
NamespaceBundleOwnershipListener() {
- @Override
- public void onLoad(NamespaceBundle bundle) {
-
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
-
addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
- }
+ public CompletableFuture<Void>
handleTcClientConnect(TransactionCoordinatorID tcId) {
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+ .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int)
tcId.getId()).toString()).thenCompose(v -> {
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+ .computeIfAbsent(tcId.getId(), (id) -> new
Semaphore(1));
+ Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+ .computeIfAbsent(tcId.getId(), (id) -> new
ConcurrentLinkedDeque<>());
+ if (tcLoadSemaphore.tryAcquire()) {
+ // when tcLoadSemaphore.release(), this command will
acquire semaphore, so we should jude the store
+ // exist again.
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ openTransactionMetadataStore(tcId).thenAccept((store) -> {
+ stores.put(tcId, store);
+ LOG.info("Added new transaction meta store {}", tcId);
+ while (true) {
+ CompletableFuture<Void> future = deque.poll();
+ if (future != null) {
+ // complete queue request future
+ future.complete(null);
+ } else {
+ break;
}
- } else {
- LOG.error("Failed to get owned topic list when
triggering on-loading bundle {}.",
- bundle, ex);
}
- });
- }
- @Override
- public void unLoad(NamespaceBundle bundle) {
-
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
- removeTransactionMetadataStore(
-
TransactionCoordinatorID.get(name.getPartitionIndex()));
- }
+
+ completableFuture.complete(null);
+ tcLoadSemaphore.release();
+ }).exceptionally(e -> {
+ completableFuture.completeExceptionally(e.getCause());
+ // release before handle request queue, in order to
client reconnect infinite loop
+ tcLoadSemaphore.release();
+
+ while (true) {
+ CompletableFuture<Void> future = deque.poll();
+ if (future != null) {
+ // this means that this tc client connection
connect fail
+ future.completeExceptionally(e);
+ } else {
+ break;
}
- } else {
- LOG.error("Failed to get owned topic list error
when triggering un-loading bundle {}.",
- bundle, ex);
}
- });
- }
- @Override
- public boolean test(NamespaceBundle namespaceBundle) {
- return
namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
- }
- });
- }
-
- public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
- pulsarService.getBrokerService()
-
.getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX
+ tcId))
- .whenComplete((v, e) -> {
- if (e != null) {
LOG.error("Add transaction metadata store with id {}
error", tcId.getId(), e);
- } else {
- TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
- TransactionRecoverTracker recoverTracker =
- new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
- timeoutTracker, tcId.getId());
- transactionMetadataStoreProvider.openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
- timeoutTracker, recoverTracker)
- .whenComplete((store, ex) -> {
- if (ex != null) {
- LOG.error("Add transaction metadata
store with id {} error", tcId.getId(), ex);
- } else {
- stores.put(tcId, store);
- LOG.info("Added new transaction meta
store {}", tcId);
- }
- });
- }
- });
- }
-
- public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
- TransactionMetadataStore metadataStore = stores.remove(tcId);
- if (metadataStore != null) {
- metadataStore.closeAsync().whenComplete((v, ex) -> {
- if (ex != null) {
- LOG.error("Close transaction metadata store with id " +
tcId, ex);
+ return null;
+ });
} else {
- LOG.info("Removed and closed transaction meta store {}",
tcId);
+ // only one command can open transaction metadata store,
+ // other will be added to the deque, when the op of
openTransactionMetadataStore finished
+ // then handle the requests witch in the queue
+ deque.add(completableFuture);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Handle tc client connect added into pending
queue! tcId : {}", tcId.toString());
+ }
}
+ return completableFuture;
});
}
}
+ public CompletableFuture<TransactionMetadataStore>
openTransactionMetadataStore(TransactionCoordinatorID tcId) {
+ return pulsarService.getBrokerService()
+ .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl
+ .TRANSACTION_LOG_PREFIX + tcId)).thenCompose(v -> {
+ TransactionTimeoutTracker timeoutTracker =
timeoutTrackerFactory.newTracker(tcId);
+ TransactionRecoverTracker recoverTracker =
+ new
TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+ timeoutTracker, tcId.getId());
+ return transactionMetadataStoreProvider
+ .openStore(tcId,
pulsarService.getManagedLedgerFactory(), v,
+ timeoutTracker, recoverTracker);
+ });
+ }
+
+ public CompletableFuture<Void>
removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
+ final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+ .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
+ if (tcLoadSemaphore.tryAcquire()) {
Review comment:
Why can't we wait (or, better, schedule a retry)?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -84,94 +91,111 @@ public
TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.tbClient = tbClient;
this.timeoutTrackerFactory = new
TransactionTimeoutTrackerFactoryImpl(this, timer);
this.transactionOpRetryTimer = timer;
+ this.tcLoadSemaphores = new ConcurrentLongHashMap<>();
+ this.pendingConnectRequests = new ConcurrentLongHashMap<>();
}
- public void start() {
-
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new
NamespaceBundleOwnershipListener() {
- @Override
- public void onLoad(NamespaceBundle bundle) {
-
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
-
addTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
- }
+ public CompletableFuture<Void>
handleTcClientConnect(TransactionCoordinatorID tcId) {
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return
pulsarService.getBrokerService().checkTopicNsOwnership(TopicName
+ .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int)
tcId.getId()).toString()).thenCompose(v -> {
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
+ .computeIfAbsent(tcId.getId(), (id) -> new
Semaphore(1));
+ Deque<CompletableFuture<Void>> deque = pendingConnectRequests
+ .computeIfAbsent(tcId.getId(), (id) -> new
ConcurrentLinkedDeque<>());
+ if (tcLoadSemaphore.tryAcquire()) {
+ // when tcLoadSemaphore.release(), this command will
acquire semaphore, so we should jude the store
+ // exist again.
+ if (stores.get(tcId) != null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ openTransactionMetadataStore(tcId).thenAccept((store) -> {
+ stores.put(tcId, store);
+ LOG.info("Added new transaction meta store {}", tcId);
+ while (true) {
+ CompletableFuture<Void> future = deque.poll();
+ if (future != null) {
+ // complete queue request future
+ future.complete(null);
+ } else {
+ break;
}
- } else {
- LOG.error("Failed to get owned topic list when
triggering on-loading bundle {}.",
- bundle, ex);
}
- });
- }
- @Override
- public void unLoad(NamespaceBundle bundle) {
-
pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
- .whenComplete((topics, ex) -> {
- if (ex == null) {
- for (String topic : topics) {
- TopicName name = TopicName.get(topic);
- if
(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
-
.equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
- && name.isPartitioned()) {
- removeTransactionMetadataStore(
-
TransactionCoordinatorID.get(name.getPartitionIndex()));
- }
+
+ completableFuture.complete(null);
+ tcLoadSemaphore.release();
+ }).exceptionally(e -> {
+ completableFuture.completeExceptionally(e.getCause());
+ // release before handle request queue, in order to
client reconnect infinite loop
+ tcLoadSemaphore.release();
+
+ while (true) {
+ CompletableFuture<Void> future = deque.poll();
Review comment:
The same here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]