This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3089aa4040cc6c42c05898fab00c98332a9cb395 Author: Xiangying Meng <[email protected]> AuthorDate: Fri Feb 25 16:15:53 2022 +0800 [Transaction] Fix end transaction at state of timeout (#14370) ### Motivation Because of a race condition, the timeout task may change the transaction state to timeout before commit/abort changes the state to committing/aborting, so the subsequent state check fails. ### Modification Cancel the timeout at the start of commit or abort, before checking the state, instead of after the state check has passed. (cherry picked from commit 4b480450f32dc6ce5337d0b3d68a35111ddf474e) --- .../org/apache/pulsar/client/impl/transaction/TransactionImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 8adc162..bba5331 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -165,10 +165,10 @@ public class TransactionImpl implements Transaction , TimerTask { @Override public CompletableFuture<Void> commit() { + timeout.cancel(); return checkIfOpenOrCommitting().thenCompose((value) -> { CompletableFuture<Void> commitFuture = new CompletableFuture<>(); this.state = State.COMMITTING; - timeout.cancel(); allOpComplete().whenComplete((v, e) -> { if (e != null) { abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e)); @@ -194,10 +194,10 @@ public class TransactionImpl implements Transaction , TimerTask { @Override public CompletableFuture<Void> abort() { + timeout.cancel(); return checkIfOpenOrAborting().thenCompose(value -> { CompletableFuture<Void> abortFuture = new CompletableFuture<>(); this.state = State.ABORTING; - timeout.cancel(); allOpComplete().whenComplete((v, e) -> { if (e != null) { log.error(e.getMessage());
