This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0b3d994d8d IGNITE-20779 Transaction operation might throw no exception
on enlist… (#2868)
0b3d994d8d is described below
commit 0b3d994d8d5a7afb0d676e084175afd0649f7f16
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Thu Nov 23 21:05:06 2023 +0300
IGNITE-20779 Transaction operation might throw no exception on enlist…
(#2868)
---
.../internal/tx/impl/ReadWriteTransactionImpl.java | 24 +++--
.../ignite/internal/tx/impl/TxManagerImpl.java | 33 +++---
.../tx/impl/ReadWriteTransactionImplTest.java | 112 ++++++++++++++++++++-
3 files changed, 141 insertions(+), 28 deletions(-)
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index a60f013a4d..2d64411a78 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx.impl;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.apache.ignite.internal.tx.TxState.isFinalState;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
@@ -95,12 +96,12 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** {@inheritDoc} */
@Override
public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
- checkEnlistReady();
+ checkEnlistPossibility();
enlistPartitionLock.readLock().lock();
try {
- checkEnlistReady();
+ checkEnlistPossibility();
return enlisted.computeIfAbsent(tablePartitionId, k ->
nodeAndTerm);
} finally {
@@ -111,25 +112,36 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/**
* Checks that this transaction was not finished and will be able to
enlist another partition.
*/
- private void checkEnlistReady() {
- if (isFinalState(state())) {
+ private void checkEnlistPossibility() {
+ if (hasTxFinalizationBegun()) {
throw new TransactionException(
TX_FAILED_READ_WRITE_OPERATION_ERR,
format("Transaction is already finished [id={},
state={}].", id(), state()));
}
}
+ /**
+ * Checks the transaction state and makes a decision depends on it.
+ *
+ * @return True when the transaction started to finalize, false otherwise.
+ */
+ private boolean hasTxFinalizationBegun() {
+ return isFinalState(state()) || state() == FINISHING;
+ }
+
/** {@inheritDoc} */
@Override
protected CompletableFuture<Void> finish(boolean commit) {
- if (isFinalState(state())) {
+ if (hasTxFinalizationBegun()) {
return finishFuture;
}
enlistPartitionLock.writeLock().lock();
try {
- if (!isFinalState(state())) {
+ if (!hasTxFinalizationBegun()) {
+ assert finishFuture == null : "Transaction is already finished
[id=" + id() + ", state=" + state() + "].";
+
finishFuture = finishInternal(commit);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 00cc9fd300..839de54bb7 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -331,37 +331,28 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
TxStateMetaFinishing finishingStateMeta = (TxStateMetaFinishing)
updateTxMeta(txId, old ->
new TxStateMetaFinishing(localNodeId, old == null ? null :
old.commitPartitionId()));
- AtomicBoolean performingFinish = new AtomicBoolean();
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
tuple0 = new TxContext(); // No writes enlisted.
}
- if (!tuple0.isTxFinishing()) {
- tuple0.finishTx();
+ assert !tuple0.isTxFinishing() : "Transaction is already finished
[id=" + uuid + "].";
- performingFinish.set(true);
- }
+ tuple0.finishTx();
return tuple0;
});
- // This is a finishing thread.
- if (performingFinish.get()) {
- // Wait for commit acks first, then proceed with the finish
request.
- return tuple.performFinish(commit, ignored ->
- prepareFinish(
- observableTimestampTracker,
- commitPartition,
- commit,
- enlistedGroups,
- txId,
- finishingStateMeta.txFinishFuture()
- ));
- }
- // The method `performFinish` above has a side effect on
`finishInProgressFuture` future,
- // it kicks off another future that will complete it.
- return tuple.finishInProgressFuture;
+ // Wait for commit acks first, then proceed with the finish request.
+ return tuple.performFinish(commit, ignored ->
+ prepareFinish(
+ observableTimestampTracker,
+ commitPartition,
+ commit,
+ enlistedGroups,
+ txId,
+ finishingStateMeta.txFinishFuture()
+ ));
}
private CompletableFuture<Void> prepareFinish(
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index ff2f882a85..aaf5d89dc0 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -17,36 +17,146 @@
package org.apache.ignite.internal.tx.impl;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import java.util.HashSet;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class ReadWriteTransactionImplTest extends BaseIgniteAbstractTest {
+ private static final ClusterNode CLUSTER_NODE = new ClusterNodeImpl(
+ "test-node-id",
+ "test-node",
+ new NetworkAddress("localhost", 1234)
+ );
+
+ private static final IgniteBiTuple NODE_AND_TOKEN = new
IgniteBiTuple(CLUSTER_NODE, 0L);
+
+ private static final int TABLE_ID = 1;
+
+ /** Transaction commit partition id. */
+ public static final TablePartitionId TX_COMMIT_PART = new
TablePartitionId(TABLE_ID, 0);
+
@Mock
private TxManager txManager;
private final HybridClock clock = new HybridClockImpl();
+ /** The state is assigned to the transaction after a finalize method
(commit or rollback) is called. */
+ private TxState txState = null;
+
@Test
- void effectiveSchemaTimestampIsBeginTimestamp() {
+ public void effectiveSchemaTimestampIsBeginTimestamp() {
HybridTimestamp beginTs = clock.now();
+
UUID txId =
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
var tx = new ReadWriteTransactionImpl(txManager, new
HybridTimestampTracker(), txId);
assertThat(tx.startTimestamp(), is(beginTs));
}
+
+ /**
+ * Starts a transaction, enlists some partitions, finalizes the
transaction, and again tries to enlist.
+ *
+ * @param commit True for committing the transaction, false for rolling
back.
+ */
+ private void startTxAndTryToEnlist(boolean commit) {
+ HashSet<UUID> finishedTxs = new HashSet<>();
+
+ Mockito.when(txManager.finish(any(), any(), anyBoolean(), any(),
any())).thenAnswer(invocation -> {
+ finishedTxs.add(invocation.getArgument(4));
+
+ return completedFuture(null);
+ });
+
+ Mockito.when(txManager.stateMeta(any())).thenAnswer(invocation -> {
+ if (finishedTxs.contains(invocation.getArgument(0))) {
+ return new TxStateMeta(txState, "crd-id", TX_COMMIT_PART,
null);
+ }
+
+ return null;
+ });
+
+ HybridTimestamp beginTs = clock.now();
+
+ UUID txId =
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
+
+ var tx = new ReadWriteTransactionImpl(txManager, new
HybridTimestampTracker(), txId);
+
+ tx.assignCommitPartition(TX_COMMIT_PART);
+
+ tx.enlist(new TablePartitionId(TABLE_ID, 0), NODE_AND_TOKEN);
+ tx.enlist(new TablePartitionId(TABLE_ID, 2), NODE_AND_TOKEN);
+
+ if (commit) {
+ if (txState == null) {
+ txState = COMMITED;
+ }
+
+ tx.commit();
+ } else {
+ if (txState == null) {
+ txState = ABORTED;
+ }
+
+ tx.rollback();
+ }
+
+ TransactionException ex = assertThrows(TransactionException.class,
+ () -> tx.enlist(new TablePartitionId(TABLE_ID, 5),
NODE_AND_TOKEN));
+
+ assertTrue(ex.getMessage().contains(txState.toString()));
+
+ ex = assertThrows(TransactionException.class, () -> tx.enlist(new
TablePartitionId(TABLE_ID, 0), NODE_AND_TOKEN));
+
+ assertTrue(ex.getMessage().contains(txState.toString()));
+ }
+
+ @Test
+ void testEnlistOnCommit() {
+ startTxAndTryToEnlist(true);
+
+ txState = FINISHING;
+
+ startTxAndTryToEnlist(true);
+ }
+
+ @Test
+ void testEnlistOnAbort() {
+ startTxAndTryToEnlist(false);
+
+ txState = FINISHING;
+
+ startTxAndTryToEnlist(false);
+ }
}