This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 192ea0ed31 IGNITE-20537 Add ability to create implicit transactions
(#2649)
192ea0ed31 is described below
commit 192ea0ed31271ea99f18f9e50aad3a7839c3ed14
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Oct 3 13:18:17 2023 +0400
IGNITE-20537 Add ability to create implicit transactions (#2649)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 10 +++++
.../internal/sql/engine/SqlQueryProcessor.java | 1 +
.../sql/engine/framework/NoOpTransaction.java | 5 +++
.../distributed/storage/InternalTableImpl.java | 46 +++++++++++-----------
.../RepeatedFinishReadWriteTransactionTest.java | 7 +++-
.../ignite/internal/tx/InternalTransaction.java | 6 +++
.../org/apache/ignite/internal/tx/TxManager.java | 13 +++++-
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 5 +++
.../internal/tx/impl/ReadWriteTransactionImpl.java | 22 ++++++++++-
.../ignite/internal/tx/impl/TxManagerImpl.java | 11 +++++-
.../apache/ignite/internal/tx/TxManagerTest.java | 7 ++++
11 files changed, 106 insertions(+), 27 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index f8b5487c84..836b515d5f 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -140,9 +140,19 @@ public class FakeTxManager implements TxManager {
public HybridTimestamp startTimestamp() {
return timestamp;
}
+
+ @Override
+ public boolean implicit() {
+ return false;
+ }
};
}
+ @Override
+ public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker) {
+ throw new UnsupportedOperationException("Not expected to be called
here");
+ }
+
@Override
public @Nullable TxStateMeta stateMeta(UUID txId) {
return null;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 009917388d..f80787f964 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -499,6 +499,7 @@ public class SqlQueryProcessor implements QueryProcessor {
* @return Wrapper for an active transaction.
* @throws SqlException If an outer transaction was started for a {@link
SqlQueryType#DDL DDL} query.
*/
+ // TODO: IGNITE-20539 - unify creation of implicit transactions.
static QueryTransactionWrapper wrapTxOrStartImplicit(
SqlQueryType queryType,
IgniteTransactions transactions,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index c6621c94c5..1b3091a9c3 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -125,6 +125,11 @@ public final class NoOpTransaction implements
InternalTransaction {
return hybridTimestamp;
}
+ @Override
+ public boolean implicit() {
+ return false;
+ }
+
@Override
public UUID id() {
return id;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index da8fadab24..1fca0a43e8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -256,22 +256,20 @@ public class InternalTableImpl implements InternalTable {
);
}
- boolean implicit = tx == null;
-
- InternalTransaction tx0 = implicit ?
txManager.begin(observableTimestampTracker) : tx;
+ InternalTransaction actualTx = startImplicitTxIfNeeded(tx);
int partId = partitionId(row);
TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
- IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
tx0.enlistedNodeAndTerm(partGroupId);
+ IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
actualTx.enlistedNodeAndTerm(partGroupId);
CompletableFuture<R> fut;
if (primaryReplicaAndTerm != null) {
- assert !implicit;
+ assert !actualTx.implicit();
- ReplicaRequest request = fac.apply(tx, partGroupId,
primaryReplicaAndTerm.get2());
+ ReplicaRequest request = fac.apply(actualTx, partGroupId,
primaryReplicaAndTerm.get2());
try {
fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
@@ -281,10 +279,10 @@ public class InternalTableImpl implements InternalTable {
throw new TransactionException("Failed to invoke the replica
request.");
}
} else {
- fut = enlistWithRetry(tx0, partId, term -> fac.apply(tx0,
partGroupId, term), ATTEMPTS_TO_ENLIST_PARTITION);
+ fut = enlistWithRetry(actualTx, partId, term ->
fac.apply(actualTx, partGroupId, term), ATTEMPTS_TO_ENLIST_PARTITION);
}
- return postEnlist(fut, false, tx0, implicit);
+ return postEnlist(fut, false, actualTx, actualTx.implicit());
}
/**
@@ -314,33 +312,33 @@ public class InternalTableImpl implements InternalTable {
);
}
- boolean implicit = tx == null;
-
// It's possible to have null txState if transaction isn't started yet.
- if (!implicit && !(tx.state() == TxState.PENDING || tx.state() ==
null)) {
+ if (tx != null && !(tx.state() == TxState.PENDING || tx.state() ==
null)) {
return failedFuture(new TransactionException(
"The operation is attempted for completed transaction"));
}
- InternalTransaction tx0 = implicit ?
txManager.begin(observableTimestampTracker) : tx;
+ InternalTransaction actualTx = startImplicitTxIfNeeded(tx);
Int2ObjectMap<RowBatch> rowBatchByPartitionId =
toRowBatchByPartitionId(keyRows);
boolean singlePart = rowBatchByPartitionId.size() == 1;
+ boolean implicit = actualTx.implicit();
+
for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch :
rowBatchByPartitionId.int2ObjectEntrySet()) {
int partitionId = partitionRowBatch.getIntKey();
RowBatch rowBatch = partitionRowBatch.getValue();
TablePartitionId partGroupId = new TablePartitionId(tableId,
partitionId);
- IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
tx0.enlistedNodeAndTerm(partGroupId);
+ IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
actualTx.enlistedNodeAndTerm(partGroupId);
CompletableFuture<Object> fut;
if (primaryReplicaAndTerm != null) {
assert !implicit;
- ReplicaRequest request = fac.apply(rowBatch.requestedRows,
tx0, partGroupId, primaryReplicaAndTerm.get2(), false);
+ ReplicaRequest request = fac.apply(rowBatch.requestedRows,
actualTx, partGroupId, primaryReplicaAndTerm.get2(), false);
try {
fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(),
request);
@@ -351,9 +349,9 @@ public class InternalTableImpl implements InternalTable {
}
} else {
fut = enlistWithRetry(
- tx0,
+ actualTx,
partitionId,
- term -> fac.apply(rowBatch.requestedRows, tx0,
partGroupId, term, implicit && singlePart),
+ term -> fac.apply(rowBatch.requestedRows, actualTx,
partGroupId, term, implicit && singlePart),
ATTEMPTS_TO_ENLIST_PARTITION
);
}
@@ -363,7 +361,11 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<T> fut =
reducer.apply(rowBatchByPartitionId.values());
- return postEnlist(fut, implicit && !singlePart, tx0, implicit &&
singlePart);
+ return postEnlist(fut, implicit && !singlePart, actualTx, implicit &&
singlePart);
+ }
+
+ private InternalTransaction startImplicitTxIfNeeded(@Nullable
InternalTransaction tx) {
+ return tx == null ?
txManager.beginImplicit(observableTimestampTracker) : tx;
}
/**
@@ -711,7 +713,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, int
partition) {
- InternalTransaction tx = txManager.begin(observableTimestampTracker);
+ InternalTransaction tx =
txManager.beginImplicit(observableTimestampTracker);
TablePartitionId partGroupId = new TablePartitionId(tableId,
partition);
CompletableFuture<Void> fut = enlistWithRetry(
@@ -1070,13 +1072,13 @@ public class InternalTableImpl implements InternalTable
{
validatePartitionIndex(partId);
- boolean implicit = tx == null;
+ InternalTransaction actualTx = startImplicitTxIfNeeded(tx);
- InternalTransaction tx0 = implicit ?
txManager.begin(observableTimestampTracker) : tx;
+ boolean implicit = actualTx.implicit();
return new PartitionScanPublisher(
(scanId, batchSize) -> enlistCursorInTx(
- tx0,
+ actualTx,
partId,
scanId,
batchSize,
@@ -1088,7 +1090,7 @@ public class InternalTableImpl implements InternalTable {
columnsToInclude,
implicit
),
- (commit, fut) -> postEnlist(fut, commit, tx0, implicit &&
!commit)
+ (commit, fut) -> postEnlist(fut, commit, actualTx, implicit &&
!commit)
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
index ed114e14d4..863d01f5e2 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/RepeatedFinishReadWriteTransactionTest.java
@@ -237,7 +237,7 @@ public class RepeatedFinishReadWriteTransactionTest extends
BaseIgniteAbstractTe
@Override
public InternalTransaction begin(HybridTimestampTracker
timestampTracker) {
- return null;
+ throw new UnsupportedOperationException("Not implemented");
}
@Override
@@ -245,6 +245,11 @@ public class RepeatedFinishReadWriteTransactionTest
extends BaseIgniteAbstractTe
return null;
}
+ @Override
+ public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
@Override
public @Nullable TxStateMeta stateMeta(UUID txId) {
return null;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index 1ef01b7c07..784a1a1b35 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -99,4 +99,10 @@ public interface InternalTransaction extends Transaction {
* @return Timestamp that is used to obtain the effective schema version
used inside the transaction.
*/
HybridTimestamp startTimestamp();
+
+ /**
+ * Returns whether this transaction is implicit (i.e. started by the
system automatically when the user
+ * provided no transaction for an operation) or not.
+ */
+ boolean implicit();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 280cc91e74..fc06c3ea15 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -37,7 +37,7 @@ import org.jetbrains.annotations.TestOnly;
*/
public interface TxManager extends IgniteComponent {
/**
- * Starts a read-write transaction coordinated by a local node.
+ * Starts an explicit read-write transaction coordinated by a local node.
*
* @param timestampTracker Observable timestamp tracker is used to track a
timestamp for either read-write or read-only
* transaction execution. The tracker is also used to determine
the read timestamp for read-only transactions.
@@ -46,7 +46,7 @@ public interface TxManager extends IgniteComponent {
InternalTransaction begin(HybridTimestampTracker timestampTracker);
/**
- * Starts either read-write or read-only transaction, depending on {@code
readOnly} parameter value.
+ * Starts either explicit read-write or read-only transaction, depending
on {@code readOnly} parameter value.
*
* @param timestampTracker Observable timestamp tracker is used to track a
timestamp for either read-write or read-only
* transaction execution. The tracker is also used to determine
the read timestamp for read-only transactions. Each client
@@ -59,6 +59,15 @@ public interface TxManager extends IgniteComponent {
*/
InternalTransaction begin(HybridTimestampTracker timestampTracker, boolean
readOnly);
+ /**
+ * Starts an implicit read-write transaction coordinated by a local node.
+ *
+ * @param timestampTracker Observable timestamp tracker is used to track a
timestamp for either read-write or read-only
+ * transaction execution. The tracker is also used to determine
the read timestamp for read-only transactions.
+ * @return The transaction.
+ */
+ InternalTransaction beginImplicit(HybridTimestampTracker timestampTracker);
+
/**
* Returns a transaction state meta.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 7e60c2f745..8cfd7b6da8 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -67,6 +67,11 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
return readTimestamp;
}
+ @Override
+ public boolean implicit() {
+ return false;
+ }
+
@Override
public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
// TODO: IGNITE-17666 Close cursor tx finish and do it on the first
finish invocation only.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index f26090ab2f..e90ed3e2a4 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -62,6 +62,8 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** The tracker is used to track an observable timestamp. */
private final HybridTimestampTracker observableTsTracker;
+ private final boolean implicit;
+
/** A partition which stores the transaction state. */
private volatile TablePartitionId commitPart;
@@ -69,16 +71,29 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
private volatile CompletableFuture<Void> finishFut;
/**
- * The constructor.
+ * Constructs an explicit read-write transaction.
*
* @param txManager The tx manager.
* @param observableTsTracker Observable timestamp tracker.
* @param id The id.
*/
public ReadWriteTransactionImpl(TxManager txManager,
HybridTimestampTracker observableTsTracker, UUID id) {
+ this(txManager, observableTsTracker, id, false);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param txManager The tx manager.
+ * @param observableTsTracker Observable timestamp tracker.
+ * @param id The id.
+ * @param implicit Whether the transaction will be implicit or not.
+ */
+ public ReadWriteTransactionImpl(TxManager txManager,
HybridTimestampTracker observableTsTracker, UUID id, boolean implicit) {
super(txManager, id);
this.observableTsTracker = observableTsTracker;
+ this.implicit = implicit;
}
/** {@inheritDoc} */
@@ -185,4 +200,9 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
public HybridTimestamp startTimestamp() {
return TransactionIds.beginTimestamp(id());
}
+
+ @Override
+ public boolean implicit() {
+ return implicit;
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index f8adbe49a4..a14b5964a5 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -152,12 +152,21 @@ public class TxManagerImpl implements TxManager {
@Override
public InternalTransaction begin(HybridTimestampTracker timestampTracker,
boolean readOnly) {
+ return beginTx(timestampTracker, readOnly, false);
+ }
+
+ @Override
+ public InternalTransaction beginImplicit(HybridTimestampTracker
timestampTracker) {
+ return beginTx(timestampTracker, false, true);
+ }
+
+ private InternalTransaction beginTx(HybridTimestampTracker
timestampTracker, boolean readOnly, boolean implicit) {
HybridTimestamp beginTimestamp = clock.now();
UUID txId = transactionIdGenerator.transactionIdFor(beginTimestamp);
updateTxMeta(txId, old -> new TxStateMeta(PENDING, localNodeId.get(),
null));
if (!readOnly) {
- return new ReadWriteTransactionImpl(this, timestampTracker, txId);
+ return new ReadWriteTransactionImpl(this, timestampTracker, txId,
implicit);
}
HybridTimestamp observableTimestamp = timestampTracker.get();
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 0556b23fb8..5b6d73d1c1 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -107,10 +107,17 @@ public class TxManagerTest extends IgniteAbstractTest {
InternalTransaction tx0 = txManager.begin(hybridTimestampTracker);
InternalTransaction tx1 = txManager.begin(hybridTimestampTracker);
InternalTransaction tx2 = txManager.begin(hybridTimestampTracker,
true);
+ InternalTransaction tx3 =
txManager.beginImplicit(hybridTimestampTracker);
assertNotNull(tx0.id());
assertNotNull(tx1.id());
assertNotNull(tx2.id());
+ assertNotNull(tx3.id());
+
+ assertFalse(tx0.implicit());
+ assertFalse(tx1.implicit());
+ assertFalse(tx2.implicit());
+ assertTrue(tx3.implicit());
}
@Test