This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1801e58cc4 IGNITE-19663 Add possibility to check transaction state
(#2279)
1801e58cc4 is described below
commit 1801e58cc4c79548a527f4812b81f8d52a60141f
Author: Alexander Lapin <[email protected]>
AuthorDate: Wed Jul 5 18:10:02 2023 +0300
IGNITE-19663 Add possibility to check transaction state (#2279)
---
.../ignite/internal/sql/api/ItCommonApiTest.java | 12 ++--
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 2 +-
.../internal/sql/api/ItSqlSynchronousApiTest.java | 8 +--
.../org/apache/ignite/internal/app/IgniteImpl.java | 10 ++++
.../distributed/storage/InternalTableImpl.java | 4 +-
.../RepeatedFinishReadWriteTransactionTest.java | 9 ++-
.../ignite/internal/table/TxAbstractTest.java | 2 +
.../org/apache/ignite/internal/tx/TxManager.java | 11 +++-
.../org/apache/ignite/internal/tx/TxState.java | 1 +
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 5 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 6 ++
.../ignite/internal/tx/impl/TxManagerImpl.java | 22 +++++--
.../apache/ignite/internal/tx/TxManagerTest.java | 67 +++++++++++++++++++++-
13 files changed, 137 insertions(+), 22 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 326177814c..caa845dfe0 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.lang.ErrorGroups.Sql;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.Instant;
@@ -47,6 +46,7 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
@@ -184,7 +184,7 @@ public class ItCommonApiTest extends
ClusterPerClassIntegrationTest {
public void testTxStateChangedOnErroneousOp() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- // TODO: need to be refactored after
https://issues.apache.org/jira/browse/IGNITE-19663
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19916 need to be
refactored
TxManager txManagerInternal =
(TxManager)
IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class,
"txManager");
@@ -206,14 +206,17 @@ public class ItCommonApiTest extends
ClusterPerClassIntegrationTest {
}
assertEquals(0, txManagerInternal.finished() - txPrevCnt);
+ assertEquals(1, txManagerInternal.pending());
InternalTransaction tx0 = (InternalTransaction) tx;
- assertNull(tx0.state());
+ assertEquals(TxState.PENDING, tx0.state());
tx.rollback();
assertEquals(1, txManagerInternal.finished() - txPrevCnt);
+ assertEquals(0, txManagerInternal.pending());
sql("INSERT INTO TEST VALUES(1, 1)");
assertEquals(2, txManagerInternal.finished() - txPrevCnt);
+ assertEquals(0, txManagerInternal.pending());
var schemaManager = new ErroneousSchemaManager();
@@ -232,7 +235,8 @@ public class ItCommonApiTest extends
ClusterPerClassIntegrationTest {
// No op.
}
- assertEquals(2, txManagerInternal.finished() - txPrevCnt);
+ assertEquals(4, txManagerInternal.finished() - txPrevCnt);
+ assertEquals(0, txManagerInternal.pending());
IgniteTestUtils.setFieldValue(queryProc, "sqlSchemaManager",
oldManager);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 1843098c3c..c2c0b0a656 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -383,7 +383,7 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
checkDml(2 * ROW_COUNT, ses, "DELETE FROM TEST WHERE VAL0 >= 0");
- assertEquals(ROW_COUNT + 1 + 1 + 1 + 1 + 1,
txManagerInternal.finished() - txPrevCnt);
+ assertEquals(ROW_COUNT + 1 + 1 + 1 + 1 + 1 + 1,
txManagerInternal.finished() - txPrevCnt);
var states = (Map<UUID, TxState>)
IgniteTestUtils.getFieldValue(txManagerInternal, TxManagerImpl.class, "states");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index 13533f4825..76ab07047a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -279,6 +280,7 @@ public class ItSqlSynchronousApiTest extends
ClusterPerClassIntegrationTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19919")
public void errors() throws InterruptedException {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
for (int i = 0; i < ROW_COUNT; ++i) {
@@ -364,11 +366,7 @@ public class ItSqlSynchronousApiTest extends
ClusterPerClassIntegrationTest {
assertEquals(1, sql("SELECT ID FROM TEST WHERE ID = -1").size());
}
- TxManager txManagerInternal = (TxManager)
IgniteTestUtils.getFieldValue(CLUSTER_NODES.get(0), IgniteImpl.class,
"txManager");
-
- var states = (Map<UUID, TxState>)
IgniteTestUtils.getFieldValue(txManagerInternal, TxManagerImpl.class, "states");
-
- assertEquals(txManagerInternal.finished(), states.size());
+ assertEquals(0, ((IgniteImpl)
CLUSTER_NODES.get(0)).txManager().pending());
}
@Test
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0b015acd6a..5b984d2642 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1094,4 +1094,14 @@ public class IgniteImpl implements Ignite {
public HybridClock clock() {
return clock;
}
+
+ /**
+ * Returns the node's transaction manager.
+ *
+ * @return Transaction manager.
+ */
+ @TestOnly
+ public TxManager txManager() {
+ return txManager;
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index d50e41233c..d2c382a9e0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -82,6 +82,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -301,7 +302,8 @@ public class InternalTableImpl implements InternalTable {
final boolean implicit = tx == null;
- if (!implicit && tx.state() != null) {
+ // It's possible to have null txState if transaction isn't started yet.
+ if (!implicit && !(tx.state() == TxState.PENDING || tx.state() ==
null)) {
return failedFuture(new TransactionException(
"The operation is attempted for completed transaction"));
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
index 6953346121..615cc5a1a3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
@@ -246,8 +246,8 @@ public class RepeatedFinishReadWriteTransactionTest {
}
@Override
- public boolean changeState(UUID txId, @Nullable TxState before,
TxState after) {
- return false;
+ public void changeState(UUID txId, @Nullable TxState before, TxState
after) {
+ // No-op.
}
@Override
@@ -281,6 +281,11 @@ public class RepeatedFinishReadWriteTransactionTest {
return 0;
}
+ @Override
+ public int pending() {
+ return 0;
+ }
+
@Override
public void start() {
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 32e3185a66..5457661d54 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -295,6 +295,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertEquals(BALANCE_2 + DELTA, view.get(null,
makeKey(2)).doubleValue("balance"));
assertEquals(5, clientTxManager().finished());
+ assertEquals(0, clientTxManager().pending());
}
/**
@@ -319,6 +320,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(null,
makeKey(2)).doubleValue("balance"));
assertEquals(5, clientTxManager().finished());
+ assertEquals(0, clientTxManager().pending());
}
/**
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 3f00db332a..c6b759fbdf 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -69,11 +69,10 @@ public interface TxManager extends IgniteComponent {
* @param txId Transaction id.
* @param before Before state.
* @param after After state.
- * @return {@code True} if a state was changed.
*/
// TODO: IGNITE-17638 TestOnly code, let's consider using Txn state map
instead of states.
@Deprecated
- boolean changeState(UUID txId, @Nullable TxState before, TxState after);
+ void changeState(UUID txId, @Nullable TxState before, TxState after);
/**
* Returns lock manager.
@@ -129,6 +128,14 @@ public interface TxManager extends IgniteComponent {
@TestOnly
int finished();
+ /**
+ * Returns a number of pending transactions, that is, transactions that
have not yet been committed or rolled back.
+ *
+ * @return A number of pending transactions.
+ */
+ @TestOnly
+ int pending();
+
/**
* Updates the low watermark, the value is expected to only increase.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
index ae4ff4ff20..2321798ff7 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxState.java
@@ -21,6 +21,7 @@ package org.apache.ignite.internal.tx;
* Transaction state.
*/
public enum TxState {
+ PENDING,
ABORTED,
COMMITED;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 49ecc608e4..bd1ab4cce7 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -98,6 +100,7 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
return completedFuture(null);
}
- return ((TxManagerImpl)
txManager).completeReadOnlyTransactionFuture(new
TxIdAndTimestamp(readTimestamp, id()));
+ return ((TxManagerImpl) txManager).completeReadOnlyTransactionFuture(
+ new TxIdAndTimestamp(readTimestamp, id())).thenRun(() ->
txManager.changeState(id(), PENDING, COMMITED));
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 06cfc39d00..6785b0c44b 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -18,6 +18,9 @@
package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@@ -139,6 +142,9 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
id()
);
} else {
+ // TODO: IGNITE-17638 TestOnly code, let's
consider using Txn state map instead of states.
+ txManager.changeState(id(), PENDING, commit ?
COMMITED : ABORTED);
+
return completedFuture(null);
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 03af7753b0..93891f3335 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.tx.impl;
import static java.util.concurrent.CompletableFuture.allOf;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.tx.TxState.PENDING;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_READ_ONLY_TOO_OLD_ERR;
import java.util.Comparator;
@@ -116,6 +119,7 @@ public class TxManagerImpl implements TxManager {
public InternalTransaction begin(boolean readOnly) {
HybridTimestamp beginTimestamp = clock.now();
UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp);
+ changeState(txId, null, PENDING);
if (!readOnly) {
return new ReadWriteTransactionImpl(this, txId);
@@ -154,14 +158,17 @@ public class TxManagerImpl implements TxManager {
}
@Override
- public boolean changeState(UUID txId, TxState before, TxState after) {
- return states.compute(txId, (k, v) -> {
+ public void changeState(UUID txId, TxState before, TxState after) {
+ TxState computeResult = states.compute(txId, (k, v) -> {
if (v == before) {
return after;
} else {
return v;
}
- }) == after;
+ });
+
+ assert computeResult == after : "Unable to change transaction state,
expected = [" + before + "],"
+ + " got = [" + computeResult + "], state to set = [" + after +
']';
}
@Override
@@ -189,7 +196,7 @@ public class TxManagerImpl implements TxManager {
return replicaService.invoke(recipientNode, req)
// TODO: IGNITE-17638 TestOnly code, let's consider using Txn
state map instead of states.
- .thenRun(() -> changeState(txId, null, commit ?
TxState.COMMITED : TxState.ABORTED));
+ .thenRun(() -> changeState(txId, PENDING, commit ? COMMITED :
ABORTED));
}
@Override
@@ -222,7 +229,12 @@ public class TxManagerImpl implements TxManager {
@Override
public int finished() {
- return (int) states.entrySet().stream().filter(e -> e.getValue() ==
TxState.COMMITED || e.getValue() == TxState.ABORTED).count();
+ return (int) states.entrySet().stream().filter(e -> e.getValue() ==
COMMITED || e.getValue() == ABORTED).count();
+ }
+
+ @Override
+ public int pending() {
+ return (int) states.entrySet().stream().filter(e -> e.getValue() ==
PENDING).count();
}
@Override
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 8017646882..ceffd3ae65 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -48,9 +48,12 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -73,7 +76,7 @@ public class TxManagerTest extends IgniteAbstractTest {
/** Init test callback. */
@BeforeEach
- public void before() {
+ public void setup() {
clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
when(clusterService.topologyService().localMember().address()).thenReturn(ADDR);
@@ -81,6 +84,14 @@ public class TxManagerTest extends IgniteAbstractTest {
replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
txManager = new TxManagerImpl(replicaService, new HeapLockManager(),
clock, new TransactionIdGenerator(0xdeadbeef));
+
+ txManager.start();
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ txManager.beforeNodeStop();
+ txManager.stop();
}
@Test
@@ -165,4 +176,58 @@ public class TxManagerTest extends IgniteAbstractTest {
assertThat(txManager.updateLowWatermark(clock.now()),
willSucceedFast());
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testTestOnlyPendingCommit(boolean startReadOnlyTransaction) {
+ assertEquals(0, txManager.pending());
+ assertEquals(0, txManager.finished());
+
+ // Start transaction.
+ InternalTransaction tx = txManager.begin(startReadOnlyTransaction);
+ assertEquals(1, txManager.pending());
+ assertEquals(0, txManager.finished());
+
+ // Commit transaction.
+ tx.commit();
+ assertEquals(0, txManager.pending());
+ assertEquals(1, txManager.finished());
+
+ // Check that tx.commit() is idempotent within the scope of
txManager.pending() and txManager.finished()
+ tx.commit();
+ assertEquals(0, txManager.pending());
+ assertEquals(1, txManager.finished());
+
+ // Check that tx.rollback() after tx.commit() won't effect
txManager.pending() and txManager.finished()
+ tx.rollback();
+ assertEquals(0, txManager.pending());
+ assertEquals(1, txManager.finished());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testTestOnlyPendingRollback(boolean startReadOnlyTransaction) {
+ assertEquals(0, txManager.pending());
+ assertEquals(0, txManager.finished());
+
+ // Start transaction.
+ InternalTransaction tx = txManager.begin(startReadOnlyTransaction);
+ assertEquals(1, txManager.pending());
+ assertEquals(0, txManager.finished());
+
+ // Rollback transaction.
+ tx.rollback();
+ assertEquals(0, txManager.pending());
+ assertEquals(1, txManager.finished());
+
+ // Check that tx.rollback() is idempotent within the scope of
txManager.pending() and txManager.finished()
+ tx.rollback();
+ assertEquals(0, txManager.pending());
+ assertEquals(1, txManager.finished());
+
+ // Check that tx.commit() after tx.rollback() won't effect
txManager.pending() and txManager.finished()
+ tx.commit();
+ assertEquals(0, txManager.pending());
+ assertEquals(1, txManager.finished());
+ }
}