This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 192ea0ed31 IGNITE-20537 Add ability to create implicit transactions (#2649)
192ea0ed31 is described below

commit 192ea0ed31271ea99f18f9e50aad3a7839c3ed14
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Oct 3 13:18:17 2023 +0400

    IGNITE-20537 Add ability to create implicit transactions (#2649)
---
 .../apache/ignite/client/fakes/FakeTxManager.java  | 10 +++++
 .../internal/sql/engine/SqlQueryProcessor.java     |  1 +
 .../sql/engine/framework/NoOpTransaction.java      |  5 +++
 .../distributed/storage/InternalTableImpl.java     | 46 +++++++++++-----------
 .../RepeatedFinishReadWriteTransactionTest.java    |  7 +++-
 .../ignite/internal/tx/InternalTransaction.java    |  6 +++
 .../org/apache/ignite/internal/tx/TxManager.java   | 13 +++++-
 .../internal/tx/impl/ReadOnlyTransactionImpl.java  |  5 +++
 .../internal/tx/impl/ReadWriteTransactionImpl.java | 22 ++++++++++-
 .../ignite/internal/tx/impl/TxManagerImpl.java     | 11 +++++-
 .../apache/ignite/internal/tx/TxManagerTest.java   |  7 ++++
 11 files changed, 106 insertions(+), 27 deletions(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index f8b5487c84..836b515d5f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -140,9 +140,19 @@ public class FakeTxManager implements TxManager {
             public HybridTimestamp startTimestamp() {
                 return timestamp;
             }
+
+            @Override
+            public boolean implicit() {
+                return false;
+            }
         };
     }
 
+    @Override
+    public InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker) {
+        throw new UnsupportedOperationException("Not expected to be called here");
+    }
+
     @Override
     public @Nullable TxStateMeta stateMeta(UUID txId) {
         return null;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 009917388d..f80787f964 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -499,6 +499,7 @@ public class SqlQueryProcessor implements QueryProcessor {
      * @return Wrapper for an active transaction.
      * @throws SqlException If an outer transaction was started for a {@link 
SqlQueryType#DDL DDL} query.
      */
+    // TODO: IGNITE-20539 - unify creation of implicit transactions.
     static QueryTransactionWrapper wrapTxOrStartImplicit(
             SqlQueryType queryType,
             IgniteTransactions transactions,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index c6621c94c5..1b3091a9c3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -125,6 +125,11 @@ public final class NoOpTransaction implements InternalTransaction {
         return hybridTimestamp;
     }
 
+    @Override
+    public boolean implicit() {
+        return false;
+    }
+
     @Override
     public UUID id() {
         return id;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index da8fadab24..1fca0a43e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -256,22 +256,20 @@ public class InternalTableImpl implements InternalTable {
             );
         }
 
-        boolean implicit = tx == null;
-
-        InternalTransaction tx0 = implicit ? 
txManager.begin(observableTimestampTracker) : tx;
+        InternalTransaction actualTx = startImplicitTxIfNeeded(tx);
 
         int partId = partitionId(row);
 
         TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
 
-        IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = 
tx0.enlistedNodeAndTerm(partGroupId);
+        IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = 
actualTx.enlistedNodeAndTerm(partGroupId);
 
         CompletableFuture<R> fut;
 
         if (primaryReplicaAndTerm != null) {
-            assert !implicit;
+            assert !actualTx.implicit();
 
-            ReplicaRequest request = fac.apply(tx, partGroupId, 
primaryReplicaAndTerm.get2());
+            ReplicaRequest request = fac.apply(actualTx, partGroupId, 
primaryReplicaAndTerm.get2());
 
             try {
                 fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -281,10 +279,10 @@ public class InternalTableImpl implements InternalTable {
                 throw new TransactionException("Failed to invoke the replica request.");
             }
         } else {
-            fut = enlistWithRetry(tx0, partId, term -> fac.apply(tx0, 
partGroupId, term), ATTEMPTS_TO_ENLIST_PARTITION);
+            fut = enlistWithRetry(actualTx, partId, term -> 
fac.apply(actualTx, partGroupId, term), ATTEMPTS_TO_ENLIST_PARTITION);
         }
 
-        return postEnlist(fut, false, tx0, implicit);
+        return postEnlist(fut, false, actualTx, actualTx.implicit());
     }
 
     /**
@@ -314,33 +312,33 @@ public class InternalTableImpl implements InternalTable {
             );
         }
 
-        boolean implicit = tx == null;
-
         // It's possible to have null txState if transaction isn't started yet.
-        if (!implicit && !(tx.state() == TxState.PENDING || tx.state() == 
null)) {
+        if (tx != null && !(tx.state() == TxState.PENDING || tx.state() == 
null)) {
             return failedFuture(new TransactionException(
                     "The operation is attempted for completed transaction"));
         }
 
-        InternalTransaction tx0 = implicit ? 
txManager.begin(observableTimestampTracker) : tx;
+        InternalTransaction actualTx = startImplicitTxIfNeeded(tx);
 
         Int2ObjectMap<RowBatch> rowBatchByPartitionId = 
toRowBatchByPartitionId(keyRows);
 
         boolean singlePart = rowBatchByPartitionId.size() == 1;
 
+        boolean implicit = actualTx.implicit();
+
         for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : 
rowBatchByPartitionId.int2ObjectEntrySet()) {
             int partitionId = partitionRowBatch.getIntKey();
             RowBatch rowBatch = partitionRowBatch.getValue();
 
             TablePartitionId partGroupId = new TablePartitionId(tableId, 
partitionId);
 
-            IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = 
tx0.enlistedNodeAndTerm(partGroupId);
+            IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm = 
actualTx.enlistedNodeAndTerm(partGroupId);
 
             CompletableFuture<Object> fut;
 
             if (primaryReplicaAndTerm != null) {
                 assert !implicit;
-                ReplicaRequest request = fac.apply(rowBatch.requestedRows, 
tx0, partGroupId, primaryReplicaAndTerm.get2(), false);
+                ReplicaRequest request = fac.apply(rowBatch.requestedRows, 
actualTx, partGroupId, primaryReplicaAndTerm.get2(), false);
 
                 try {
                     fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), 
request);
@@ -351,9 +349,9 @@ public class InternalTableImpl implements InternalTable {
                 }
             } else {
                 fut = enlistWithRetry(
-                        tx0,
+                        actualTx,
                         partitionId,
-                        term -> fac.apply(rowBatch.requestedRows, tx0, 
partGroupId, term, implicit && singlePart),
+                        term -> fac.apply(rowBatch.requestedRows, actualTx, 
partGroupId, term, implicit && singlePart),
                         ATTEMPTS_TO_ENLIST_PARTITION
                 );
             }
@@ -363,7 +361,11 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<T> fut = 
reducer.apply(rowBatchByPartitionId.values());
 
-        return postEnlist(fut, implicit && !singlePart, tx0, implicit && 
singlePart);
+        return postEnlist(fut, implicit && !singlePart, actualTx, implicit && 
singlePart);
+    }
+
+    private InternalTransaction startImplicitTxIfNeeded(@Nullable 
InternalTransaction tx) {
+        return tx == null ? 
txManager.beginImplicit(observableTimestampTracker) : tx;
     }
 
     /**
@@ -711,7 +713,7 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int 
partition) {
-        InternalTransaction tx = txManager.begin(observableTimestampTracker);
+        InternalTransaction tx = 
txManager.beginImplicit(observableTimestampTracker);
         TablePartitionId partGroupId = new TablePartitionId(tableId, 
partition);
 
         CompletableFuture<Void> fut = enlistWithRetry(
@@ -1070,13 +1072,13 @@ public class InternalTableImpl implements InternalTable {
 
         validatePartitionIndex(partId);
 
-        boolean implicit = tx == null;
+        InternalTransaction actualTx = startImplicitTxIfNeeded(tx);
 
-        InternalTransaction tx0 = implicit ? 
txManager.begin(observableTimestampTracker) : tx;
+        boolean implicit = actualTx.implicit();
 
         return new PartitionScanPublisher(
                 (scanId, batchSize) -> enlistCursorInTx(
-                        tx0,
+                        actualTx,
                         partId,
                         scanId,
                         batchSize,
@@ -1088,7 +1090,7 @@ public class InternalTableImpl implements InternalTable {
                         columnsToInclude,
                         implicit
                 ),
-                (commit, fut) -> postEnlist(fut, commit, tx0, implicit && 
!commit)
+                (commit, fut) -> postEnlist(fut, commit, actualTx, implicit && 
!commit)
         );
     }
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
index ed114e14d4..863d01f5e2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
@@ -237,7 +237,7 @@ public class RepeatedFinishReadWriteTransactionTest extends BaseIgniteAbstractTe
 
         @Override
         public InternalTransaction begin(HybridTimestampTracker 
timestampTracker) {
-            return null;
+            throw new UnsupportedOperationException("Not implemented");
         }
 
         @Override
@@ -245,6 +245,11 @@ public class RepeatedFinishReadWriteTransactionTest extends BaseIgniteAbstractTe
             return null;
         }
 
+        @Override
+        public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker) {
+            throw new UnsupportedOperationException("Not implemented");
+        }
+
         @Override
         public @Nullable TxStateMeta stateMeta(UUID txId) {
             return null;
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index 1ef01b7c07..784a1a1b35 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -99,4 +99,10 @@ public interface InternalTransaction extends Transaction {
      * @return Timestamp that is used to obtain the effective schema version 
used inside the transaction.
      */
     HybridTimestamp startTimestamp();
+
+    /**
+     * Returns whether this transaction is implicit (i.e. started by the 
system automatically when the user
+     * provided no transaction for an operation) or not.
+     */
+    boolean implicit();
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 280cc91e74..fc06c3ea15 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -37,7 +37,7 @@ import org.jetbrains.annotations.TestOnly;
  */
 public interface TxManager extends IgniteComponent {
     /**
-     * Starts a read-write transaction coordinated by a local node.
+     * Starts an explicit read-write transaction coordinated by a local node.
      *
      * @param timestampTracker Observable timestamp tracker is used to track a 
timestamp for either read-write or read-only
      *         transaction execution. The tracker is also used to determine 
the read timestamp for read-only transactions.
@@ -46,7 +46,7 @@ public interface TxManager extends IgniteComponent {
     InternalTransaction begin(HybridTimestampTracker timestampTracker);
 
     /**
-     * Starts either read-write or read-only transaction, depending on {@code 
readOnly} parameter value.
+     * Starts either explicit read-write or read-only transaction, depending 
on {@code readOnly} parameter value.
      *
      * @param timestampTracker Observable timestamp tracker is used to track a 
timestamp for either read-write or read-only
      *         transaction execution. The tracker is also used to determine 
the read timestamp for read-only transactions. Each client
@@ -59,6 +59,15 @@ public interface TxManager extends IgniteComponent {
      */
     InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean 
readOnly);
 
+    /**
+     * Starts an implicit read-write transaction coordinated by a local node.
+     *
+     * @param timestampTracker Observable timestamp tracker is used to track a 
timestamp for either read-write or read-only
+     *         transaction execution. The tracker is also used to determine 
the read timestamp for read-only transactions.
+     * @return The transaction.
+     */
+    InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker);
+
     /**
      * Returns a transaction state meta.
      *
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 7e60c2f745..8cfd7b6da8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -67,6 +67,11 @@ class ReadOnlyTransactionImpl extends IgniteAbstractTransactionImpl {
         return readTimestamp;
     }
 
+    @Override
+    public boolean implicit() {
+        return false;
+    }
+
     @Override
     public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId 
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
         // TODO: IGNITE-17666 Close cursor tx finish and do it on the first 
finish invocation only.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index f26090ab2f..e90ed3e2a4 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -62,6 +62,8 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
     /** The tracker is used to track an observable timestamp. */
     private final HybridTimestampTracker observableTsTracker;
 
+    private final boolean implicit;
+
     /** A partition which stores the transaction state. */
     private volatile TablePartitionId commitPart;
 
@@ -69,16 +71,29 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
     private volatile CompletableFuture<Void> finishFut;
 
     /**
-     * The constructor.
+     * Constructs an explicit read-write transaction.
      *
      * @param txManager The tx manager.
      * @param observableTsTracker Observable timestamp tracker.
      * @param id The id.
      */
     public ReadWriteTransactionImpl(TxManager txManager, 
HybridTimestampTracker observableTsTracker, UUID id) {
+        this(txManager, observableTsTracker, id, false);
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param txManager The tx manager.
+     * @param observableTsTracker Observable timestamp tracker.
+     * @param id The id.
+     * @param implicit Whether the transaction will be implicit or not.
+     */
+    public ReadWriteTransactionImpl(TxManager txManager, 
HybridTimestampTracker observableTsTracker, UUID id, boolean implicit) {
         super(txManager, id);
 
         this.observableTsTracker = observableTsTracker;
+        this.implicit = implicit;
     }
 
     /** {@inheritDoc} */
@@ -185,4 +200,9 @@ public class ReadWriteTransactionImpl extends IgniteAbstractTransactionImpl {
     public HybridTimestamp startTimestamp() {
         return TransactionIds.beginTimestamp(id());
     }
+
+    @Override
+    public boolean implicit() {
+        return implicit;
+    }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index f8adbe49a4..a14b5964a5 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -152,12 +152,21 @@ public class TxManagerImpl implements TxManager {
 
     @Override
     public InternalTransaction begin(HybridTimestampTracker timestampTracker, 
boolean readOnly) {
+        return beginTx(timestampTracker, readOnly, false);
+    }
+
+    @Override
+    public InternalTransaction beginImplicit(HybridTimestampTracker 
timestampTracker) {
+        return beginTx(timestampTracker, false, true);
+    }
+
+    private InternalTransaction beginTx(HybridTimestampTracker 
timestampTracker, boolean readOnly, boolean implicit) {
         HybridTimestamp beginTimestamp = clock.now();
         UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp);
         updateTxMeta(txId, old -> new TxStateMeta(PENDING, localNodeId.get(), 
null));
 
         if (!readOnly) {
-            return new ReadWriteTransactionImpl(this, timestampTracker, txId);
+            return new ReadWriteTransactionImpl(this, timestampTracker, txId, 
implicit);
         }
 
         HybridTimestamp observableTimestamp = timestampTracker.get();
diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 0556b23fb8..5b6d73d1c1 100644
--- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -107,10 +107,17 @@ public class TxManagerTest extends IgniteAbstractTest {
         InternalTransaction tx0 = txManager.begin(hybridTimestampTracker);
         InternalTransaction tx1 = txManager.begin(hybridTimestampTracker);
         InternalTransaction tx2 = txManager.begin(hybridTimestampTracker, 
true);
+        InternalTransaction tx3 = 
txManager.beginImplicit(hybridTimestampTracker);
 
         assertNotNull(tx0.id());
         assertNotNull(tx1.id());
         assertNotNull(tx2.id());
+        assertNotNull(tx3.id());
+
+        assertFalse(tx0.implicit());
+        assertFalse(tx1.implicit());
+        assertFalse(tx2.implicit());
+        assertTrue(tx3.implicit());
     }
 
     @Test

Reply via email to