codelipenghui commented on a change in pull request #13957:
URL: https://github.com/apache/pulsar/pull/13957#discussion_r803238846
##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
##########
@@ -87,94 +87,109 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
this.abortedTransactionCount = new LongAdder();
this.transactionTimeoutCount = new LongAdder();
this.appendLogCount = new LongAdder();
+ }
+ public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracker recoverTracker) {
+ CompletableFuture<TransactionMetadataStore> completableFuture = new CompletableFuture<>();
if (!changeToInitializingState()) {
log.error("Managed ledger transaction metadata store change state error when init it");
- return;
- }
- new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
- @Override
- public void replayComplete() {
- recoverTracker.appendOpenTransactionToTimeoutTracker();
- if (!changeToReadyState()) {
- log.error("Managed ledger transaction metadata store change state error when replay complete");
- } else {
- recoverTracker.handleCommittingAndAbortingTransaction();
- timeoutTracker.start();
+ completableFuture
+ .completeExceptionally(new TransactionCoordinatorClientException
+ .CoordinatorNotFoundException("transaction metadata store with tcId "
+ + tcID.toString() + " change state to Initializing error when init it"));
+ } else {
+ new Thread(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
Review comment:
+1. We already introduced the transaction thread pool. @liangyepianzhou,
is there any problem with using the transaction thread pool here instead
of creating a new thread?
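
A minimal sketch of the idea, for illustration only: run the replay on a shared executor instead of `new Thread(...)`, and report the outcome through the returned future. The names `ReplayOnPoolSketch`, `Replayable`, and `sharedPool` are hypothetical stand-ins and are not Pulsar's actual classes or the transaction thread pool's real API; only the JDK calls are real.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ReplayOnPoolSketch {

    // Hypothetical stand-in for the log replay step; not Pulsar's TransactionLog API.
    interface Replayable {
        void replay() throws Exception;
    }

    // Submits the replay to a shared executor instead of spawning a new thread,
    // and completes the returned future with the outcome.
    static CompletableFuture<String> init(ExecutorService sharedPool, Replayable log) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            sharedPool.execute(() -> {
                try {
                    log.replay();
                    future.complete("Ready");
                } catch (Exception e) {
                    future.completeExceptionally(e);
                }
            });
        } catch (RejectedExecutionException e) {
            // The pool refused the task (for example, it was already shut down):
            // fail the future so callers do not wait forever.
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a small fixed pool stands in for the shared transaction pool.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(init(pool, () -> { }).get());
        } finally {
            pool.shutdown();
        }
    }
}
```

One thing to check with a shared pool is whether the replay can block for a long time; if it can, it may starve other tasks scheduled on the same pool, which is the kind of problem the question above is asking about.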