This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b3d994d8d IGNITE-20779 Transaction operation might throw no exception 
on enlist… (#2868)
0b3d994d8d is described below

commit 0b3d994d8d5a7afb0d676e084175afd0649f7f16
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Nov 23 21:05:06 2023 +0300

    IGNITE-20779 Transaction operation might throw no exception on enlist… 
(#2868)
---
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  24 +++--
 .../ignite/internal/tx/impl/TxManagerImpl.java     |  33 +++---
 .../tx/impl/ReadWriteTransactionImplTest.java      | 112 ++++++++++++++++++++-
 3 files changed, 141 insertions(+), 28 deletions(-)

diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index a60f013a4d..2d64411a78 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.tx.impl;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.TxState.isFinalState;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
 
@@ -95,12 +96,12 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId 
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
-        checkEnlistReady();
+        checkEnlistPossibility();
 
         enlistPartitionLock.readLock().lock();
 
         try {
-            checkEnlistReady();
+            checkEnlistPossibility();
 
             return enlisted.computeIfAbsent(tablePartitionId, k -> 
nodeAndTerm);
         } finally {
@@ -111,25 +112,36 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /**
      * Checks that this transaction was not finished and will be able to 
enlist another partition.
      */
-    private void checkEnlistReady() {
-        if (isFinalState(state())) {
+    private void checkEnlistPossibility() {
+        if (hasTxFinalizationBegun()) {
             throw new TransactionException(
                     TX_FAILED_READ_WRITE_OPERATION_ERR,
                     format("Transaction is already finished [id={}, 
state={}].", id(), state()));
         }
     }
 
+    /**
+     * Checks the transaction state to determine whether finalization of the transaction has already begun.
+     *
+     * @return True when the transaction started to finalize, false otherwise.
+     */
+    private boolean hasTxFinalizationBegun() {
+        return isFinalState(state()) || state() == FINISHING;
+    }
+
     /** {@inheritDoc} */
     @Override
     protected CompletableFuture<Void> finish(boolean commit) {
-        if (isFinalState(state())) {
+        if (hasTxFinalizationBegun()) {
             return finishFuture;
         }
 
         enlistPartitionLock.writeLock().lock();
 
         try {
-            if (!isFinalState(state())) {
+            if (!hasTxFinalizationBegun()) {
+                assert finishFuture == null : "Transaction is already finished 
[id=" + id() + ", state=" + state() + "].";
+
                 finishFuture = finishInternal(commit);
             }
 
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 00cc9fd300..839de54bb7 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -331,37 +331,28 @@ public class TxManagerImpl implements TxManager, 
NetworkMessageHandler {
         TxStateMetaFinishing finishingStateMeta = (TxStateMetaFinishing) 
updateTxMeta(txId, old ->
                 new TxStateMetaFinishing(localNodeId, old == null ? null : 
old.commitPartitionId()));
 
-        AtomicBoolean performingFinish = new AtomicBoolean();
         TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
             if (tuple0 == null) {
                 tuple0 = new TxContext(); // No writes enlisted.
             }
 
-            if (!tuple0.isTxFinishing()) {
-                tuple0.finishTx();
+            assert !tuple0.isTxFinishing() : "Transaction is already finished 
[id=" + uuid + "].";
 
-                performingFinish.set(true);
-            }
+            tuple0.finishTx();
 
             return tuple0;
         });
 
-        // This is a finishing thread.
-        if (performingFinish.get()) {
-            // Wait for commit acks first, then proceed with the finish 
request.
-            return tuple.performFinish(commit, ignored ->
-                    prepareFinish(
-                            observableTimestampTracker,
-                            commitPartition,
-                            commit,
-                            enlistedGroups,
-                            txId,
-                            finishingStateMeta.txFinishFuture()
-                    ));
-        }
-        // The method `performFinish` above has a side effect on 
`finishInProgressFuture` future,
-        // it kicks off another future that will complete it.
-        return tuple.finishInProgressFuture;
+        // Wait for commit acks first, then proceed with the finish request.
+        return tuple.performFinish(commit, ignored ->
+                prepareFinish(
+                        observableTimestampTracker,
+                        commitPartition,
+                        commit,
+                        enlistedGroups,
+                        txId,
+                        finishingStateMeta.txFinishFuture()
+                ));
     }
 
     private CompletableFuture<Void> prepareFinish(
diff --git 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index ff2f882a85..aaf5d89dc0 100644
--- 
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++ 
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -17,36 +17,146 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 
+import java.util.HashSet;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
 class ReadWriteTransactionImplTest extends BaseIgniteAbstractTest {
+    private static final ClusterNode CLUSTER_NODE = new ClusterNodeImpl(
+            "test-node-id",
+            "test-node",
+            new NetworkAddress("localhost", 1234)
+    );
+
+    private static final IgniteBiTuple NODE_AND_TOKEN = new 
IgniteBiTuple(CLUSTER_NODE, 0L);
+
+    private static final int TABLE_ID = 1;
+
+    /** Transaction commit partition id. */
+    public static final TablePartitionId TX_COMMIT_PART = new 
TablePartitionId(TABLE_ID, 0);
+
     @Mock
     private TxManager txManager;
 
     private final HybridClock clock = new HybridClockImpl();
 
+    /** The state is assigned to the transaction after a finalize method 
(commit or rollback) is called. */
+    private TxState txState = null;
+
     @Test
-    void effectiveSchemaTimestampIsBeginTimestamp() {
+    public void effectiveSchemaTimestampIsBeginTimestamp() {
         HybridTimestamp beginTs = clock.now();
+
         UUID txId = 
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
 
         var tx = new ReadWriteTransactionImpl(txManager, new 
HybridTimestampTracker(), txId);
 
         assertThat(tx.startTimestamp(), is(beginTs));
     }
+
+    /**
+     * Starts a transaction, enlists some partitions, finalizes the 
transaction, and again tries to enlist.
+     *
+     * @param commit True for committing the transaction, false for rolling 
back.
+     */
+    private void startTxAndTryToEnlist(boolean commit) {
+        HashSet<UUID> finishedTxs = new HashSet<>();
+
+        Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(), 
any())).thenAnswer(invocation -> {
+            finishedTxs.add(invocation.getArgument(4));
+
+            return completedFuture(null);
+        });
+
+        Mockito.when(txManager.stateMeta(any())).thenAnswer(invocation -> {
+            if (finishedTxs.contains(invocation.getArgument(0))) {
+                return new TxStateMeta(txState, "crd-id", TX_COMMIT_PART, 
null);
+            }
+
+            return null;
+        });
+
+        HybridTimestamp beginTs = clock.now();
+
+        UUID txId = 
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
+
+        var tx = new ReadWriteTransactionImpl(txManager, new 
HybridTimestampTracker(), txId);
+
+        tx.assignCommitPartition(TX_COMMIT_PART);
+
+        tx.enlist(new TablePartitionId(TABLE_ID, 0), NODE_AND_TOKEN);
+        tx.enlist(new TablePartitionId(TABLE_ID, 2), NODE_AND_TOKEN);
+
+        if (commit) {
+            if (txState == null) {
+                txState = COMMITED;
+            }
+
+            tx.commit();
+        } else {
+            if (txState == null) {
+                txState = ABORTED;
+            }
+
+            tx.rollback();
+        }
+
+        TransactionException ex = assertThrows(TransactionException.class,
+                () -> tx.enlist(new TablePartitionId(TABLE_ID, 5), 
NODE_AND_TOKEN));
+
+        assertTrue(ex.getMessage().contains(txState.toString()));
+
+        ex = assertThrows(TransactionException.class, () -> tx.enlist(new 
TablePartitionId(TABLE_ID, 0), NODE_AND_TOKEN));
+
+        assertTrue(ex.getMessage().contains(txState.toString()));
+    }
+
+    @Test
+    void testEnlistOnCommit() {
+        startTxAndTryToEnlist(true);
+
+        txState = FINISHING;
+
+        startTxAndTryToEnlist(true);
+    }
+
+    @Test
+    void testEnlistOnAbort() {
+        startTxAndTryToEnlist(false);
+
+        txState = FINISHING;
+
+        startTxAndTryToEnlist(false);
+    }
 }

Reply via email to