This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 22a24d62e8d4a0da43f6b28ff911696a515b6e0b Author: Xiangying Meng <[email protected]> AuthorDate: Wed Dec 15 13:41:02 2021 +0800 [Transaction] Allow transaction be commit or abort in the state of aborting or committing. (#13323) Due to concurrency, retries, and timeout-triggered aborts, we should allow `abort` or `commit` in the state of aborting or committing. Allow `abort` or `commit` in the state of aborting or committing. (cherry picked from commit d220c21a601a9a2c97ee3434b3b44b4bd9b8adc1) --- .../broker/namespace/NamespaceServiceTest.java | 1 - .../pulsar/broker/transaction/TransactionTest.java | 26 ++++++++++++++++++ .../client/impl/transaction/TransactionImpl.java | 31 ++++++++++++++++++---- 3 files changed, 52 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index d45dcc2..bb33ac8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -66,7 +66,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.Policies; -import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl.BundlesDataImplBuilder; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.GetResult; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 3d0c406..90ddc4b 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -76,6 +76,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -560,4 +561,29 @@ public class TransactionTest extends TransactionTestBase { Awaitility.await().untilAsserted(() -> assertEquals(metadataStore2.getCoordinatorStats().state, "Ready")); } + + @Test + public void testEndTxnWhenCommittingOrAborting() throws Exception { + Transaction commitTxn = pulsarClient + .newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + Transaction abortTxn = pulsarClient + .newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + + Class<TransactionImpl> transactionClass = TransactionImpl.class; + Field field = transactionClass.getDeclaredField("state"); + field.setAccessible(true); + + field.set(commitTxn, TransactionImpl.State.COMMITTING); + field.set(abortTxn, TransactionImpl.State.ABORTING); + + abortTxn.abort(); + commitTxn.commit(); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 60c7829..458976a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -144,7 +144,7 @@ public class TransactionImpl implements Transaction { 
@Override public CompletableFuture<Void> commit() { - return checkIfOpen().thenCompose((value) -> { + return checkIfOpenOrCommitting().thenCompose((value) -> { CompletableFuture<Void> commitFuture = new CompletableFuture<>(); this.state = State.COMMITTING; allOpComplete().whenComplete((v, e) -> { @@ -172,7 +172,7 @@ public class TransactionImpl implements Transaction { @Override public CompletableFuture<Void> abort() { - return checkIfOpen().thenCompose(value -> { + return checkIfOpenOrAborting().thenCompose(value -> { CompletableFuture<Void> abortFuture = new CompletableFuture<>(); this.state = State.ABORTING; allOpComplete().whenComplete((v, e) -> { @@ -217,12 +217,33 @@ public class TransactionImpl implements Transaction { if (state == State.OPEN) { return CompletableFuture.completedFuture(null); } else { - return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + txnIdMostBits + ":" - + txnIdLeastBits + "] with unexpected state : " - + state.name() + ", expect " + State.OPEN + " state!")); + return invalidTxnStatusFuture(); + } + } + + private CompletableFuture<Void> checkIfOpenOrCommitting() { + if (state == State.OPEN || state == State.COMMITTING) { + return CompletableFuture.completedFuture(null); + } else { + return invalidTxnStatusFuture(); + } + } + + private CompletableFuture<Void> checkIfOpenOrAborting() { + if (state == State.OPEN || state == State.ABORTING) { + return CompletableFuture.completedFuture(null); + } else { + return invalidTxnStatusFuture(); } } + private CompletableFuture<Void> invalidTxnStatusFuture() { + return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + txnIdMostBits + ":" + + txnIdLeastBits + "] with unexpected state : " + + state.name() + ", expect " + State.OPEN + " state!")); + } + + private CompletableFuture<Void> allOpComplete() { List<CompletableFuture<?>> futureList = new ArrayList<>(); futureList.addAll(sendFutureList);
