This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 1614e365748 [improve][txn][branch-2.11] Add getState in transaction
for client API (#17423) (#19834)
1614e365748 is described below
commit 1614e365748673c6dde04e3107ac59e97b4113e0
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Mar 16 23:12:21 2023 +0800
[improve][txn][branch-2.11] Add getState in transaction for client API
(#17423) (#19834)
Co-authored-by: congbo <[email protected]>
---
.../pulsar/broker/transaction/TransactionTest.java | 48 ++++++++++++++++++
.../client/impl/TransactionEndToEndTest.java | 2 +-
.../pulsar/client/api/transaction/Transaction.java | 57 ++++++++++++++++++++++
.../client/impl/transaction/TransactionImpl.java | 17 +++----
4 files changed, 112 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index ccdca13c996..5aafbb25060 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1495,4 +1495,52 @@ public class TransactionTest extends TransactionTestBase
{
.send();
txn.commit();
}
+
+ @Test
+ public void testGetTxnState() throws Exception {
+ Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+ .build().get();
+
+ // test OPEN and TIMEOUT
+ assertEquals(transaction.getState(), Transaction.State.OPEN);
+ Transaction timeoutTxn = transaction;
+ Awaitility.await().until(() -> timeoutTxn.getState() ==
Transaction.State.TIME_OUT);
+
+ // test abort
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3,
TimeUnit.SECONDS)
+ .build().get();
+ transaction.abort().get();
+ assertEquals(transaction.getState(), Transaction.State.ABORTED);
+
+ // test commit
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3,
TimeUnit.SECONDS)
+ .build().get();
+ transaction.commit().get();
+ assertEquals(transaction.getState(), Transaction.State.COMMITTED);
+
+ // test error
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(1,
TimeUnit.SECONDS)
+ .build().get();
+ pulsarServiceList.get(0).getTransactionMetadataStoreService()
+ .endTransaction(transaction.getTxnID(), 0, false);
+ transaction.commit();
+ Transaction errorTxn = transaction;
+ Awaitility.await().until(() -> errorTxn.getState() ==
Transaction.State.ERROR);
+
+ // test committing
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3,
TimeUnit.SECONDS)
+ .build().get();
+ ((TransactionImpl) transaction).registerSendOp(new
CompletableFuture<>());
+ transaction.commit();
+ Transaction committingTxn = transaction;
+ Awaitility.await().until(() -> committingTxn.getState() ==
Transaction.State.COMMITTING);
+
+ // test aborting
+ transaction = pulsarClient.newTransaction().withTransactionTimeout(3,
TimeUnit.SECONDS)
+ .build().get();
+ ((TransactionImpl) transaction).registerSendOp(new
CompletableFuture<>());
+ transaction.abort();
+ Transaction abortingTxn = transaction;
+ Awaitility.await().until(() -> abortingTxn.getState() ==
Transaction.State.ABORTING);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index bc56eab6bc1..454edebe6f0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1186,7 +1186,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.build().get();
producer.newMessage().send();
Awaitility.await().untilAsserted(() -> {
- Assert.assertEquals(((TransactionImpl)transaction).getState(),
TransactionImpl.State.TIMEOUT);
+ Assert.assertEquals(((TransactionImpl)transaction).getState(),
TransactionImpl.State.TIME_OUT);
});
try {
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
index fd4cf0bc166..33e96d5c276 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/Transaction.java
@@ -29,6 +29,55 @@ import
org.apache.pulsar.common.classification.InterfaceStability;
@InterfaceStability.Evolving
public interface Transaction {
+ enum State {
+
+ /**
+ * When a transaction is in the `OPEN` state, messages can be produced and acked with this transaction.
+ *
+ * When a transaction is in the `OPEN` state, it can commit or abort.
+ */
+ OPEN,
+
+ /**
+ * When a client invokes a commit, the transaction state is changed from `OPEN` to `COMMITTING`.
+ */
+ COMMITTING,
+
+ /**
+ * When a client invokes an abort, the transaction state is changed from `OPEN` to `ABORTING`.
+ */
+ ABORTING,
+
+ /**
+ * When a client receives a response to a commit, the transaction state is changed from
+ * `COMMITTING` to `COMMITTED`.
+ */
+ COMMITTED,
+
+ /**
+ * When a client receives a response to an abort, the transaction state is changed from `ABORTING` to `ABORTED`.
+ */
+ ABORTED,
+
+ /**
+ * When a client invokes a commit or an abort, but a transaction does not exist in a coordinator,
+ * then the state is changed to `ERROR`.
+ *
+ * When a client invokes a commit, but the transaction state in a coordinator is `ABORTED` or `ABORTING`,
+ * then the state is changed to `ERROR`.
+ *
+ * When a client invokes an abort, but the transaction state in a coordinator is `COMMITTED` or `COMMITTING`,
+ * then the state is changed to `ERROR`.
+ */
+ ERROR,
+
+ /**
+ * When a transaction is timed out and the transaction state is `OPEN`,
+ * then the transaction state is changed from `OPEN` to `TIME_OUT`.
+ */
+ TIME_OUT
+ }
+
/**
* Commit the transaction.
*
@@ -48,4 +97,12 @@ public interface Transaction {
* @return {@link TxnID} the txnID.
*/
TxnID getTxnID();
+
+ /**
+ * Get transaction state.
+ *
+ * @return {@link State} the state of the transaction.
+ */
+ State getState();
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
index 55b20438693..833b0957d1c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
@@ -70,17 +70,7 @@ public class TransactionImpl implements Transaction ,
TimerTask {
@Override
public void run(Timeout timeout) throws Exception {
- STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
- }
-
- public enum State {
- OPEN,
- COMMITTING,
- ABORTING,
- COMMITTED,
- ABORTED,
- ERROR,
- TIMEOUT
+ STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIME_OUT);
}
TransactionImpl(PulsarClientImpl client,
@@ -215,6 +205,11 @@ public class TransactionImpl implements Transaction ,
TimerTask {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}
+ @Override
+ public State getState() {
+ return state;
+ }
+
public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
if (state == State.OPEN) {
return true;