This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 22a24d62e8d4a0da43f6b28ff911696a515b6e0b
Author: Xiangying Meng <[email protected]>
AuthorDate: Wed Dec 15 13:41:02 2021 +0800

    [Transaction] Allow transaction be commit or abort in the state of aborting 
or committing. (#13323)
    
    Due to operations such as concurrency, retry, and timeout abort. We should 
allow  `abort` or `commit` in the state of aborting or committing
    Allow `abort` or `commit` in the state of aborting or committing
    
    (cherry picked from commit d220c21a601a9a2c97ee3434b3b44b4bd9b8adc1)
---
 .../broker/namespace/NamespaceServiceTest.java     |  1 -
 .../pulsar/broker/transaction/TransactionTest.java | 26 ++++++++++++++++++
 .../client/impl/transaction/TransactionImpl.java   | 31 ++++++++++++++++++----
 3 files changed, 52 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index d45dcc2..bb33ac8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -66,7 +66,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
-import 
org.apache.pulsar.common.policies.data.impl.BundlesDataImpl.BundlesDataImplBuilder;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.GetResult;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 3d0c406..90ddc4b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -76,6 +76,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -560,4 +561,29 @@ public class TransactionTest extends TransactionTestBase {
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, 
"Ready"));
     }
+
+    @Test
+    public void testEndTxnWhenCommittingOrAborting() throws Exception {
+        Transaction commitTxn = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+        Transaction abortTxn = pulsarClient
+                .newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build()
+                .get();
+
+        Class<TransactionImpl> transactionClass = TransactionImpl.class;
+        Field field = transactionClass.getDeclaredField("state");
+        field.setAccessible(true);
+
+        field.set(commitTxn, TransactionImpl.State.COMMITTING);
+        field.set(abortTxn, TransactionImpl.State.ABORTING);
+
+        abortTxn.abort();
+        commitTxn.commit();
+    }
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 60c7829..458976a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -144,7 +144,7 @@ public class TransactionImpl implements Transaction {
 
     @Override
     public CompletableFuture<Void> commit() {
-        return checkIfOpen().thenCompose((value) -> {
+        return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
             this.state = State.COMMITTING;
             allOpComplete().whenComplete((v, e) -> {
@@ -172,7 +172,7 @@ public class TransactionImpl implements Transaction {
 
     @Override
     public CompletableFuture<Void> abort() {
-        return checkIfOpen().thenCompose(value -> {
+        return checkIfOpenOrAborting().thenCompose(value -> {
             CompletableFuture<Void> abortFuture = new CompletableFuture<>();
             this.state = State.ABORTING;
             allOpComplete().whenComplete((v, e) -> {
@@ -217,12 +217,33 @@ public class TransactionImpl implements Transaction {
         if (state == State.OPEN) {
             return CompletableFuture.completedFuture(null);
         } else {
-            return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + 
txnIdMostBits + ":"
-                    + txnIdLeastBits + "] with unexpected state : "
-                    + state.name() + ", expect " + State.OPEN + " state!"));
+            return invalidTxnStatusFuture();
+        }
+    }
+
+    private CompletableFuture<Void> checkIfOpenOrCommitting() {
+        if (state == State.OPEN || state == State.COMMITTING) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return invalidTxnStatusFuture();
+        }
+    }
+
+    private CompletableFuture<Void> checkIfOpenOrAborting() {
+        if (state == State.OPEN || state == State.ABORTING) {
+            return CompletableFuture.completedFuture(null);
+        } else {
+            return invalidTxnStatusFuture();
         }
     }
 
+    private CompletableFuture<Void> invalidTxnStatusFuture() {
+        return FutureUtil.failedFuture(new InvalidTxnStatusException("[" + 
txnIdMostBits + ":"
+                + txnIdLeastBits + "] with unexpected state : "
+                + state.name() + ", expect " + State.OPEN + " state!"));
+    }
+
+
     private CompletableFuture<Void> allOpComplete() {
         List<CompletableFuture<?>> futureList = new ArrayList<>();
         futureList.addAll(sendFutureList);

Reply via email to