This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 1614e365748 [improve][txn][branch-2.11] Add getState in transaction for client API (#17423) (#19834)
1614e365748 is described below

commit 1614e365748673c6dde04e3107ac59e97b4113e0
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Mar 16 23:12:21 2023 +0800

    [improve][txn][branch-2.11] Add getState in transaction for client API (#17423) (#19834)
    
    Co-authored-by: congbo <[email protected]>
---
 .../pulsar/broker/transaction/TransactionTest.java | 48 ++++++++++++++++++
 .../client/impl/TransactionEndToEndTest.java       |  2 +-
 .../pulsar/client/api/transaction/Transaction.java | 57 ++++++++++++++++++++++
 .../client/impl/transaction/TransactionImpl.java   | 17 +++----
 4 files changed, 112 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ccdca13c996..5aafbb25060 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1495,4 +1495,52 @@ public class TransactionTest extends TransactionTestBase {
                     .send();
         txn.commit();
     }
+
+    @Test
+    public void testGetTxnState() throws Exception {
+        Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+
+        // test OPEN and TIMEOUT
+        assertEquals(transaction.getState(), Transaction.State.OPEN);
+        Transaction timeoutTxn = transaction;
+        Awaitility.await().until(() -> timeoutTxn.getState() == Transaction.State.TIME_OUT);
+
+        // test abort
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        transaction.abort().get();
+        assertEquals(transaction.getState(), Transaction.State.ABORTED);
+
+        // test commit
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        transaction.commit().get();
+        assertEquals(transaction.getState(), Transaction.State.COMMITTED);
+
+        // test error
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+                .build().get();
+        pulsarServiceList.get(0).getTransactionMetadataStoreService()
+                .endTransaction(transaction.getTxnID(), 0, false);
+        transaction.commit();
+        Transaction errorTxn = transaction;
+        Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR);
+
+        // test committing
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+        transaction.commit();
+        Transaction committingTxn = transaction;
+        Awaitility.await().until(() -> committingTxn.getState() == Transaction.State.COMMITTING);
+
+        // test aborting
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(3, TimeUnit.SECONDS)
+                .build().get();
+        ((TransactionImpl) transaction).registerSendOp(new CompletableFuture<>());
+        transaction.abort();
+        Transaction abortingTxn = transaction;
+        Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index bc56eab6bc1..454edebe6f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1186,7 +1186,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 .build().get();
         producer.newMessage().send();
         Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
+            Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIME_OUT);
         });
 
         try {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
index fd4cf0bc166..33e96d5c276 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
@@ -29,6 +29,55 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 @InterfaceStability.Evolving
 public interface Transaction {
 
+    enum State {
+
+        /**
+         * When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction.
+         *
+         * When a transaction is in the `OPEN` state, it can commit or abort.
+         */
+        OPEN,
+
+        /**
+         * When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`.
+         */
+        COMMITTING,
+
+        /**
+         * When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`.
+         */
+        ABORTING,
+
+        /**
+         * When a client receives a response to a commit, the transaction state is changed from
+         * `COMMITTING` to `COMMITTED`.
+         */
+        COMMITTED,
+
+        /**
+         * When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`.
+         */
+        ABORTED,
+
+        /**
+         * When a client invokes a commit or an abort, but a transaction does not exist in a coordinator,
+         * then the state is changed to `ERROR`.
+         *
+         * When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`,
+         * then the state is changed to `ERROR`.
+         *
+         * When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`,
+         * then the state is changed to `ERROR`.
+         */
+        ERROR,
+
+        /**
+         * When a transaction is timed out and the transaction state is `OPEN`,
+         * then the transaction state is changed from `OPEN` to `TIME_OUT`.
+         */
+        TIME_OUT
+    }
+
     /**
      * Commit the transaction.
      *
@@ -48,4 +97,12 @@ public interface Transaction {
      *  @return {@link TxnID} the txnID.
      */
     TxnID getTxnID();
+
+    /**
+     * Get transaction state.
+     *
+     * @return {@link State} the state of the transaction.
+     */
+    State getState();
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 55b20438693..833b0957d1c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction , TimerTask {
 
     @Override
     public void run(Timeout timeout) throws Exception {
-        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
-    }
-
-    public enum State {
-        OPEN,
-        COMMITTING,
-        ABORTING,
-        COMMITTED,
-        ABORTED,
-        ERROR,
-        TIMEOUT
+        STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
     }
 
     TransactionImpl(PulsarClientImpl client,
@@ -215,6 +205,11 @@ public class TransactionImpl implements Transaction , 
TimerTask {
         return new TxnID(txnIdMostBits, txnIdLeastBits);
     }
 
+    @Override
+    public State getState() {
+        return state;
+    }
+
     public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
         if (state == State.OPEN) {
             return true;

Reply via email to