This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 10e94521c0 IGNITE-20534 Transaction cannot enlist a partition after being fixed (#2757)
10e94521c0 is described below

commit 10e94521c0f3d0fcb30d9ee54b4874cdd007b3b4
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Oct 27 20:35:26 2023 +0300

    IGNITE-20534 Transaction cannot enlist a partition after being fixed (#2757)
---
 .../apache/ignite/jdbc/ItJdbcTransactionTest.java  |  2 -
 .../ignite/internal/sql/api/ItCommonApiTest.java   | 58 --------------------
 .../ignite/internal/sql/api/ItSqlApiBaseTest.java  | 10 ++--
 .../sql/api/ItSqlClientAsynchronousApiTest.java    |  6 +++
 .../sql/api/ItSqlClientSynchronousApiTest.java     |  6 +++
 .../distributed/storage/InternalTableImpl.java     | 28 ++++------
 .../internal/tx/impl/ReadWriteTransactionImpl.java | 61 +++++++++++++++++++++-
 7 files changed, 88 insertions(+), 83 deletions(-)

diff --git 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
index f0165f9998..58c6888741 100644
--- 
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
+++ 
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcTransactionTest.java
@@ -29,7 +29,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -246,7 +245,6 @@ public class ItJdbcTransactionTest extends 
AbstractJdbcSelfTest {
     /**
      * Ensure that explicit transaction can not be used, after it encounters 
an error.
      */
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
     @ParameterizedTest
     @CsvSource({
             // dml or not | SQL statement
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 347b9c9e96..07dfa90e1b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -34,13 +34,8 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
-import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
 import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
@@ -48,7 +43,6 @@ import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
-import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
@@ -150,58 +144,6 @@ public class ItCommonApiTest extends 
BaseSqlIntegrationTest {
         }
     }
 
-    /** Check transaction change status with erroneous statements. */
-    @Test
-    // TODO should be removed after 
https://issues.apache.org/jira/browse/IGNITE-20534 is fixed.
-    public void testTxStateChangedOnErroneousOp() {
-        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
-
-        TxManager txManager = txManager();
-
-        SqlSchemaManager oldManager =
-                (SqlSchemaManager) 
IgniteTestUtils.getFieldValue(queryProcessor(), SqlQueryProcessor.class, 
"sqlSchemaManager");
-
-        Transaction tx = CLUSTER.aliveNode().transactions().begin();
-
-        try {
-            sql(tx, "INSERT INTO PUBLIC.TEST VALUES(1, 1)");
-            sql(tx, "INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
-        } catch (Throwable ignore) {
-            // No op.
-        }
-
-        assertEquals(0, txManager.pending());
-        InternalTransaction tx0 = (InternalTransaction) tx;
-        assertEquals(TxState.ABORTED, tx0.state());
-
-        tx.rollback();
-        assertEquals(0, txManager.pending());
-
-        sql("INSERT INTO TEST VALUES(1, 1)");
-        assertEquals(0, txManager.pending());
-
-        var schemaManager = new ErroneousSchemaManager();
-
-        // TODO: refactor after 
https://issues.apache.org/jira/browse/IGNITE-17694
-        IgniteTestUtils.setFieldValue(queryProcessor(), "sqlSchemaManager", 
schemaManager);
-
-        try {
-            sql("SELECT a FROM NOTEXIST.TEST");
-        } catch (Throwable ignore) {
-            // No op.
-        }
-
-        try {
-            sql("INSERT INTO NOTEXIST.TEST VALUES(1, 1)");
-        } catch (Throwable ignore) {
-            // No op.
-        }
-
-        assertEquals(0, txManager.pending());
-
-        IgniteTestUtils.setFieldValue(queryProcessor(), "sqlSchemaManager", 
oldManager);
-    }
-
     private static class ErroneousSchemaManager implements SqlSchemaManager {
 
         /** {@inheritDoc} */
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index b6cfb185b7..a6d36e72b3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -62,7 +62,6 @@ import org.apache.ignite.tx.TransactionOptions;
 import org.hamcrest.Matcher;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -724,7 +723,6 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
         }
     }
 
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20534")
     @Test
     public void testLockIsNotReleasedAfterTxRollback() {
         IgniteSql sql = igniteSql();
@@ -737,8 +735,14 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
             Transaction tx = igniteTx().begin();
 
             assertThrows(RuntimeException.class, () -> execute(tx, session, 
"SELECT 1/0"));
+
             tx.rollback();
-            session.execute(tx, "INSERT INTO tst VALUES (1, 1)");
+
+            assertThrowsSqlException(
+                    Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR,
+                    "Transaction is already finished",
+                    () -> session.execute(tx, "INSERT INTO tst VALUES (1, 1)")
+            );
         }
 
         try (Session session = sql.createSession()) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index b04024f862..9e6a097ff2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -64,4 +64,10 @@ public class ItSqlClientAsynchronousApiTest extends 
ItSqlAsynchronousApiTest {
     public void checkTransactionsWithDml() {
         super.checkTransactionsWithDml();
     }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20742")
+    public void testLockIsNotReleasedAfterTxRollback() {
+        super.testLockIsNotReleasedAfterTxRollback();
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 045c745535..8ce27d6dd5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -66,4 +66,10 @@ public class ItSqlClientSynchronousApiTest extends 
ItSqlSynchronousApiTest {
     public void checkTransactionsWithDml() {
         super.checkTransactionsWithDml();
     }
+
+    @Override
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20742")
+    public void testLockIsNotReleasedAfterTxRollback() {
+        super.testLockIsNotReleasedAfterTxRollback();
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ab34b3265a..5a1eae715b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,7 @@ import static 
it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL;
 import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
 import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET_ALL;
@@ -70,7 +71,6 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.IgnitePentaFunction;
-import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.IgniteTriFunction;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
@@ -107,7 +107,6 @@ import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.TxManager;
-import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -328,12 +327,6 @@ public class InternalTableImpl implements InternalTable {
             );
         }
 
-        // It's possible to have null txState if transaction isn't started yet.
-        if (tx != null && !(tx.state() == TxState.PENDING || tx.state() == 
null)) {
-            return failedFuture(new TransactionException(
-                    "The operation is attempted for completed transaction"));
-        }
-
         boolean implicit = tx == null;
         InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
 
@@ -524,9 +517,9 @@ public class InternalTableImpl implements InternalTable {
             // Track only write requests from explicit transactions.
             if (!txManager.addInflight(tx.id())) {
                 return failedFuture(
-                        new TransactionException(TX_UNEXPECTED_STATE_ERR, 
IgniteStringFormatter.format(
+                        new TransactionException(TX_UNEXPECTED_STATE_ERR, 
format(
                                 "Failed to enlist a write operation into a 
transaction, tx is locked for updates "
-                                        + "[tableName={}, partId={}, 
txState={}]",
+                                        + "[tableName={}, partId={}, 
txState={}].",
                                 tableName,
                                 partId,
                                 tx.state()
@@ -629,8 +622,8 @@ public class InternalTableImpl implements InternalTable {
             } catch (Throwable e) {
                 throw new TransactionException(
                         INTERNAL_ERR,
-                        IgniteStringFormatter.format(
-                                "Failed to invoke the replica request 
[tableName={}, partId={}]",
+                        format(
+                                "Failed to invoke the replica request 
[tableName={}, partId={}].",
                                 tableName,
                                 partId
                         ),
@@ -680,8 +673,8 @@ public class InternalTableImpl implements InternalTable {
             } catch (Throwable e) {
                 throw new TransactionException(
                         INTERNAL_ERR,
-                        IgniteStringFormatter.format(
-                                "Failed to invoke the replica request 
[tableName={}, partId={}]",
+                        format(
+                                "Failed to invoke the replica request 
[tableName={}, partId={}].",
                                 tableName,
                                 partId
                         ),
@@ -1451,7 +1444,7 @@ public class InternalTableImpl implements InternalTable {
     private void validatePartitionIndex(int p) {
         if (p < 0 || p >= partitions) {
             throw new IllegalArgumentException(
-                    IgniteStringFormatter.format(
+                    format(
                             "Invalid partition [partition={}, minValue={}, 
maxValue={}].",
                             p,
                             0,
@@ -1829,9 +1822,8 @@ public class InternalTableImpl implements InternalTable {
                 if (n <= 0) {
                     cancel(null, true);
 
-                    subscriber.onError(new 
IllegalArgumentException(IgniteStringFormatter
-                            .format("Invalid requested amount of items 
[requested={}, minValue=1]", n))
-                    );
+                    subscriber.onError(new IllegalArgumentException(
+                            format("Invalid requested amount of items 
[requested={}, minValue=1].", n)));
                 }
 
                 if (canceled.get()) {
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index e0097627be..dc1e769c05 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
+
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -24,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -35,6 +40,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
 
 /**
  * The read-write implementation of an internal transaction.
@@ -56,6 +62,12 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** A partition which stores the transaction state. */
     private volatile TablePartitionId commitPart;
 
+    /** The lock protects the transaction topology from concurrent 
modification during finishing. */
+    private final ReentrantReadWriteLock enlistPartitionLock = new 
ReentrantReadWriteLock();
+
+    /** The future is initialized when this transaction starts committing or 
rolling back and is finished together with the transaction. */
+    private CompletableFuture<Void> finishFuture;
+
     /**
      * Constructs an explicit read-write transaction.
      *
@@ -90,12 +102,57 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
     /** {@inheritDoc} */
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId 
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
-        return enlisted.computeIfAbsent(tablePartitionId, k -> nodeAndTerm);
+        checkEnlistReady();
+
+        try {
+            enlistPartitionLock.readLock().lock();
+
+            checkEnlistReady();
+
+            return enlisted.computeIfAbsent(tablePartitionId, k -> 
nodeAndTerm);
+        } finally {
+            enlistPartitionLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Checks that this transaction was not finished and will be able to 
enlist another partition.
+     */
+    private void checkEnlistReady() {
+        if (isFinalState(state())) {
+            throw new TransactionException(
+                    TX_FAILED_READ_WRITE_OPERATION_ERR,
+                    format("Transaction is already finished [id={}, 
state={}].", id(), state()));
+        }
     }
 
     /** {@inheritDoc} */
     @Override
     protected CompletableFuture<Void> finish(boolean commit) {
+        if (isFinalState(state())) {
+            return finishFuture;
+        }
+
+        try {
+            enlistPartitionLock.writeLock().lock();
+
+            if (!isFinalState(state())) {
+                finishFuture = finishInternal(commit);
+            }
+
+            return finishFuture;
+        } finally {
+            enlistPartitionLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Internal method for finishing this transaction.
+     *
+     * @param commit {@code true} to commit, false to rollback.
+     * @return The future of transaction completion.
+     */
+    private CompletableFuture<Void> finishInternal(boolean commit) {
         if (!enlisted.isEmpty()) {
             Map<TablePartitionId, Long> enlistedGroups = 
enlisted.entrySet().stream()
                     .collect(Collectors.toMap(
@@ -108,7 +165,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
             ClusterNode recipientNode = nodeAndTerm.get1();
             Long term = nodeAndTerm.get2();
 
-            LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={}, 
groups={}",
+            LOG.debug("Finish [recipientNode={}, term={} commit={}, txId={}, 
groups={}].",
                     recipientNode, term, commit, id(), enlistedGroups);
 
             assert recipientNode != null;

Reply via email to