This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d66afcc990 IGNITE-20377 Rename term to consistencyToken (#3110)
d66afcc990 is described below
commit d66afcc990a5698bc6306c60ce495be5fb645414
Author: Cyrill <[email protected]>
AuthorDate: Wed Jan 31 12:47:26 2024 +0300
IGNITE-20377 Rename term to consistencyToken (#3110)
---
.../apache/ignite/client/fakes/FakeTxManager.java | 4 +-
.../ignite/internal/table/ItTableScanTest.java | 2 +-
.../internal/sql/engine/SqlQueryProcessor.java | 8 +-
.../sql/engine/exec/DestinationFactory.java | 2 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 11 +-
.../sql/engine/exec/LogicalRelImplementor.java | 4 +-
...WithTerm.java => NodeWithConsistencyToken.java} | 30 ++--
...erm.java => PartitionWithConsistencyToken.java} | 22 +--
.../internal/sql/engine/exec/ScannableTable.java | 38 +++--
.../sql/engine/exec/ScannableTableImpl.java | 24 +--
.../sql/engine/exec/UpdatableTableImpl.java | 18 +-
.../sql/engine/exec/mapping/ColocationGroup.java | 26 +--
.../exec/mapping/ExecutionTargetFactory.java | 6 +-
.../sql/engine/exec/mapping/FragmentMapper.java | 4 +-
.../exec/mapping/smallcluster/AbstractTarget.java | 16 +-
.../mapping/smallcluster/PartitionedTarget.java | 8 +-
.../mapping/smallcluster/SmallClusterFactory.java | 14 +-
.../sql/engine/exec/rel/IndexScanNode.java | 37 +++--
.../sql/engine/exec/rel/TableScanNode.java | 19 ++-
.../exec/rel/IndexScanNodeExecutionTest.java | 28 ++--
.../engine/exec/rel/ScannableTableSelfTest.java | 95 +++++++----
.../exec/rel/TableScanNodeExecutionTest.java | 10 +-
.../sql/engine/framework/NoOpTransaction.java | 6 +-
.../sql/engine/framework/TestBuilders.java | 42 +++--
.../sql/engine/framework/TestClusterTest.java | 14 +-
.../internal/sql/engine/planner/PlannerTest.java | 8 +-
.../ItInternalTableReadWriteScanTest.java | 6 +-
.../replicator/PartitionReplicaListener.java | 1 -
.../distributed/storage/InternalTableImpl.java | 182 ++++++++++++---------
.../ignite/internal/utils/PrimaryReplica.java | 24 +--
.../ignite/internal/tx/InternalTransaction.java | 8 +-
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 7 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 11 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 16 +-
.../apache/ignite/internal/tx/TxManagerTest.java | 2 +-
35 files changed, 433 insertions(+), 320 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 8cc14de9e9..b5f4bb1ffd 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -84,7 +84,7 @@ public class FakeTxManager implements TxManager {
}
@Override
- public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndTerm(TablePartitionId tablePartitionId) {
+ public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) {
return null;
}
@@ -106,7 +106,7 @@ public class FakeTxManager implements TxManager {
@Override
public IgniteBiTuple<ClusterNode, Long> enlist(
TablePartitionId tablePartitionId,
- IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+ IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken) {
return null;
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 8c67ba0eb0..68a0ba44bf 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -678,7 +678,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
}
private PrimaryReplica getPrimaryReplica(int partId, InternalTransaction
tx) {
- IgniteBiTuple<ClusterNode, Long> primaryReplica =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+ IgniteBiTuple<ClusterNode, Long> primaryReplica =
tx.enlistedNodeAndConsistencyToken(new TablePartitionId(table.tableId(),
partId));
return new PrimaryReplica(primaryReplica.get1(),
primaryReplica.get2());
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 0098a8ee2e..48718aa4f8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -83,7 +83,7 @@ import
org.apache.ignite.internal.sql.engine.exec.ExecutionService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionServiceImpl;
import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
@@ -370,7 +370,7 @@ public class SqlQueryProcessor implements QueryProcessor {
// need to be refactored after TODO:
https://issues.apache.org/jira/browse/IGNITE-20925
/** Get primary replicas. */
- private CompletableFuture<List<NodeWithTerm>> primaryReplicas(int tableId)
{
+ private CompletableFuture<List<NodeWithConsistencyToken>>
primaryReplicas(int tableId) {
int catalogVersion = catalogManager.latestCatalogVersion();
Catalog catalog = catalogManager.catalog(catalogVersion);
@@ -381,7 +381,7 @@ public class SqlQueryProcessor implements QueryProcessor {
int partitions = zoneDesc.partitions();
- List<CompletableFuture<NodeWithTerm>> result = new
ArrayList<>(partitions);
+ List<CompletableFuture<NodeWithConsistencyToken>> result = new
ArrayList<>(partitions);
HybridTimestamp clockNow = clock.now();
@@ -408,7 +408,7 @@ public class SqlQueryProcessor implements QueryProcessor {
assert holder != null : "Unable to map query, nothing
holds the lease";
- return new NodeWithTerm(holder,
primaryReplica.getStartTime().longValue());
+ return new NodeWithConsistencyToken(holder,
primaryReplica.getStartTime().longValue());
}
}));
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
index 3e7b14ba84..c41e71d616 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/DestinationFactory.java
@@ -93,7 +93,7 @@ class DestinationFactory<RowT> {
assert !nullOrEmpty(group.assignments()) && !nullOrEmpty(keys);
- List<String> assignments =
Commons.transform(group.assignments(), NodeWithTerm::name);
+ List<String> assignments =
Commons.transform(group.assignments(), NodeWithConsistencyToken::name);
if (function.affinity()) {
int tableId = ((AffinityDistribution) function).tableId();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 16ec98567e..4946f48cd0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -1004,7 +1004,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return super.visit(rel);
}
- private void enlist(int tableId, List<NodeWithTerm>
assignments) {
+ private void enlist(int tableId,
List<NodeWithConsistencyToken> assignments) {
if (assignments.isEmpty()) {
return;
}
@@ -1016,10 +1016,13 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
for (int p = 0; p < partsCnt; p++) {
TablePartitionId tablePartId = new
TablePartitionId(tableId, p);
- NodeWithTerm enlistmentToken = assignments.get(p);
+ NodeWithConsistencyToken assignment =
assignments.get(p);
tx.enlist(tablePartId,
- new
IgniteBiTuple<>(topSrvc.getByConsistentId(enlistmentToken.name()),
enlistmentToken.term()));
+ new IgniteBiTuple<>(
+
topSrvc.getByConsistentId(assignment.name()),
+
assignment.enlistmentConsistencyToken())
+ );
}
}
@@ -1027,7 +1030,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
int tableId =
rel.getTable().unwrap(IgniteTable.class).id();
ColocationGroup colocationGroup =
mappedFragment.groupsBySourceId().get(rel.sourceId());
- List<NodeWithTerm> assignments =
colocationGroup.assignments();
+ List<NodeWithConsistencyToken> assignments =
colocationGroup.assignments();
enlist(tableId, assignments);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 035ace44a0..92beb1150d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -399,7 +399,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
idx,
scannableTable,
tbl.descriptor(),
- group.partitionsWithTerms(ctx.localNode().name()),
+ group.partitionsWithConsistencyTokens(ctx.localNode().name()),
comp,
ranges,
filters,
@@ -440,7 +440,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
ctx,
rowFactory,
scannableTable,
- group.partitionsWithTerms(ctx.localNode().name()),
+ group.partitionsWithConsistencyTokens(ctx.localNode().name()),
filters,
prj,
requiredColumns == null ? null : requiredColumns.toBitSet()
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/NodeWithTerm.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/NodeWithConsistencyToken.java
similarity index 65%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/NodeWithTerm.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/NodeWithConsistencyToken.java
index 32bbe2c840..9e11e80923 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/NodeWithTerm.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/NodeWithConsistencyToken.java
@@ -22,27 +22,27 @@ import java.util.Objects;
import org.apache.ignite.internal.tostring.S;
/**
- * Tuple representing primary replica node name with current term.
+ * Tuple representing primary replica node name with node enlistment
consistency token.
*/
-public class NodeWithTerm implements Serializable {
+public class NodeWithConsistencyToken implements Serializable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
/** Primary replica node name. */
private final String name;
- /** Primary replica term. */
- private final long term;
+ /** Enlistment consistency token. */
+ private final long enlistmentConsistencyToken;
/**
* Constructor.
*
* @param name Primary replica node name.
- * @param term Primary replica term.
+ * @param enlistmentConsistencyToken Enlistment consistency token.
*/
- public NodeWithTerm(String name, long term) {
+ public NodeWithConsistencyToken(String name, long
enlistmentConsistencyToken) {
this.name = name;
- this.term = term;
+ this.enlistmentConsistencyToken = enlistmentConsistencyToken;
}
/**
@@ -55,12 +55,12 @@ public class NodeWithTerm implements Serializable {
}
/**
- * Gets primary replica term.
+ * Gets enlistment consistency token.
*
- * @return Primary replica term.
+ * @return Enlistment consistency token.
*/
- public long term() {
- return term;
+ public long enlistmentConsistencyToken() {
+ return enlistmentConsistencyToken;
}
/** {@inheritDoc} */
@@ -72,19 +72,19 @@ public class NodeWithTerm implements Serializable {
if (o == null || getClass() != o.getClass()) {
return false;
}
- NodeWithTerm that = (NodeWithTerm) o;
- return term == that.term && Objects.equals(name, that.name);
+ NodeWithConsistencyToken that = (NodeWithConsistencyToken) o;
+ return enlistmentConsistencyToken == that.enlistmentConsistencyToken
&& Objects.equals(name, that.name);
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(name, term);
+ return Objects.hash(name, enlistmentConsistencyToken);
}
/** {@inheritDoc} */
@Override
public String toString() {
- return S.toString(NodeWithTerm.class, this);
+ return S.toString(NodeWithConsistencyToken.class, this);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionWithTerm.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionWithConsistencyToken.java
similarity index 65%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionWithTerm.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionWithConsistencyToken.java
index 995534e250..f17a0be06c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionWithTerm.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/PartitionWithConsistencyToken.java
@@ -18,24 +18,24 @@
package org.apache.ignite.internal.sql.engine.exec;
/**
- * Tuple representing the number of the partition with its current primary
replica term.
+ * Tuple representing the number of the partition with node enlistment
consistency token.
*/
-public class PartitionWithTerm {
+public class PartitionWithConsistencyToken {
/** Partition number. */
private final int partId;
- /** Primary replica term. */
- private final long term;
+ /** Enlistment consistency token. */
+ private final long enlistmentConsistencyToken;
/**
* Constructor.
*
* @param partId partition number
- * @param term Primary replica term.
+ * @param enlistmentConsistencyToken Enlistment consistency token.
*/
- public PartitionWithTerm(int partId, Long term) {
+ public PartitionWithConsistencyToken(int partId, Long
enlistmentConsistencyToken) {
this.partId = partId;
- this.term = term;
+ this.enlistmentConsistencyToken = enlistmentConsistencyToken;
}
/**
@@ -48,11 +48,11 @@ public class PartitionWithTerm {
}
/**
- * Gets primary replica term.
+ * Gets enlistment consistency token.
*
- * @return Primary replica term.
+ * @return Enlistment consistency token.
*/
- public long term() {
- return term;
+ public long enlistmentConsistencyToken() {
+ return enlistmentConsistencyToken;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTable.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTable.java
index 3fe4fa5a90..1902e1f250 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTable.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTable.java
@@ -33,21 +33,25 @@ public interface ScannableTable {
* Performs a scan over table.
*
* @param ctx Execution context.
- * @param partWithTerm Partition.
+ * @param partWithConsistencyToken Partition.
* @param rowFactory Row factory.
* @param requiredColumns Required columns.
* @return A publisher that produces rows.
* @param <RowT> A type of row.
*/
- <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, PartitionWithTerm
partWithTerm,
- RowFactory<RowT> rowFactory, @Nullable BitSet requiredColumns);
+ <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ );
/**
* Performs range scan using the given index.
*
* @param <RowT> A type of row.
* @param ctx Execution context.
- * @param partWithTerm Partition.
+ * @param partWithConsistencyToken Partition.
* @param rowFactory Row factory.
* @param indexId Index id.
* @param columns Index columns.
@@ -55,16 +59,22 @@ public interface ScannableTable {
* @param requiredColumns Required columns.
* @return A publisher that produces rows.
*/
- <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm,
- RowFactory<RowT> rowFactory, int indexId, List<String> columns,
- @Nullable RangeCondition<RowT> cond, @Nullable BitSet
requiredColumns);
+ <RowT> Publisher<RowT> indexRangeScan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ int indexId,
+ List<String> columns,
+ @Nullable RangeCondition<RowT> cond,
+ @Nullable BitSet requiredColumns
+ );
/**
* Performs a lookup scan using the given index.
*
* @param <RowT> A type of row.
* @param ctx Execution context.
- * @param partWithTerm Partition.
+ * @param partWithConsistencyToken Partition.
* @param rowFactory Row factory.
* @param indexId Index id.
* @param columns Index columns.
@@ -72,7 +82,13 @@ public interface ScannableTable {
* @param requiredColumns Required columns.
* @return A publisher that produces rows.
*/
- <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm,
- RowFactory<RowT> rowFactory, int indexId, List<String> columns,
- RowT key, @Nullable BitSet requiredColumns);
+ <RowT> Publisher<RowT> indexLookup(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ int indexId,
+ List<String> columns,
+ RowT key,
+ @Nullable BitSet requiredColumns
+ );
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index a68684eeec..b71796758a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -52,7 +52,7 @@ public class ScannableTableImpl implements ScannableTable {
/** {@inheritDoc} */
@Override
- public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, @Nullable BitSet requiredColumns) {
Publisher<BinaryRow> pub;
@@ -63,12 +63,12 @@ public class ScannableTableImpl implements ScannableTable {
assert readTime != null;
- pub = internalTable.scan(partWithTerm.partId(), readTime,
ctx.localNode());
+ pub = internalTable.scan(partWithConsistencyToken.partId(),
readTime, ctx.localNode());
} else {
- PrimaryReplica recipient = new PrimaryReplica(ctx.localNode(),
partWithTerm.term());
+ PrimaryReplica recipient = new PrimaryReplica(ctx.localNode(),
partWithConsistencyToken.enlistmentConsistencyToken());
pub = internalTable.scan(
- partWithTerm.partId(),
+ partWithConsistencyToken.partId(),
txAttributes.id(),
txAttributes.commitPartition(),
recipient,
@@ -89,7 +89,7 @@ public class ScannableTableImpl implements ScannableTable {
@Override
public <RowT> Publisher<RowT> indexRangeScan(
ExecutionContext<RowT> ctx,
- PartitionWithTerm partWithTerm,
+ PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
@@ -123,7 +123,7 @@ public class ScannableTableImpl implements ScannableTable {
assert readTime != null;
pub = internalTable.scan(
- partWithTerm.partId(),
+ partWithConsistencyToken.partId(),
readTime,
ctx.localNode(),
indexId,
@@ -134,10 +134,10 @@ public class ScannableTableImpl implements ScannableTable
{
);
} else {
pub = internalTable.scan(
- partWithTerm.partId(),
+ partWithConsistencyToken.partId(),
txAttributes.id(),
txAttributes.commitPartition(),
- new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
+ new PrimaryReplica(ctx.localNode(),
partWithConsistencyToken.enlistmentConsistencyToken()),
indexId,
lower,
upper,
@@ -155,7 +155,7 @@ public class ScannableTableImpl implements ScannableTable {
@Override
public <RowT> Publisher<RowT> indexLookup(
ExecutionContext<RowT> ctx,
- PartitionWithTerm partWithTerm,
+ PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
@@ -177,7 +177,7 @@ public class ScannableTableImpl implements ScannableTable {
assert readTime != null;
pub = internalTable.lookup(
- partWithTerm.partId(),
+ partWithConsistencyToken.partId(),
readTime,
ctx.localNode(),
indexId,
@@ -186,10 +186,10 @@ public class ScannableTableImpl implements ScannableTable
{
);
} else {
pub = internalTable.lookup(
- partWithTerm.partId(),
+ partWithConsistencyToken.partId(),
txAttributes.id(),
txAttributes.commitPartition(),
- new PrimaryReplica(ctx.localNode(), partWithTerm.term()),
+ new PrimaryReplica(ctx.localNode(),
partWithConsistencyToken.enlistmentConsistencyToken()),
indexId,
keyTuple,
null
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 178a1ae887..f1149c3684 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -119,7 +119,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows :
rowsByPartition.int2ObjectEntrySet()) {
TablePartitionId partGroupId = new TablePartitionId(tableId,
partToRows.getIntKey());
- NodeWithTerm nodeWithTerm =
colocationGroup.assignments().get(partToRows.getIntKey());
+ NodeWithConsistencyToken nodeWithConsistencyToken =
colocationGroup.assignments().get(partToRows.getIntKey());
ReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
@@ -127,13 +127,13 @@ public final class UpdatableTableImpl implements
UpdatableTable {
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
.binaryTuples(binaryRowsToBuffers(partToRows.getValue()))
.transactionId(txAttributes.id())
- .enlistmentConsistencyToken(nodeWithTerm.term())
+
.enlistmentConsistencyToken(nodeWithConsistencyToken.enlistmentConsistencyToken())
.requestType(RequestType.RW_UPSERT_ALL)
.timestampLong(clock.nowLong())
.skipDelayedAck(true)
.build();
- futures[batchNum++] = replicaService.invoke(nodeWithTerm.name(),
request);
+ futures[batchNum++] =
replicaService.invoke(nodeWithConsistencyToken.name(), request);
}
return CompletableFuture.allOf(futures);
@@ -191,7 +191,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
TablePartitionId partGroupId = new TablePartitionId(tableId,
partitionId);
- NodeWithTerm nodeWithTerm =
colocationGroup.assignments().get(partitionId);
+ NodeWithConsistencyToken nodeWithConsistencyToken =
colocationGroup.assignments().get(partitionId);
ReadWriteMultiRowReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
@@ -199,13 +199,13 @@ public final class UpdatableTableImpl implements
UpdatableTable {
.schemaVersion(rowBatch.requestedRows.get(0).schemaVersion())
.binaryTuples(binaryRowsToBuffers(rowBatch.requestedRows))
.transactionId(txAttributes.id())
- .enlistmentConsistencyToken(nodeWithTerm.term())
+
.enlistmentConsistencyToken(nodeWithConsistencyToken.enlistmentConsistencyToken())
.requestType(RequestType.RW_INSERT_ALL)
.timestampLong(clock.nowLong())
.skipDelayedAck(true)
.build();
- rowBatch.resultFuture = replicaService.invoke(nodeWithTerm.name(),
request);
+ rowBatch.resultFuture =
replicaService.invoke(nodeWithConsistencyToken.name(), request);
}
return handleInsertResults(ectx, rowBatchByPartitionId.values());
@@ -258,7 +258,7 @@ public final class UpdatableTableImpl implements
UpdatableTable {
for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows :
keyRowsByPartition.int2ObjectEntrySet()) {
TablePartitionId partGroupId = new TablePartitionId(tableId,
partToRows.getIntKey());
- NodeWithTerm nodeWithTerm =
colocationGroup.assignments().get(partToRows.getIntKey());
+ NodeWithConsistencyToken nodeWithConsistencyToken =
colocationGroup.assignments().get(partToRows.getIntKey());
ReplicaRequest request =
MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest()
.groupId(partGroupId)
@@ -266,13 +266,13 @@ public final class UpdatableTableImpl implements
UpdatableTable {
.schemaVersion(partToRows.getValue().get(0).schemaVersion())
.primaryKeys(serializePrimaryKeys(partToRows.getValue()))
.transactionId(txAttributes.id())
- .enlistmentConsistencyToken(nodeWithTerm.term())
+
.enlistmentConsistencyToken(nodeWithConsistencyToken.enlistmentConsistencyToken())
.requestType(RequestType.RW_DELETE_ALL)
.timestampLong(clock.nowLong())
.skipDelayedAck(true)
.build();
- futures[batchNum++] = replicaService.invoke(nodeWithTerm.name(),
request);
+ futures[batchNum++] =
replicaService.invoke(nodeWithConsistencyToken.name(), request);
}
return CompletableFuture.allOf(futures);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
index 82fab08b85..642965de54 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ColocationGroup.java
@@ -21,8 +21,8 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
/**
* A group of a sources which shares common set of nodes and assignments to be
executed.
@@ -38,10 +38,10 @@ public class ColocationGroup implements Serializable {
private final List<String> nodeNames;
- private final List<NodeWithTerm> assignments;
+ private final List<NodeWithConsistencyToken> assignments;
/** Constructor. */
- public ColocationGroup(List<Long> sourceIds, List<String> nodeNames,
List<NodeWithTerm> assignments) {
+ public ColocationGroup(List<Long> sourceIds, List<String> nodeNames,
List<NodeWithConsistencyToken> assignments) {
this.sourceIds = Objects.requireNonNull(sourceIds, "sourceIds");
this.nodeNames = Objects.requireNonNull(nodeNames, "nodeNames");
this.assignments = Objects.requireNonNull(assignments, "assignments");
@@ -65,27 +65,27 @@ public class ColocationGroup implements Serializable {
* Get list of partitions (index) and nodes (items) having an appropriate
partition in OWNING state, calculated for
* distributed tables, involved in query execution.
*/
- public List<NodeWithTerm> assignments() {
+ public List<NodeWithConsistencyToken> assignments() {
return assignments;
}
/**
- * Returns list of pairs containing the partition number to scan on the
given node with the corresponding primary replica term.
+ * Returns list of pairs containing the partition number to scan on the
given node with the corresponding enlistment consistency token.
*
* @param nodeName Cluster node consistent ID.
- * @return List of pairs containing the partition number to scan on the
given node with the corresponding primary replica term.
+ * @return List of pairs containing the partition number to scan on the
given node with the corresponding enlistment consistency token.
*/
- public List<PartitionWithTerm> partitionsWithTerms(String nodeName) {
- List<PartitionWithTerm> partsWithTerms = new ArrayList<>();
+ public List<PartitionWithConsistencyToken>
partitionsWithConsistencyTokens(String nodeName) {
+ List<PartitionWithConsistencyToken> partsWithConsistencyTokens = new
ArrayList<>();
for (int p = 0; p < assignments.size(); p++) {
- NodeWithTerm nodeWithTerm = assignments.get(p);
+ NodeWithConsistencyToken nodeWithConsistencyToken =
assignments.get(p);
- if (Objects.equals(nodeName, nodeWithTerm.name())) {
- partsWithTerms.add(new PartitionWithTerm(p,
nodeWithTerm.term()));
+ if (Objects.equals(nodeName, nodeWithConsistencyToken.name())) {
+ partsWithConsistencyTokens.add(new
PartitionWithConsistencyToken(p,
nodeWithConsistencyToken.enlistmentConsistencyToken()));
}
}
- return partsWithTerms;
+ return partsWithConsistencyTokens;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
index a10dd14078..9bcbd45e0f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionTargetFactory.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.sql.engine.exec.mapping;
import java.util.List;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
/**
* Factory to create execution target.
@@ -30,7 +30,7 @@ public interface ExecutionTargetFactory {
* @param nodes List of partitions.
* @return An execution target.
*/
- ExecutionTarget partitioned(List<NodeWithTerm> nodes);
+ ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes);
/**
* Creates target from list of required nodes.
@@ -81,5 +81,5 @@ public interface ExecutionTargetFactory {
* @param target A target to resolve assignments from.
* @return The list of assignments the target represents. Never null.
*/
- List<NodeWithTerm> resolveAssignments(ExecutionTarget target);
+ List<NodeWithConsistencyToken> resolveAssignments(ExecutionTarget target);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
index b551d263ff..f42e56ef25 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/FragmentMapper.java
@@ -29,7 +29,7 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
@@ -544,7 +544,7 @@ class FragmentMapper {
ExecutionTarget finalised = target.finalise();
List<String> nodes =
context.targetFactory().resolveNodes(finalised);
- List<NodeWithTerm> assignments =
context.targetFactory().resolveAssignments(finalised);
+ List<NodeWithConsistencyToken> assignments =
context.targetFactory().resolveAssignments(finalised);
return List.of(
new ColocationGroup(
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
index 538e837252..eb224b9c6f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/AbstractTarget.java
@@ -21,7 +21,7 @@ import static
org.apache.ignite.internal.util.IgniteUtils.isPow2;
import java.util.ArrayList;
import java.util.List;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ColocationMappingException;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
@@ -57,14 +57,14 @@ abstract class AbstractTarget implements ExecutionTarget {
return result;
}
- List<NodeWithTerm> assignments(List<String> nodeNames) {
+ List<NodeWithConsistencyToken> assignments(List<String> nodeNames) {
if (!(this instanceof PartitionedTarget)) {
return List.of();
}
PartitionedTarget partitionedTarget = (PartitionedTarget) this;
- List<NodeWithTerm> result = new
ArrayList<>(partitionedTarget.partitionsNodes.length);
+ List<NodeWithConsistencyToken> result = new
ArrayList<>(partitionedTarget.partitionsNodes.length);
for (int partNo = 0; partNo <
partitionedTarget.partitionsNodes.length; partNo++) {
long partitionNodes = partitionedTarget.partitionsNodes[partNo];
@@ -73,9 +73,9 @@ abstract class AbstractTarget implements ExecutionTarget {
int idx = Long.numberOfTrailingZeros(partitionNodes);
- result.add(new NodeWithTerm(
+ result.add(new NodeWithConsistencyToken(
nodeNames.get(idx),
- partitionedTarget.terms[partNo]
+ partitionedTarget.enlistmentConsistencyTokens[partNo]
));
}
@@ -172,7 +172,7 @@ abstract class AbstractTarget implements ExecutionTarget {
throw new ColocationMappingException("Targets are not
colocated");
}
- if (partitioned.terms[partNo] != otherPartitioned.terms[partNo]) {
+ if (partitioned.enlistmentConsistencyTokens[partNo] !=
otherPartitioned.enlistmentConsistencyTokens[partNo]) {
throw new ColocationMappingException("Partitioned targets have
different terms");
}
@@ -180,7 +180,7 @@ abstract class AbstractTarget implements ExecutionTarget {
finalised = finalised && isPow2(newNodes);
}
- return new PartitionedTarget(finalised, newPartitionsNodes,
partitioned.terms);
+ return new PartitionedTarget(finalised, newPartitionsNodes,
partitioned.enlistmentConsistencyTokens);
}
static ExecutionTarget colocate(PartitionedTarget partitioned,
SomeOfTarget someOf) throws ColocationMappingException {
@@ -197,7 +197,7 @@ abstract class AbstractTarget implements ExecutionTarget {
finalised = finalised && isPow2(newNodes);
}
- return new PartitionedTarget(finalised, newPartitionsNodes,
partitioned.terms);
+ return new PartitionedTarget(finalised, newPartitionsNodes,
partitioned.enlistmentConsistencyTokens);
}
static ExecutionTarget colocate(SomeOfTarget someOf, SomeOfTarget
otherSomeOf) throws ColocationMappingException {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
index 4d74b73612..aee148f04e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/PartitionedTarget.java
@@ -30,14 +30,14 @@ import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory
class PartitionedTarget extends AbstractTarget {
private final boolean finalised;
final long[] partitionsNodes;
- final long[] terms;
+ final long[] enlistmentConsistencyTokens;
- PartitionedTarget(boolean finalised, long[] partitionsNodes, long[] terms)
{
+ PartitionedTarget(boolean finalised, long[] partitionsNodes, long[]
enlistmentConsistencyTokens) {
super(computeNodes(partitionsNodes));
this.finalised = finalised;
this.partitionsNodes = partitionsNodes;
- this.terms = terms;
+ this.enlistmentConsistencyTokens = enlistmentConsistencyTokens;
}
@Override
@@ -57,7 +57,7 @@ class PartitionedTarget extends AbstractTarget {
newPartitionsNodes[partNo] = pickOne(partitionsNodes[partNo]);
}
- return new PartitionedTarget(true, newPartitionsNodes, terms);
+ return new PartitionedTarget(true, newPartitionsNodes,
enlistmentConsistencyTokens);
}
@Override
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
index 78dd39cec9..1a45ba61ac 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/smallcluster/SmallClusterFactory.java
@@ -20,7 +20,7 @@ package
org.apache.ignite.internal.sql.engine.exec.mapping.smallcluster;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
import java.util.List;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTarget;
import
org.apache.ignite.internal.sql.engine.exec.mapping.ExecutionTargetFactory;
@@ -63,17 +63,17 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
}
@Override
- public ExecutionTarget partitioned(List<NodeWithTerm> nodes) {
+ public ExecutionTarget partitioned(List<NodeWithConsistencyToken> nodes) {
long[] partitionNodes = new long[nodes.size()];
- long[] terms = new long[nodes.size()];
+ long[] enlistmentConsistencyTokens = new long[nodes.size()];
int idx = 0;
- for (NodeWithTerm e : nodes) {
+ for (NodeWithConsistencyToken e : nodes) {
partitionNodes[idx] = nodeNameToId.getOrDefault(e.name(), 0);
- terms[idx++] = e.term();
+ enlistmentConsistencyTokens[idx++] =
e.enlistmentConsistencyToken();
}
- return new PartitionedTarget(true, partitionNodes, terms);
+ return new PartitionedTarget(true, partitionNodes,
enlistmentConsistencyTokens);
}
@Override
@@ -86,7 +86,7 @@ public class SmallClusterFactory implements
ExecutionTargetFactory {
}
@Override
- public List<NodeWithTerm> resolveAssignments(ExecutionTarget target) {
+ public List<NodeWithConsistencyToken> resolveAssignments(ExecutionTarget
target) {
target = target.finalise();
assert target instanceof AbstractTarget : target == null ? "<null>" :
target.getClass().getCanonicalName();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index 297a4e8243..e85ac8da02 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -28,7 +28,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
@@ -52,8 +52,8 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
private final RowHandler.RowFactory<RowT> factory;
- /** List of pairs containing the partition number to scan with the
corresponding primary replica term. */
- private final Collection<PartitionWithTerm> partsWithTerms;
+ /** List of pairs containing the partition number to scan with the
corresponding enlistment consistency token. */
+ private final Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens;
/** Participating columns. */
private final @Nullable BitSet requiredColumns;
@@ -70,7 +70,8 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
* @param ctx Execution context.
* @param rowFactory Row factory.
* @param tableDescriptor Table descriptor.
- * @param partsWithTerms List of pairs containing the partition number to
scan with the corresponding primary replica term.
+ * @param partsWithConsistencyTokens List of pairs containing the
partition number to scan with the corresponding enlistment
+ * consistency token.
* @param comp Rows comparator.
* @param rangeConditions Range conditions.
* @param filters Optional filter to filter out rows.
@@ -83,7 +84,7 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
IgniteIndex schemaIndex,
ScannableTable table,
TableDescriptor tableDescriptor,
- Collection<PartitionWithTerm> partsWithTerms,
+ Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens,
@Nullable Comparator<RowT> comp,
@Nullable RangeIterable<RowT> rangeConditions,
@Nullable Predicate<RowT> filters,
@@ -92,11 +93,11 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
) {
super(ctx, filters, rowTransformer);
- assert partsWithTerms != null && !partsWithTerms.isEmpty();
+ assert partsWithConsistencyTokens != null &&
!partsWithConsistencyTokens.isEmpty();
this.schemaIndex = schemaIndex;
this.table = table;
- this.partsWithTerms = partsWithTerms;
+ this.partsWithConsistencyTokens = partsWithConsistencyTokens;
this.requiredColumns = requiredColumns;
this.rangeConditions = rangeConditions;
this.comp = comp;
@@ -114,16 +115,19 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
protected Publisher<RowT> scan() {
if (rangeConditions != null) {
return SubscriptionUtils.concat(
- new TransformingIterator<>(rangeConditions.iterator(),
cond -> indexPublisher(partsWithTerms, cond)));
+ new TransformingIterator<>(rangeConditions.iterator(),
cond -> indexPublisher(partsWithConsistencyTokens, cond)));
} else {
- return indexPublisher(partsWithTerms, null);
+ return indexPublisher(partsWithConsistencyTokens, null);
}
}
- private Publisher<RowT> indexPublisher(Collection<PartitionWithTerm>
partsWithTerms, @Nullable RangeCondition<RowT> cond) {
+ private Publisher<RowT> indexPublisher(
+ Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens,
+ @Nullable RangeCondition<RowT> cond
+ ) {
Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
- partsWithTerms.iterator(),
- partWithTerm -> partitionPublisher(partWithTerm, cond)
+ partsWithConsistencyTokens.iterator(),
+ partWithConsistencyToken ->
partitionPublisher(partWithConsistencyToken, cond)
);
if (comp != null) {
@@ -133,17 +137,20 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
}
}
- private Publisher<RowT> partitionPublisher(PartitionWithTerm partWithTerm,
@Nullable RangeCondition<RowT> cond) {
+ private Publisher<RowT> partitionPublisher(
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ @Nullable RangeCondition<RowT> cond
+ ) {
int indexId = schemaIndex.id();
ExecutionContext<RowT> ctx = context();
switch (schemaIndex.type()) {
case SORTED:
- return table.indexRangeScan(ctx, partWithTerm, factory,
indexId,
+ return table.indexRangeScan(ctx, partWithConsistencyToken,
factory, indexId,
columns, cond, requiredColumns);
case HASH:
- return table.indexLookup(ctx, partWithTerm, factory, indexId,
+ return table.indexLookup(ctx, partWithConsistencyToken,
factory, indexId,
columns, cond.lower(), requiredColumns);
default:
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index 64f654f482..bcfcb1e76f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -24,7 +24,7 @@ import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
@@ -40,8 +40,8 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
/** Table that provides access to underlying data. */
private final ScannableTable table;
- /** List of pairs containing the partition number to scan with the
corresponding primary replica term. */
- private final Collection<PartitionWithTerm> partsWithTerms;
+ /** List of pairs containing the partition number to scan with the
corresponding enlistment consistency token. */
+ private final Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens;
private final RowFactory<RowT> rowFactory;
@@ -53,7 +53,8 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
* @param ctx Execution context.
* @param rowFactory Row factory.
* @param table Internal table.
- * @param partsWithTerms List of pairs containing the partition number to
scan with the corresponding primary replica term.
+ * @param partsWithConsistencyTokens List of pairs containing the
partition number to scan with the corresponding enlistment
+ * consistency token.
* @param filters Optional filter to filter out rows.
* @param rowTransformer Optional projection function.
* @param requiredColumns Optional set of column of interest.
@@ -62,17 +63,17 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
ExecutionContext<RowT> ctx,
RowHandler.RowFactory<RowT> rowFactory,
ScannableTable table,
- Collection<PartitionWithTerm> partsWithTerms,
+ Collection<PartitionWithConsistencyToken>
partsWithConsistencyTokens,
@Nullable Predicate<RowT> filters,
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable BitSet requiredColumns
) {
super(ctx, filters, rowTransformer);
- assert partsWithTerms != null && !partsWithTerms.isEmpty();
+ assert partsWithConsistencyTokens != null &&
!partsWithConsistencyTokens.isEmpty();
this.table = table;
- this.partsWithTerms = partsWithTerms;
+ this.partsWithConsistencyTokens = partsWithConsistencyTokens;
this.rowFactory = rowFactory;
this.requiredColumns = requiredColumns;
}
@@ -81,8 +82,8 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
@Override
protected Publisher<RowT> scan() {
Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
- partsWithTerms.iterator(), partWithTerm -> {
- return table.scan(context(), partWithTerm, rowFactory,
requiredColumns);
+ partsWithConsistencyTokens.iterator(),
partWithConsistencyToken -> {
+ return table.scan(context(), partWithConsistencyToken, rowFactory,
requiredColumns);
});
return SubscriptionUtils.concat(it);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index 2c8b1eb2a1..a3a7d44384 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -38,7 +38,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
@@ -205,7 +205,7 @@ public class IndexScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
RowFactory<Object[]> rowFactory = ctx.rowHandler().factory(rowSchema);
SingleRangeIterable<Object[]> conditions = new
SingleRangeIterable<>(new Object[]{}, null, false, false);
- List<PartitionWithTerm> partitions = scannableTable.getPartitions();
+ List<PartitionWithConsistencyToken> partitions =
scannableTable.getPartitions();
return new IndexScanNode<>(ctx, rowFactory, indexDescriptor,
scannableTable, tableDescriptor, partitions,
comparator, conditions, null, null, null);
@@ -219,43 +219,47 @@ public class IndexScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
partitionedData.put(partitionId, List.of(rows));
}
- List<PartitionWithTerm> getPartitions() {
+ List<PartitionWithConsistencyToken> getPartitions() {
return new TreeSet<>(partitionedData.keySet())
.stream()
- .map(k -> new PartitionWithTerm(k, 2L))
+ .map(k -> new PartitionWithConsistencyToken(k, 2L))
.collect(Collectors.toList());
}
/** {@inheritDoc} */
@Override
- public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
- @Nullable BitSet requiredColumns) {
+ public <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ ) {
throw new UnsupportedOperationException("Not supported");
}
/** {@inheritDoc} */
@Override
- public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns,
@Nullable RangeCondition<RowT> cond, @Nullable BitSet
requiredColumns) {
- List<T> list = partitionedData.get(partWithTerm.partId());
+ List<T> list =
partitionedData.get(partWithConsistencyToken.partId());
return new ScanPublisher<>(list, ctx, rowFactory);
}
@Override
- public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String> columns,
RowT key, @Nullable BitSet requiredColumns) {
- return newPublisher(ctx, partWithTerm, rowFactory);
+ return newPublisher(ctx, partWithConsistencyToken, rowFactory);
}
- private <RowT> ScanPublisher<RowT> newPublisher(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ private <RowT> ScanPublisher<RowT> newPublisher(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithToken,
RowFactory<RowT> rowFactory) {
- int partId = partWithTerm.partId();
+ int partId = partWithToken.partId();
List<T> list = partitionedData.get(partId);
Objects.requireNonNull(list, "No data for partition " + partId);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index 8cb1f6a437..8007e40503 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -60,7 +60,7 @@ import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
@@ -118,9 +118,9 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester tester = new Tester(data);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
- ResultCollector collector = tester.tableScan(partitionId, term, tx);
+ ResultCollector collector = tester.tableScan(partitionId,
consistencyToken, tx);
if (tx.isReadOnly()) {
HybridTimestamp timestamp = tx.readTimestamp();
@@ -134,7 +134,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
partitionId,
tx.id(),
tx.commitPartition(),
- new PrimaryReplica(clusterNode, term),
+ new PrimaryReplica(clusterNode, consistencyToken),
null,
null,
null,
@@ -162,9 +162,9 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester tester = new Tester(input);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
- ResultCollector collector = tester.tableScan(partitionId, term, tx);
+ ResultCollector collector = tester.tableScan(partitionId,
consistencyToken, tx);
input.sendRows();
@@ -187,7 +187,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester tester = new Tester(input);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
Object[] lowerValue = lower == Bound.NONE ? null : new Object[]{1};
Object[] upperValue = upper == Bound.NONE ? null : new Object[]{10};
@@ -197,7 +197,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
int flags = condition.toFlags();
- ResultCollector collector = tester.indexScan(partitionId, term, tx,
indexId, condition);
+ ResultCollector collector = tester.indexScan(partitionId,
consistencyToken, tx, indexId, condition);
if (tx.isReadOnly()) {
HybridTimestamp timestamp = tx.readTimestamp();
@@ -214,7 +214,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
isNull()
);
} else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), term);
+ PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
verify(internalTable).scan(
eq(partitionId),
@@ -263,14 +263,14 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
tester.requiredFields.set(1);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
TestRangeCondition<Object[]> condition = new TestRangeCondition<>();
// Set any valid bounds, they are not of our interest here.
condition.setLower(Bound.INCLUSIVE, new Object[]{0});
- ResultCollector collector = tester.indexScan(partitionId, term, tx,
indexId, condition);
+ ResultCollector collector = tester.indexScan(partitionId,
consistencyToken, tx, indexId, condition);
if (tx.isReadOnly()) {
HybridTimestamp timestamp = tx.readTimestamp();
@@ -287,7 +287,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
eq(tester.requiredFields)
);
} else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), term);
+ PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
verify(internalTable).scan(
eq(partitionId),
@@ -323,13 +323,13 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
tester.requiredFields.set(1);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
TestRangeCondition<Object[]> condition = new TestRangeCondition<>();
// Set any valid bounds, they are not of our interest here.
condition.setLower(Bound.INCLUSIVE, new Object[]{0});
- ResultCollector collector = tester.indexScan(partitionId, term, tx,
indexId, condition);
+ ResultCollector collector = tester.indexScan(partitionId,
consistencyToken, tx, indexId, condition);
input.sendRows();
@@ -352,13 +352,14 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester tester = new Tester(input);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
TestRangeCondition<Object[]> condition = new TestRangeCondition<>();
// Bound columns != input columns.
condition.setLower(Bound.INCLUSIVE, new Object[]{1, 2});
- AssertionError err = assertThrows(AssertionError.class, () ->
tester.indexScan(partitionId, term, tx, indexId, condition));
+ AssertionError err = assertThrows(AssertionError.class,
+ () -> tester.indexScan(partitionId, consistencyToken, tx,
indexId, condition));
assertEquals("Invalid range condition", err.getMessage());
verifyNoInteractions(internalTable);
@@ -381,14 +382,14 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester tester = new Tester(input);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
TestRangeCondition<Object[]> condition = new TestRangeCondition<>();
condition.setLower(Bound.INCLUSIVE, new Object[]{1, 2});
ArgumentCaptor<BinaryTuplePrefix> prefix =
ArgumentCaptor.forClass(BinaryTuplePrefix.class);
- ResultCollector collector = tester.indexScan(partitionId, term, tx,
indexId, condition);
+ ResultCollector collector = tester.indexScan(partitionId,
consistencyToken, tx, indexId, condition);
if (tx.isReadOnly()) {
HybridTimestamp timestamp = tx.readTimestamp();
@@ -405,7 +406,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
eq(tester.requiredFields)
);
} else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), term);
+ PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
verify(internalTable).scan(
eq(partitionId),
@@ -441,11 +442,11 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester tester = new Tester(input);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
Object[] key = {1};
- ResultCollector collector = tester.indexLookUp(partitionId, term, tx,
indexId, key);
+ ResultCollector collector = tester.indexLookUp(partitionId,
consistencyToken, tx, indexId, key);
if (tx.isReadOnly()) {
verify(internalTable).lookup(
@@ -457,7 +458,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
isNull()
);
} else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), term);
+ PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
verify(internalTable).lookup(
eq(partitionId),
@@ -491,11 +492,11 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
tester.requiredFields.set(1);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
Object[] key = {1};
- ResultCollector collector = tester.indexLookUp(partitionId, term, tx,
indexId, key);
+ ResultCollector collector = tester.indexLookUp(partitionId,
consistencyToken, tx, indexId, key);
if (tx.isReadOnly()) {
verify(internalTable).lookup(
@@ -507,7 +508,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
eq(null)
);
} else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), term);
+ PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
verify(internalTable).lookup(
eq(partitionId),
@@ -539,11 +540,11 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
Tester tester = new Tester(input);
int partitionId = 1;
- long term = 2;
+ long consistencyToken = 2;
int indexId = 3;
Object[] key = {1};
- ResultCollector collector = tester.indexLookUp(partitionId, term, tx,
indexId, key);
+ ResultCollector collector = tester.indexLookUp(partitionId,
consistencyToken, tx, indexId, key);
input.sendRows();
@@ -577,7 +578,7 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
scannableTable = new ScannableTableImpl(internalTable, rf ->
rowConverter);
}
- ResultCollector tableScan(int partitionId, long term, NoOpTransaction
tx) {
+ ResultCollector tableScan(int partitionId, long consistencyToken,
NoOpTransaction tx) {
when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
when(ctx.localNode()).thenReturn(tx.clusterNode());
@@ -601,13 +602,21 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
RowFactory<Object[]> rowFactory =
rowHandler.factory(input.rowSchema);
- Publisher<Object[]> publisher = scannableTable.scan(ctx, new
PartitionWithTerm(partitionId, term), rowFactory, null);
+ Publisher<Object[]> publisher = scannableTable.scan(
+ ctx,
+ new PartitionWithConsistencyToken(partitionId,
consistencyToken), rowFactory, null
+ );
return new ResultCollector(publisher, rowConverter);
}
- ResultCollector indexScan(int partitionId, long term, NoOpTransaction
tx,
- int indexId, TestRangeCondition<Object[]> condition) {
+ ResultCollector indexScan(
+ int partitionId,
+ long consistencyToken,
+ NoOpTransaction tx,
+ int indexId,
+ TestRangeCondition<Object[]> condition
+ ) {
when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
when(ctx.localNode()).thenReturn(tx.clusterNode());
@@ -640,13 +649,20 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
RangeCondition<Object[]> rangeCondition =
condition.asRangeCondition();
List<String> indexColumns = input.getIndexColumns();
- Publisher<Object[]> publisher = scannableTable.indexRangeScan(ctx,
new PartitionWithTerm(partitionId, term), rowFactory,
- indexId, indexColumns, rangeCondition, requiredFields);
+ Publisher<Object[]> publisher = scannableTable.indexRangeScan(
+ ctx,
+ new PartitionWithConsistencyToken(partitionId,
consistencyToken),
+ rowFactory,
+ indexId,
+ indexColumns,
+ rangeCondition,
+ requiredFields
+ );
return new ResultCollector(publisher, rowConverter);
}
- ResultCollector indexLookUp(int partitionId, long term,
NoOpTransaction tx,
+ ResultCollector indexLookUp(int partitionId, long consistencyToken,
NoOpTransaction tx,
int indexId, Object[] key) {
when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
@@ -675,8 +691,15 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
RowFactory<Object[]> rowFactory =
rowHandler.factory(input.rowSchema);
List<String> indexColumns = input.getIndexColumns();
- Publisher<Object[]> publisher = scannableTable.indexLookup(ctx,
new PartitionWithTerm(partitionId, term), rowFactory,
- indexId, indexColumns, key, requiredFields);
+ Publisher<Object[]> publisher = scannableTable.indexLookup(
+ ctx,
+ new PartitionWithConsistencyToken(partitionId,
consistencyToken),
+ rowFactory,
+ indexId,
+ indexColumns,
+ key,
+ requiredFields
+ );
return new ResultCollector(publisher, rowConverter);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 3f235fe943..8a23e5d86c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -50,7 +50,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl;
@@ -106,8 +106,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
int inBufSize = Commons.IN_BUFFER_SIZE;
- List<PartitionWithTerm> partsWithTerms = IntStream.range(0,
TestInternalTableImpl.PART_CNT)
- .mapToObj(p -> new PartitionWithTerm(p, -1L))
+ List<PartitionWithConsistencyToken> partsWithConsistencyTokens =
IntStream.range(0, TestInternalTableImpl.PART_CNT)
+ .mapToObj(p -> new PartitionWithConsistencyToken(p, -1L))
.collect(Collectors.toList());
int probingCnt = 50;
@@ -171,7 +171,7 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
};
ScannableTableImpl scanableTable = new
ScannableTableImpl(internalTable, rf -> rowConverter);
TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, scanableTable,
- partsWithTerms, null, null, null);
+ partsWithConsistencyTokens, null, null, null);
RootNode<Object[]> root = new RootNode<>(ctx);
@@ -185,7 +185,7 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
}
internalTable.scanComplete.await();
- assertEquals(sizes[i++] * partsWithTerms.size(), cnt);
+ assertEquals(sizes[i++] * partsWithConsistencyTokens.size(), cnt);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index e6216280e8..98e442e3f8 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -131,7 +131,7 @@ public final class NoOpTransaction implements
InternalTransaction {
}
@Override
- public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndTerm(TablePartitionId tablePartitionId) {
+ public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) {
return tuple;
}
@@ -152,8 +152,8 @@ public final class NoOpTransaction implements
InternalTransaction {
@Override
public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId
tablePartitionId,
- IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
- return nodeAndTerm;
+ IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken) {
+ return nodeAndConsistencyToken;
}
/** Returns a {@link CompletableFuture} that completes when this
transaction commits. */
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 3174ddbe34..4cca418348 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -62,8 +62,8 @@ import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTable;
import org.apache.ignite.internal.sql.engine.exec.ExecutableTableRegistry;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
@@ -143,8 +143,12 @@ public class TestBuilders {
public static ScannableTable tableScan(DataProvider<Object[]>
dataProvider) {
return new ScannableTable() {
@Override
- public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
- @Nullable BitSet requiredColumns) {
+ public <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ ) {
return new TransformingPublisher<>(
SubscriptionUtils.fromIterable(
@@ -158,14 +162,14 @@ public class TestBuilders {
}
@Override
- public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken
partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@Override
- public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@@ -179,13 +183,17 @@ public class TestBuilders {
public static ScannableTable indexRangeScan(DataProvider<Object[]>
dataProvider) {
return new ScannableTable() {
@Override
- public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
- @Nullable BitSet requiredColumns) {
+ public <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ ) {
throw new UnsupportedOperationException();
}
@Override
- public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken
partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns) {
return new TransformingPublisher<>(
@@ -200,7 +208,7 @@ public class TestBuilders {
}
@Override
- public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@@ -214,20 +222,24 @@ public class TestBuilders {
public static ScannableTable indexLookup(DataProvider<Object[]>
dataProvider) {
return new ScannableTable() {
@Override
- public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
- @Nullable BitSet requiredColumns) {
+ public <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ ) {
throw new UnsupportedOperationException();
}
@Override
- public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT>
indexRangeScan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken
partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns) {
throw new UnsupportedOperationException();
}
@Override
- public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
return new TransformingPublisher<>(
SubscriptionUtils.fromIterable(
@@ -1343,7 +1355,7 @@ public class TestBuilders {
}
ExecutionTarget target = factory.partitioned(owningNodes.stream()
- .map(name -> new NodeWithTerm(name, 1))
+ .map(name -> new NodeWithConsistencyToken(name, 1))
.collect(Collectors.toList()));
return CompletableFuture.completedFuture(target);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
index cd264f0609..403bbe4573 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterTest.java
@@ -31,7 +31,7 @@ import java.util.UUID;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
-import org.apache.ignite.internal.sql.engine.exec.PartitionWithTerm;
+import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
@@ -59,8 +59,12 @@ public class TestClusterTest extends BaseIgniteAbstractTest {
private final ScannableTable table = new ScannableTable() {
@Override
- public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm, RowFactory<RowT> rowFactory,
- @Nullable BitSet requiredColumns) {
+ public <RowT> Publisher<RowT> scan(
+ ExecutionContext<RowT> ctx,
+ PartitionWithConsistencyToken partWithConsistencyToken,
+ RowFactory<RowT> rowFactory,
+ @Nullable BitSet requiredColumns
+ ) {
return new TransformingPublisher<>(
SubscriptionUtils.fromIterable(
@@ -72,7 +76,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest {
}
@Override
- public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT>
ctx, PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> indexRangeScan(ExecutionContext<RowT>
ctx, PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, @Nullable RangeCondition<RowT> cond,
@Nullable BitSet requiredColumns) {
@@ -86,7 +90,7 @@ public class TestClusterTest extends BaseIgniteAbstractTest {
}
@Override
- public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
PartitionWithTerm partWithTerm,
+ public <RowT> Publisher<RowT> indexLookup(ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int indexId, List<String>
columns, RowT key, @Nullable BitSet requiredColumns) {
return new TransformingPublisher<>(
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
index 4413b92b57..45754bb944 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
@@ -38,7 +38,7 @@ import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.util.Util;
-import org.apache.ignite.internal.sql.engine.exec.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.exec.NodeWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
@@ -65,7 +65,7 @@ import org.junit.jupiter.api.Test;
public class PlannerTest extends AbstractPlannerTest {
private static List<String> NODES;
- private static List<NodeWithTerm> NODES_WITH_TERM;
+ private static List<NodeWithConsistencyToken> NODES_WITH_CONSISTENCY_TOKEN;
/**
* Init.
@@ -74,13 +74,13 @@ public class PlannerTest extends AbstractPlannerTest {
@BeforeAll
public static void init() {
NODES = new ArrayList<>(4);
- NODES_WITH_TERM = new ArrayList<>(4);
+ NODES_WITH_CONSISTENCY_TOKEN = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
String nodeName = Integer.toString(nextTableId());
NODES.add(nodeName);
- NODES_WITH_TERM.add(new NodeWithTerm(nodeName, 0L));
+ NODES_WITH_CONSISTENCY_TOKEN.add(new
NodeWithConsistencyToken(nodeName, 0L));
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index d355a632cf..1acf55c319 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -43,8 +43,10 @@ public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableSca
return internalTbl.scan(part, null);
}
- IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(internalTbl.tableId(), part));
- PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+ IgniteBiTuple<ClusterNode, Long> leaderWithConsistencyToken =
+ tx.enlistedNodeAndConsistencyToken(new
TablePartitionId(internalTbl.tableId(), part));
+
+ PrimaryReplica recipient = new
PrimaryReplica(leaderWithConsistencyToken.get1(),
leaderWithConsistencyToken.get2());
return new RollbackTxOnErrorPublisher<>(
tx,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ec49200636..c1b3bbcafe 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -3398,7 +3398,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
long currentEnlistmentConsistencyToken =
primaryReplicaMeta.getStartTime().longValue();
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-20377
if (enlistmentConsistencyToken !=
currentEnlistmentConsistencyToken
||
primaryReplicaMeta.getExpirationTime().before(now)
||
!isLocalPeer(primaryReplicaMeta.getLeaseholderId())
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index f1a95e24f8..9a0a6f0c35 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -284,18 +284,31 @@ public class InternalTableImpl implements InternalTable {
TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
- IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
actualTx.enlistedNodeAndTerm(partGroupId);
+ IgniteBiTuple<ClusterNode, Long> primaryReplicaAndConsistencyToken =
actualTx.enlistedNodeAndConsistencyToken(partGroupId);
CompletableFuture<R> fut;
- if (primaryReplicaAndTerm != null) {
+ if (primaryReplicaAndConsistencyToken != null) {
assert !implicit;
- fut = trackingInvoke(actualTx, partId, term -> fac.apply(actualTx,
partGroupId, term), false, primaryReplicaAndTerm,
- noWriteChecker, retryOnLockConflict);
+ fut = trackingInvoke(
+ actualTx,
+ partId,
+ enlistmentConsistencyToken -> fac.apply(actualTx,
partGroupId, enlistmentConsistencyToken),
+ false,
+ primaryReplicaAndConsistencyToken,
+ noWriteChecker,
+ retryOnLockConflict
+ );
} else {
- fut = enlistWithRetry(actualTx, partId, term ->
fac.apply(actualTx, partGroupId, term), implicit, noWriteChecker,
- retryOnLockConflict);
+ fut = enlistWithRetry(
+ actualTx,
+ partId,
+ enlistmentConsistencyToken -> fac.apply(actualTx,
partGroupId, enlistmentConsistencyToken),
+ implicit,
+ noWriteChecker,
+ retryOnLockConflict
+ );
}
return postEnlist(fut, false, actualTx, implicit);
@@ -348,20 +361,29 @@ public class InternalTableImpl implements InternalTable {
TablePartitionId partGroupId = new TablePartitionId(tableId,
partitionId);
- IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
actualTx.enlistedNodeAndTerm(partGroupId);
+ IgniteBiTuple<ClusterNode, Long> primaryReplicaAndConsistencyToken
= actualTx.enlistedNodeAndConsistencyToken(partGroupId);
CompletableFuture<T> fut;
- if (primaryReplicaAndTerm != null) {
+ if (primaryReplicaAndConsistencyToken != null) {
assert !implicit;
- fut = trackingInvoke(actualTx, partitionId, term ->
fac.apply(rowBatch.requestedRows, actualTx, partGroupId, term, false),
- false, primaryReplicaAndTerm, noOpChecker,
retryOnLockConflict);
+ fut = trackingInvoke(
+ actualTx,
+ partitionId,
+ enlistmentConsistencyToken ->
+ fac.apply(rowBatch.requestedRows, actualTx,
partGroupId, enlistmentConsistencyToken, false),
+ false,
+ primaryReplicaAndConsistencyToken,
+ noOpChecker,
+ retryOnLockConflict
+ );
} else {
fut = enlistWithRetry(
actualTx,
partitionId,
- term -> fac.apply(rowBatch.requestedRows, actualTx,
partGroupId, term, full),
+ enlistmentConsistencyToken ->
+ fac.apply(rowBatch.requestedRows, actualTx,
partGroupId, enlistmentConsistencyToken, full),
full,
noOpChecker,
retryOnLockConflict
@@ -410,29 +432,30 @@ public class InternalTableImpl implements InternalTable {
) {
TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
- IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm =
tx.enlistedNodeAndTerm(partGroupId);
+ IgniteBiTuple<ClusterNode, Long> primaryReplicaAndConsistencyToken =
tx.enlistedNodeAndConsistencyToken(partGroupId);
CompletableFuture<Collection<BinaryRow>> fut;
- Function<Long, ReplicaRequest> mapFunc = (term) ->
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
- .groupId(partGroupId)
- .timestampLong(clock.nowLong())
- .transactionId(tx.id())
- .scanId(scanId)
- .indexToUse(indexId)
- .exactKey(binaryTupleMessage(exactKey))
- .lowerBoundPrefix(binaryTupleMessage(lowerBound))
- .upperBoundPrefix(binaryTupleMessage(upperBound))
- .flags(flags)
- .columnsToInclude(columnsToInclude)
- .full(implicit) // Intent for one phase commit.
- .batchSize(batchSize)
- .enlistmentConsistencyToken(term)
-
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
- .build();
-
- if (primaryReplicaAndTerm != null) {
- fut = replicaSvc.invoke(primaryReplicaAndTerm.get1(),
mapFunc.apply(primaryReplicaAndTerm.get2()));
+ Function<Long, ReplicaRequest> mapFunc =
+ (enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
+ .groupId(partGroupId)
+ .timestampLong(clock.nowLong())
+ .transactionId(tx.id())
+ .scanId(scanId)
+ .indexToUse(indexId)
+ .exactKey(binaryTupleMessage(exactKey))
+ .lowerBoundPrefix(binaryTupleMessage(lowerBound))
+ .upperBoundPrefix(binaryTupleMessage(upperBound))
+ .flags(flags)
+ .columnsToInclude(columnsToInclude)
+ .full(implicit) // Intent for one phase commit.
+ .batchSize(batchSize)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
+
.commitPartitionId(serializeTablePartitionId(tx.commitPartition()))
+ .build();
+
+ if (primaryReplicaAndConsistencyToken != null) {
+ fut = replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(),
mapFunc.apply(primaryReplicaAndConsistencyToken.get2()));
} else {
fut = enlistWithRetry(tx, partId, mapFunc, false, null, false);
}
@@ -456,7 +479,7 @@ public class InternalTableImpl implements InternalTable {
*
* @param tx Internal transaction.
* @param partId Partition number.
- * @param mapFunc Function to create replica request with new raft term.
+ * @param mapFunc Function to create replica request with new enlistment
consistency token.
* @param full {@code True} if is a full transaction.
* @param noWriteChecker Used to handle operations producing no updates.
* @param retryOnLockConflict {@code True} to retry on lock conflict.
@@ -471,8 +494,8 @@ public class InternalTableImpl implements InternalTable {
boolean retryOnLockConflict
) {
return (CompletableFuture<R>) enlist(partId, tx)
- .thenCompose(primaryReplicaAndTerm -> trackingInvoke(tx,
partId, mapFunc, full, primaryReplicaAndTerm, noWriteChecker,
- retryOnLockConflict))
+ .thenCompose(primaryReplicaAndConsistencyToken ->
+ trackingInvoke(tx, partId, mapFunc, full,
primaryReplicaAndConsistencyToken, noWriteChecker, retryOnLockConflict))
.handle((res0, e) -> {
if (e != null) {
// We can safely retry indefinitely on deadlock
prevention.
@@ -494,7 +517,7 @@ public class InternalTableImpl implements InternalTable {
* @param partId Partition id.
* @param mapFunc Request factory.
* @param full {@code True} for a full transaction.
- * @param primaryReplicaAndTerm Replica and term.
+ * @param primaryReplicaAndConsistencyToken Replica and enlistment
consistency token.
* @param noWriteChecker Used to handle operations producing no updates.
* @param retryOnLockConflict {@code True} to retry on lock conflics.
* @return The future.
@@ -504,11 +527,11 @@ public class InternalTableImpl implements InternalTable {
int partId,
Function<Long, ReplicaRequest> mapFunc,
boolean full,
- IgniteBiTuple<ClusterNode, Long> primaryReplicaAndTerm,
+ IgniteBiTuple<ClusterNode, Long> primaryReplicaAndConsistencyToken,
@Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
boolean retryOnLockConflict
) {
- ReplicaRequest request = mapFunc.apply(primaryReplicaAndTerm.get2());
+ ReplicaRequest request =
mapFunc.apply(primaryReplicaAndConsistencyToken.get2());
boolean write = request instanceof SingleRowReplicaRequest &&
((SingleRowReplicaRequest) request).requestType() != RW_GET
|| request instanceof MultipleRowReplicaRequest &&
((MultipleRowReplicaRequest) request).requestType() != RW_GET_ALL
@@ -529,7 +552,7 @@ public class InternalTableImpl implements InternalTable {
)));
}
- return replicaSvc.<R>invoke(primaryReplicaAndTerm.get1(),
request).thenApply(res -> {
+ return
replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(),
request).thenApply(res -> {
assert noWriteChecker != null;
// Remove inflight if no replication was scheduled, otherwise
inflight will be removed by delayed response.
@@ -547,7 +570,7 @@ public class InternalTableImpl implements InternalTable {
return null; // Unreachable.
});
} else {
- return replicaSvc.invoke(primaryReplicaAndTerm.get1(), request);
+ return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(),
request);
}
}
@@ -751,13 +774,13 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRow,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
.groupId(groupId)
.schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RW_GET)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -838,7 +861,8 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
keyRows,
tx,
- (keyRows0, txo, groupId, term, full) ->
readWriteMultiRowPkReplicaRequest(RW_GET_ALL, keyRows0, txo, groupId, term,
full),
+ (keyRows0, txo, groupId, enlistmentConsistencyToken, full) ->
+ readWriteMultiRowPkReplicaRequest(RW_GET_ALL,
keyRows0, txo, groupId, enlistmentConsistencyToken, full),
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
(res, req) -> false,
false
@@ -876,7 +900,7 @@ public class InternalTableImpl implements InternalTable {
Collection<? extends BinaryRow> rows,
InternalTransaction tx,
ReplicationGroupId groupId,
- Long term,
+ Long enlistmentConsistencyToken,
boolean full
) {
assert allSchemaVersionsSame(rows) : "Different schema versions
encountered: " + uniqueSchemaVersions(rows);
@@ -887,7 +911,7 @@ public class InternalTableImpl implements InternalTable {
.schemaVersion(rows.iterator().next().schemaVersion())
.primaryKeys(serializeBinaryTuples(rows))
.transactionId(tx.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(requestType)
.timestampLong(clock.nowLong())
.full(full)
@@ -947,13 +971,13 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
row,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_UPSERT)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -985,7 +1009,7 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<Void> fut = enlistWithRetry(
tx,
partition,
- term -> upsertAllInternal(rows, tx, partGroupId, term, true),
+ enlistmentConsistencyToken -> upsertAllInternal(rows, tx,
partGroupId, enlistmentConsistencyToken, true),
true,
null,
true // Allow auto retries for data streamer.
@@ -1000,13 +1024,13 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
row,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_GET_AND_UPSERT)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1022,13 +1046,13 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
row,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_INSERT)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1044,9 +1068,8 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term, full) -> {
- return
readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, keyRows0, txo,
groupId, term, full);
- },
+ (keyRows, txo, groupId, enlistmentConsistencyToken, full) ->
+
readWriteMultiRowReplicaRequest(RequestType.RW_INSERT_ALL, keyRows, txo,
groupId, enlistmentConsistencyToken, full),
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
(res, req) -> {
for (BinaryRow row : res) {
@@ -1067,7 +1090,7 @@ public class InternalTableImpl implements InternalTable {
Collection<? extends BinaryRow> rows,
InternalTransaction tx,
ReplicationGroupId groupId,
- Long term,
+ Long enlistmentConsistencyToken,
boolean full
) {
assert allSchemaVersionsSame(rows) : "Different schema versions
encountered: " + uniqueSchemaVersions(rows);
@@ -1078,7 +1101,7 @@ public class InternalTableImpl implements InternalTable {
.schemaVersion(rows.iterator().next().schemaVersion())
.binaryTuples(serializeBinaryTuples(rows))
.transactionId(tx.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(requestType)
.timestampLong(clock.nowLong())
.full(full)
@@ -1091,13 +1114,13 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
row,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_REPLACE_IF_EXIST)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1116,14 +1139,14 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
newRow,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSwapRowReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSwapRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(oldRow.schemaVersion())
.oldBinaryTuple(oldRow.tupleSlice())
.newBinaryTuple(newRow.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_REPLACE)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1139,13 +1162,13 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
row,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.binaryTuple(row.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_GET_AND_REPLACE)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1161,13 +1184,13 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
keyRow,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(keyRow.schemaVersion())
.primaryKey(keyRow.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_DELETE)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1183,13 +1206,13 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
oldRow,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(oldRow.schemaVersion())
.binaryTuple(oldRow.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_DELETE_EXACT)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1205,13 +1228,13 @@ public class InternalTableImpl implements InternalTable
{
return enlistInTx(
row,
tx,
- (txo, groupId, term) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
+ (txo, groupId, enlistmentConsistencyToken) ->
tableMessagesFactory.readWriteSingleRowPkReplicaRequest()
.groupId(groupId)
.commitPartitionId(serializeTablePartitionId(txo.commitPartition()))
.schemaVersion(row.schemaVersion())
.primaryKey(row.tupleSlice())
.transactionId(txo.id())
- .enlistmentConsistencyToken(term)
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
.requestType(RequestType.RW_GET_AND_DELETE)
.timestampLong(clock.nowLong())
.full(tx == null)
@@ -1227,9 +1250,8 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term, full) -> {
- return readWriteMultiRowPkReplicaRequest(RW_DELETE_ALL,
keyRows0, txo, groupId, term, full);
- },
+ (keyRows0, txo, groupId, enlistmentConsistencyToken, full) ->
+ readWriteMultiRowPkReplicaRequest(RW_DELETE_ALL,
keyRows0, txo, groupId, enlistmentConsistencyToken, full),
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
(res, req) -> {
for (BinaryRow row : res) {
@@ -1254,9 +1276,15 @@ public class InternalTableImpl implements InternalTable {
return enlistInTx(
rows,
tx,
- (keyRows0, txo, groupId, term, full) -> {
- return
readWriteMultiRowReplicaRequest(RequestType.RW_DELETE_EXACT_ALL, keyRows0, txo,
groupId, term, full);
- },
+ (keyRows0, txo, groupId, enlistmentConsistencyToken, full) ->
+ readWriteMultiRowReplicaRequest(
+ RequestType.RW_DELETE_EXACT_ALL,
+ keyRows0,
+ txo,
+ groupId,
+ enlistmentConsistencyToken,
+ full
+ ),
InternalTableImpl::collectRejectedRowsResponsesWithRestoreOrder,
(res, req) -> {
for (BinaryRow row : res) {
@@ -1451,7 +1479,7 @@ public class InternalTableImpl implements InternalTable {
.flags(flags)
.columnsToInclude(columnsToInclude)
.batchSize(batchSize)
- .enlistmentConsistencyToken(recipient.term())
+ .enlistmentConsistencyToken(recipient.enlistmentConsistencyToken())
.full(false) // Set explicitly.
.commitPartitionId(serializeTablePartitionId(commitPartition))
.build();
@@ -2002,11 +2030,11 @@ public class InternalTableImpl implements InternalTable
{
Collection<? extends BinaryRow> keyRows0,
InternalTransaction txo,
ReplicationGroupId groupId,
- Long term,
+ Long enlistmentConsistencyToken,
boolean full
) {
assert serializeTablePartitionId(txo.commitPartition()) != null;
- return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL,
keyRows0, txo, groupId, term, full);
+ return readWriteMultiRowReplicaRequest(RequestType.RW_UPSERT_ALL,
keyRows0, txo, groupId, enlistmentConsistencyToken, full);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
index 73969b9213..3a9a1527f7 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
@@ -22,24 +22,24 @@ import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.network.ClusterNode;
/**
- * Tuple representing primary replica node with current term.
+ * Tuple representing primary replica node with its enlistment consistency token.
*/
public class PrimaryReplica {
/** Primary replica node. */
private final ClusterNode node;
- /** Replica term. */
- private final long term;
+ /** Enlistment consistency token. */
+ private final long enlistmentConsistencyToken;
/**
* Constructor.
*
* @param node Primary replica node.
- * @param term Replica term.
+ * @param enlistmentConsistencyToken Enlistment consistency token.
*/
- public PrimaryReplica(ClusterNode node, long term) {
+ public PrimaryReplica(ClusterNode node, long enlistmentConsistencyToken) {
this.node = node;
- this.term = term;
+ this.enlistmentConsistencyToken = enlistmentConsistencyToken;
}
/**
@@ -52,12 +52,12 @@ public class PrimaryReplica {
}
/**
- * Gets replica term.
+ * Gets enlistment consistency token.
*
- * @return Replica term.
+ * @return Enlistment consistency token.
*/
- public long term() {
- return term;
+ public long enlistmentConsistencyToken() {
+ return enlistmentConsistencyToken;
}
/** {@inheritDoc} */
@@ -70,13 +70,13 @@ public class PrimaryReplica {
return false;
}
PrimaryReplica that = (PrimaryReplica) o;
- return term == that.term && Objects.equals(node, that.node);
+ return enlistmentConsistencyToken == that.enlistmentConsistencyToken
&& Objects.equals(node, that.node);
}
/** {@inheritDoc} */
@Override
public int hashCode() {
- return Objects.hash(node, term);
+ return Objects.hash(node, enlistmentConsistencyToken);
}
/** {@inheritDoc} */
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index def619b54b..f4faee3397 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -41,9 +41,9 @@ public interface InternalTransaction extends Transaction {
* Returns enlisted primary replica node associated with given replication group.
*
* @param tablePartitionId Table partition id.
- * @return Enlisted primary replica node and raft term associated with given replication group.
+ * @return Enlisted primary replica node and consistency token associated with given replication group.
*/
- IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(TablePartitionId
tablePartitionId);
+ IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId);
/**
* Returns a transaction state.
@@ -71,10 +71,10 @@ public interface InternalTransaction extends Transaction {
* Enlists a partition group into a transaction.
*
* @param tablePartitionId Table partition id to enlist.
- * @param nodeAndTerm Primary replica cluster node and raft term to enlist for given replication group.
+ * @param nodeAndConsistencyToken Primary replica cluster node and consistency token to enlist for given replication group.
* @return {@code True} if a partition is enlisted into the transaction.
*/
- IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId tablePartitionId,
IgniteBiTuple<ClusterNode, Long> nodeAndTerm);
+ IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId tablePartitionId,
IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken);
/**
* Returns read timestamp for the given transaction if it is a read-only one or {code null} otherwise.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 3e0e6c4dbb..ce675e5688 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -79,13 +79,16 @@ class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
}
@Override
- public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+ public IgniteBiTuple<ClusterNode, Long> enlist(
+ TablePartitionId tablePartitionId,
+ IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken
+ ) {
// TODO: IGNITE-17666 Close cursor tx finish and do it on the first finish invocation only.
return null;
}
@Override
- public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndTerm(TablePartitionId tablePartitionId) {
+ public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndConsistencyToken(TablePartitionId tablePartitionId) {
return null;
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index b585cf2f5e..0da8b17b4f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -47,7 +47,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
private static final AtomicReferenceFieldUpdater<ReadWriteTransactionImpl,
TablePartitionId> COMMIT_PART_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(ReadWriteTransactionImpl.class,
TablePartitionId.class, "commitPart");
- /** Enlisted partitions: partition id -> (primary replica node, raft term). */
+ /** Enlisted partitions: partition id -> (primary replica node, enlistment consistency token). */
private final Map<TablePartitionId, IgniteBiTuple<ClusterNode, Long>>
enlisted = new ConcurrentHashMap<>();
/** The tracker is used to track an observable timestamp. */
@@ -89,13 +89,16 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/** {@inheritDoc} */
@Override
- public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndTerm(TablePartitionId partGroupId) {
+ public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndConsistencyToken(TablePartitionId partGroupId) {
return enlisted.get(partGroupId);
}
/** {@inheritDoc} */
@Override
- public IgniteBiTuple<ClusterNode, Long> enlist(TablePartitionId
tablePartitionId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+ public IgniteBiTuple<ClusterNode, Long> enlist(
+ TablePartitionId tablePartitionId,
+ IgniteBiTuple<ClusterNode, Long> nodeAndConsistencyToken
+ ) {
checkEnlistPossibility();
enlistPartitionLock.readLock().lock();
@@ -103,7 +106,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
try {
checkEnlistPossibility();
- return enlisted.computeIfAbsent(tablePartitionId, k ->
nodeAndTerm);
+ return enlisted.computeIfAbsent(tablePartitionId, k ->
nodeAndConsistencyToken);
} finally {
enlistPartitionLock.readLock().unlock();
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 56b5cb47dc..557918a950 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -572,17 +572,25 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler {
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
String primaryConsistentId,
- Long term,
+ Long enlistmentConsistencyToken,
boolean commit,
Collection<ReplicationGroupId> replicationGroupIds,
UUID txId,
HybridTimestamp commitTimestamp,
CompletableFuture<TransactionMeta> txFinishFuture
) {
- LOG.debug("Finish [partition={}, node={}, term={} commit={}, txId={},
groups={}",
- commitPartition, primaryConsistentId, term, commit, txId,
replicationGroupIds);
+ LOG.debug("Finish [partition={}, node={},
enlistmentConsistencyToken={} commit={}, txId={}, groups={}",
+ commitPartition, primaryConsistentId,
enlistmentConsistencyToken, commit, txId, replicationGroupIds);
- return txMessageSender.finish(primaryConsistentId, commitPartition,
replicationGroupIds, txId, term, commit, commitTimestamp)
+ return txMessageSender.finish(
+ primaryConsistentId,
+ commitPartition,
+ replicationGroupIds,
+ txId,
+ enlistmentConsistencyToken,
+ commit,
+ commitTimestamp
+ )
.thenAccept(txResult -> {
validateTxFinishedAsExpected(commit, txId, txResult);
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index 58b110179e..c51c4a0674 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -189,7 +189,7 @@ public class TxManagerTest extends IgniteAbstractTest {
tx.enlist(tablePartitionId, new IgniteBiTuple<>(node, 1L));
- assertEquals(new IgniteBiTuple<>(node, 1L),
tx.enlistedNodeAndTerm(tablePartitionId));
+ assertEquals(new IgniteBiTuple<>(node, 1L),
tx.enlistedNodeAndConsistencyToken(tablePartitionId));
}
@Test