This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 10e94521c0 IGNITE-20534 Transaction cannot enlist a partition after
being fixed (#2757)
10e94521c0 is described below
commit 10e94521c0f3d0fcb30d9ee54b4874cdd007b3b4
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Oct 27 20:35:26 2023 +0300
IGNITE-20534 Transaction cannot enlist a partition after being fixed (#2757)
---
.../apache/ignite/jdbc/ItJdbcTransactionTest.java | 2 -
.../ignite/internal/sql/api/ItCommonApiTest.java | 58 --------------------
.../ignite/internal/sql/api/ItSqlApiBaseTest.java | 10 ++--
.../sql/api/ItSqlClientAsynchronousApiTest.java | 6 +++
.../sql/api/ItSqlClientSynchronousApiTest.java | 6 +++
.../distributed/storage/InternalTableImpl.java | 28 ++++------
.../internal/tx/impl/ReadWriteTransactionImpl.java | 61 +++++++++++++++++++++-
7 files changed, 88 insertions(+), 83 deletions(-)
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
index f0165f9998..58c6888741 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
@@ -29,7 +29,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -246,7 +245,6 @@ public class ItJdbcTransactionTest extends
AbstractJdbcSelfTest {
/**
* Ensure that explicit transaction can not be used, after it encounters
an error.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
@ParameterizedTest
@CsvSource({
// dml or not | SQL statement
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 347b9c9e96..07dfa90e1b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -34,13 +34,8 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
-import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
@@ -48,7 +43,6 @@ import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
-import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -150,58 +144,6 @@ public class ItCommonApiTest extends
BaseSqlIntegrationTest {
}
}
- /** Check transaction change status with erroneous statements. */
- @Test
- // TODO should be removed after
https://issues.apache.org/jira/browse/IGNITE-20534 is fixed.
- public void testTxStateChangedOnErroneousOp() {
- sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
- TxManager txManager = txManager();
-
- SqlSchemaManager oldManager =
- (SqlSchemaManager)
IgniteTestUtils.getFieldValue(queryProcessor(), SqlQueryProcessor.class,
"sqlSchemaManager");
-
- Transaction tx = CLUSTER.aliveNode().transactions().begin();
-
- try {
- sql(tx, "INSERT INTO PUBLIC.TEST VALUES(1, 1)");
- sql(tx, "INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
- } catch (Throwable ignore) {
- // No op.
- }
-
- assertEquals(0, txManager.pending());
- InternalTransaction tx0 = (InternalTransaction) tx;
- assertEquals(TxState.ABORTED, tx0.state());
-
- tx.rollback();
- assertEquals(0, txManager.pending());
-
- sql("INSERT INTO TEST VALUES(1, 1)");
- assertEquals(0, txManager.pending());
-
- var schemaManager = new ErroneousSchemaManager();
-
- // TODO: refactor after
https://issues.apache.org/jira/browse/IGNITE-17694
- IgniteTestUtils.setFieldValue(queryProcessor(), "sqlSchemaManager",
schemaManager);
-
- try {
- sql("SELECT a FROM NOTEXIST.TEST");
- } catch (Throwable ignore) {
- // No op.
- }
-
- try {
- sql("INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
- } catch (Throwable ignore) {
- // No op.
- }
-
- assertEquals(0, txManager.pending());
-
- IgniteTestUtils.setFieldValue(queryProcessor(), "sqlSchemaManager",
oldManager);
- }
-
private static class ErroneousSchemaManager implements SqlSchemaManager {
/** {@inheritDoc} */
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index b6cfb185b7..a6d36e72b3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -62,7 +62,6 @@ import org.apache.ignite.tx.TransactionOptions;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -724,7 +723,6 @@ public abstract class ItSqlApiBaseTest extends
BaseSqlIntegrationTest {
}
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
@Test
public void testLockIsNotReleasedAfterTxRollback() {
IgniteSql sql = igniteSql();
@@ -737,8 +735,14 @@ public abstract class ItSqlApiBaseTest extends
BaseSqlIntegrationTest {
Transaction tx = igniteTx().begin();
assertThrows(RuntimeException.class, () -> execute(tx, session,
"SELECT 1/0"));
+
tx.rollback();
- session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
+
+ assertThrowsSqlException(
+ Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
+ "Transaction is already finished",
+ () -> session.execute(tx, "INSERT INTO tst VALUES (1, 1)")
+ );
}
try (Session session = sql.createSession()) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index b04024f862..9e6a097ff2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -64,4 +64,10 @@ public class ItSqlClientAsynchronousApiTest extends
ItSqlAsynchronousApiTest {
public void checkTransactionsWithDml() {
super.checkTransactionsWithDml();
}
+
+ @Override
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20742")
+ public void testLockIsNotReleasedAfterTxRollback() {
+ super.testLockIsNotReleasedAfterTxRollback();
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 045c745535..8ce27d6dd5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -66,4 +66,10 @@ public class ItSqlClientSynchronousApiTest extends
ItSqlSynchronousApiTest {
public void checkTransactionsWithDml() {
super.checkTransactionsWithDml();
}
+
+ @Override
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20742")
+ public void testLockIsNotReleasedAfterTxRollback() {
+ super.testLockIsNotReleasedAfterTxRollback();
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ab34b3265a..5a1eae715b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,7 @@ import static
it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL;
import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
import static
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
@@ -70,7 +71,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgnitePentaFunction;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -107,7 +107,6 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -328,12 +327,6 @@ public class InternalTableImpl implements InternalTable {
);
}
- // It's possible to have null txState if transaction isn't started yet.
- if (tx != null && !(tx.state() == TxState.PENDING || tx.state() ==
null)) {
- return failedFuture(new TransactionException(
- "The operation is attempted for completed transaction"));
- }
-
boolean implicit = tx == null;
InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
@@ -524,9 +517,9 @@ public class InternalTableImpl implements InternalTable {
// Track only write requests from explicit transactions.
if (!txManager.addInflight(tx.id())) {
return failedFuture(
- new TransactionException(TX_UNEXPECTED_STATE_ERR,
IgniteStringFormatter.format(
+ new TransactionException(TX_UNEXPECTED_STATE_ERR,
format(
"Failed to enlist a write operation into a
transaction, tx is locked for updates "
- + "[tableName={}, partId={},
txState={}]",
+ + "[tableName={}, partId={},
txState={}].",
tableName,
partId,
tx.state()
@@ -629,8 +622,8 @@ public class InternalTableImpl implements InternalTable {
} catch (Throwable e) {
throw new TransactionException(
INTERNAL_ERR,
- IgniteStringFormatter.format(
- "Failed to invoke the replica request
[tableName={}, partId={}]",
+ format(
+ "Failed to invoke the replica request
[tableName={}, partId={}].",
tableName,
partId
),
@@ -680,8 +673,8 @@ public class InternalTableImpl implements InternalTable {
} catch (Throwable e) {
throw new TransactionException(
INTERNAL_ERR,
- IgniteStringFormatter.format(
- "Failed to invoke the replica request
[tableName={}, partId={}]",
+ format(
+ "Failed to invoke the replica request
[tableName={}, partId={}].",
tableName,
partId
),
@@ -1451,7 +1444,7 @@ public class InternalTableImpl implements InternalTable {
private void validatePartitionIndex(int p) {
if (p < 0 || p >= partitions) {
throw new IllegalArgumentException(
- IgniteStringFormatter.format(
+ format(
"Invalid partition [partition={}, minValue={},
maxValue={}].",
p,
0,
@@ -1829,9 +1822,8 @@ public class InternalTableImpl implements InternalTable {
if (n <= 0) {
cancel(null, true);
- subscriber.onError(new
IllegalArgumentException(IgniteStringFormatter
- .format("Invalid requested amount of items
[requested={}, minValue=1]", n))
- );
+ subscriber.onError(new IllegalArgumentException(
+ format("Invalid requested amount of items
[requested={}, minValue=1].", n)));
}
if (canceled.get()) {
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index e0097627be..dc1e769c05 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.tx.impl;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
+
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
@@ -24,6 +28,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -35,6 +40,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
/**
* The read-write implementation of an internal transaction.
@@ -56,6 +62,12 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** A partition which stores the transaction state. */
private volatile TablePartitionId commitPart;
+ /** The lock protects the transaction topology from concurrent
modification during finishing. */
+ private final ReentrantReadWriteLock enlistPartitionLock = new
ReentrantReadWriteLock();
+
+ /** The future is initialized when this transaction starts committing or
rolling back and is finished together with the transaction. */
+ private CompletableFuture<Void> finishFuture;
+
/**
* Constructs an explicit read-write transaction.
*
@@ -90,12 +102,57 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** {@inheritDoc} */
@Override
public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
- return enlisted.computeIfAbsent(tablePartitionId, k -> nodeAndTerm);
+ checkEnlistReady();
+
+ try {
+ enlistPartitionLock.readLock().lock();
+
+ checkEnlistReady();
+
+ return enlisted.computeIfAbsent(tablePartitionId, k ->
nodeAndTerm);
+ } finally {
+ enlistPartitionLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Checks that this transaction was not finished and will be able to
enlist another partition.
+ */
+ private void checkEnlistReady() {
+ if (isFinalState(state())) {
+ throw new TransactionException(
+ TX_FAILED_READ_WRITE_OPERATION_ERR,
+ format("Transaction is already finished [id={},
state={}].", id(), state()));
+ }
}
/** {@inheritDoc} */
@Override
protected CompletableFuture<Void> finish(boolean commit) {
+ if (isFinalState(state())) {
+ return finishFuture;
+ }
+
+ try {
+ enlistPartitionLock.writeLock().lock();
+
+ if (!isFinalState(state())) {
+ finishFuture = finishInternal(commit);
+ }
+
+ return finishFuture;
+ } finally {
+ enlistPartitionLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Internal method for finishing this transaction.
+ *
+ * @param commit {@code true} to commit, false to rollback.
+ * @return The future of transaction completion.
+ */
+ private CompletableFuture<Void> finishInternal(boolean commit) {
if (!enlisted.isEmpty()) {
Map<TablePartitionId, Long> enlistedGroups =
enlisted.entrySet().stream()
.collect(Collectors.toMap(
@@ -108,7 +165,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
ClusterNode recipientNode = nodeAndTerm.get1();
Long term = nodeAndTerm.get2();
- LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={},
groups={}",
+ LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={},
groups={}].",
recipientNode, term, commit, id(), enlistedGroups);
assert recipientNode != null;