This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5cfd89174d IGNITE-17951 Sql. Enlist partitions in transaction before
executing a query (#1501)
5cfd89174d is described below
commit 5cfd89174d9d6a23e05a3b920a5c2b07fb1cbd62
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Fri Feb 3 18:16:01 2023 +0300
IGNITE-17951 Sql. Enlist partitions in transaction before executing a query
(#1501)
---
.../ignite/client/fakes/FakeInternalTable.java | 35 +++++
.../apache/ignite/internal/index/HashIndex.java | 13 ++
.../org/apache/ignite/internal/index/Index.java | 21 +++
.../apache/ignite/internal/index/SortedIndex.java | 39 +++--
.../ignite/internal/index/SortedIndexImpl.java | 27 ++++
.../ignite/internal/table/ItTableScanTest.java | 64 ++++++--
.../internal/sql/engine/SqlQueryProcessor.java | 19 +--
.../internal/sql/engine/exec/ExecutionContext.java | 16 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 99 ++++++++++--
.../sql/engine/exec/LogicalRelImplementor.java | 4 +-
.../sql/engine/exec/rel/IndexScanNode.java | 76 +++++++---
.../sql/engine/exec/rel/StorageScanNode.java | 2 +-
.../sql/engine/exec/rel/TableScanNode.java | 37 +++--
.../sql/engine/message/QueryStartRequest.java | 7 +
.../sql/engine/metadata/ColocationGroup.java | 167 +++++++++++++++------
.../engine/metadata/IgniteMdFragmentMapping.java | 6 +-
.../internal/sql/engine/metadata/NodeWithTerm.java | 63 ++++++++
.../sql/engine/metadata/PartitionWithTerm.java | 58 +++++++
.../sql/engine/schema/IgniteTableImpl.java | 4 +-
.../sql/engine/trait/DistributionFunction.java | 4 +-
.../internal/sql/engine/util/BaseQueryContext.java | 21 +--
.../sql/engine/util/LocalTxAttributesHolder.java | 118 +++++++++++++++
.../internal/sql/engine/StopCalciteModuleTest.java | 9 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 12 +-
.../sql/engine/exec/RuntimeSortedIndexTest.java | 3 +-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 12 +-
.../exec/rel/IndexScanNodeExecutionTest.java | 14 +-
.../engine/exec/rel/MergeJoinExecutionTest.java | 2 +-
.../exec/rel/TableScanNodeExecutionTest.java | 63 +++++---
.../sql/engine/framework/TestBuilders.java | 5 +-
.../internal/sql/engine/framework/TestNode.java | 2 +-
.../sql/engine/planner/AbstractPlannerTest.java | 21 +++
.../internal/sql/engine/planner/PlannerTest.java | 49 +++---
.../ItAbstractInternalTableScanTest.java | 47 +++++-
.../ItInternalTableReadWriteScanTest.java | 30 +++-
.../ignite/internal/table/InternalTable.java | 54 +++++++
.../distributed/storage/InternalTableImpl.java | 91 ++++++++++-
.../ignite/internal/utils/PrimaryReplica.java} | 48 ++++--
.../table/impl/DummyInternalTableImpl.java | 9 ++
39 files changed, 1099 insertions(+), 272 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 24551df462..f1ad1c501a 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
@@ -334,6 +335,21 @@ public class FakeInternalTable implements InternalTable {
throw new IgniteInternalException(new
OperationNotSupportedException());
}
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> scan(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ @Nullable UUID indexId,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ int flags,
+ @Nullable BitSet columnsToInclude
+ ) {
+ throw new IgniteInternalException(new
OperationNotSupportedException());
+ }
+
/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> scan(
@@ -370,6 +386,19 @@ public class FakeInternalTable implements InternalTable {
throw new IgniteInternalException(new
OperationNotSupportedException());
}
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> lookup(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ UUID indexId,
+ BinaryTuple key,
+ @Nullable BitSet columnsToInclude
+ ) {
+ throw new IgniteInternalException(new
OperationNotSupportedException());
+ }
+
/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(
@@ -389,6 +418,12 @@ public class FakeInternalTable implements InternalTable {
throw new IgniteInternalException(new
OperationNotSupportedException());
}
+ /** {@inheritDoc} */
+ @Override
+ public List<PrimaryReplica> primaryReplicas() {
+ throw new IgniteInternalException(new
OperationNotSupportedException());
+ }
+
/** {@inheritDoc} */
@Override
public ClusterNode leaderAssignment(int partition) {
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
index 069b1f7b1b..a4fd38bc72 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/HashIndex.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -81,6 +82,18 @@ public class HashIndex implements Index<IndexDescriptor> {
return table.lookup(partId, tx, id, key, columns);
}
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> lookup(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ BinaryTuple key,
+ @Nullable BitSet columns
+ ) {
+ return table.lookup(partId, txId, recipient, id, key, columns);
+ }
+
/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
index 896ee3ab09..5e2fff76b5 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/Index.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -53,9 +54,29 @@ public interface Index<DescriptorT extends IndexDescriptor> {
* @param key Key to lookup.
* @param columns Columns to include.
* @return A cursor from resulting rows.
+ * @deprecated IGNITE-17952 Use {@link #lookup(int, UUID, PrimaryReplica,
BinaryTuple, BitSet)} instead.
*/
+ @Deprecated
Publisher<BinaryRow> lookup(int partId, @Nullable InternalTransaction tx,
BinaryTuple key, @Nullable BitSet columns);
+ /**
+ * Returns cursor for the values corresponding to the given key.
+ *
+ * @param partId Partition id.
+ * @param txId Transaction id.
+ * @param recipient Primary replica that will handle given get request.
+ * @param key Key to lookup.
+ * @param columns Columns to include.
+ * @return A cursor from resulting rows.
+ */
+ Publisher<BinaryRow> lookup(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ BinaryTuple key,
+ @Nullable BitSet columns
+ );
+
/**
* Returns cursor for the values corresponding to the given key.
*
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
index 0676ba557a..ed45802283 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndex.java
@@ -18,11 +18,13 @@
package org.apache.ignite.internal.index;
import java.util.BitSet;
+import java.util.UUID;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -39,24 +41,28 @@ public interface SortedIndex extends
Index<SortedIndexDescriptor> {
byte INCLUDE_RIGHT = 0b10;
/**
- * Opens a range cursor for given bounds with left bound included in
result and right excluded.
+ * Opens a range cursor for given bounds. Inclusion of the bounds is
defined by {@code flags} mask.
*
* @param partId Partition.
- * @param tx Transaction.
- * @param left Left bound of range.
- * @param right Right bound of range.
- * @param columns Columns to include.
+ * @param txId Transaction id.
+ * @param recipient Primary replica that will handle given get request.
+ * @param leftBound Left bound of range.
+ * @param rightBound Right bound of range.
+ * @param flags A mask that defines whether to include bounds into the
final result or not.
+ * @param columnsToInclude Columns to include.
* @return A cursor from resulting rows.
+ * @see SortedIndex#INCLUDE_LEFT
+ * @see SortedIndex#INCLUDE_RIGHT
*/
- default Publisher<BinaryRow> scan(
+ Publisher<BinaryRow> scan(
int partId,
- @Nullable InternalTransaction tx,
- @Nullable BinaryTuplePrefix left,
- @Nullable BinaryTuplePrefix right,
- @Nullable BitSet columns
- ) {
- return scan(partId, tx, left, right, INCLUDE_LEFT, columns);
- }
+ UUID txId,
+ PrimaryReplica recipient,
+ @Nullable BinaryTuplePrefix leftBound,
+ @Nullable BinaryTuplePrefix rightBound,
+ int flags,
+ @Nullable BitSet columnsToInclude
+ );
/**
* Opens a read-only range cursor for given bounds with left bound
included in result and right excluded.
@@ -81,7 +87,7 @@ public interface SortedIndex extends
Index<SortedIndexDescriptor> {
}
/**
- * Opens a range cursor for given bounds. Inclusion of the bounds is
defined by {@code includeBounds} mask.
+ * Opens a range cursor for given bounds. Inclusion of the bounds is
defined by {@code flags} mask.
*
* @param partId Partition.
* @param tx Transaction.
@@ -92,7 +98,9 @@ public interface SortedIndex extends
Index<SortedIndexDescriptor> {
* @return A cursor from resulting rows.
* @see SortedIndex#INCLUDE_LEFT
* @see SortedIndex#INCLUDE_RIGHT
+ * @deprecated IGNITE-17952 Use {@link #scan(int, UUID, PrimaryReplica,
BinaryTuplePrefix, BinaryTuplePrefix, int, BitSet)} instead.
*/
+ @Deprecated
Publisher<BinaryRow> scan(
int partId,
@Nullable InternalTransaction tx,
@@ -102,9 +110,8 @@ public interface SortedIndex extends
Index<SortedIndexDescriptor> {
@Nullable BitSet columnsToInclude
);
-
/**
- * Opens a range cursor for given bounds. Inclusion of the bounds is
defined by {@code includeBounds} mask.
+ * Opens a range cursor for given bounds. Inclusion of the bounds is
defined by {@code flags} mask.
*
* @param partId Partition.
* @param readTimestamp Read timestamp.
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
index 3ec520b895..c3d6f5c65b 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/SortedIndexImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -82,6 +83,18 @@ public class SortedIndexImpl implements SortedIndex {
return table.lookup(partId, tx, id, key, columns);
}
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> lookup(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ BinaryTuple key,
+ @Nullable BitSet columns
+ ) {
+ return table.lookup(partId, txId, recipient, id, key, columns);
+ }
+
/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(
@@ -120,4 +133,18 @@ public class SortedIndexImpl implements SortedIndex {
) {
return table.scan(partId, readTimestamp, recipientNode, id, leftBound,
rightBound, flags, columnsToInclude);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> scan(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ @Nullable BinaryTuplePrefix leftBound,
+ @Nullable BinaryTuplePrefix rightBound,
+ int flags,
+ @Nullable BitSet columnsToInclude
+ ) {
+ return table.scan(partId, txId, recipient, id, leftBound, rightBound,
flags, columnsToInclude);
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 70c0841c5e..9d9f3690c4 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -43,6 +43,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -56,9 +57,13 @@ import
org.apache.ignite.internal.schema.configuration.index.TableIndexConfigura
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.sql.engine.AbstractBasicIntegrationTest;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.IgniteTransactions;
@@ -107,11 +112,12 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
@Test
public void testInsertWaitScanComplete() throws Exception {
+ int partId = 0;
TableImpl table = getOrCreateTable();
IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
InternalTransaction tx0 = (InternalTransaction) transactions.begin();
- InternalTransaction tx1 = (InternalTransaction) transactions.begin();
+ InternalTransaction tx1 = startTxWithEnlistedPartition(partId);
InternalTable internalTable = table.internalTable();
@@ -119,7 +125,11 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
ArrayList<BinaryRow> scannedRows = new ArrayList<>();
- Publisher<BinaryRow> publisher = internalTable.scan(0, tx1,
sortedIndexId, null, null, 0, null);
+ IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx1.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+
+ PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+
+ Publisher<BinaryRow> publisher = internalTable.scan(partId, tx1.id(),
recipient, sortedIndexId, null, null, 0, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -459,9 +469,15 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
ArrayList<BinaryRow> scannedRows = new ArrayList<>();
- InternalTransaction tx = (InternalTransaction)
CLUSTER_NODES.get(0).transactions().begin();
+ int partId = 0;
+
+ InternalTransaction tx = startTxWithEnlistedPartition(partId);
+
+ IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
- Publisher<BinaryRow> publisher = internalTable.scan(0, tx,
sortedIndexId, null, null, 0, null);
+ PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+
+ Publisher<BinaryRow> publisher = internalTable.scan(partId, tx.id(),
recipient, sortedIndexId, null, null, 0, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -487,7 +503,7 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
assertEquals(ROW_IDS.size() + 1, scannedRows.size());
- var publisher1 = internalTable.scan(0, tx, sortedIndexId, null, null,
0, null);
+ Publisher<BinaryRow> publisher1 = internalTable.scan(0, tx.id(),
recipient, sortedIndexId, null, null, 0, null);
assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
@@ -515,11 +531,16 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
UUID soredIndexId = getSortedIndexId();
- InternalTransaction tx = (InternalTransaction)
CLUSTER_NODES.get(0).transactions().begin();
+ int partId = 0;
+
+ InternalTransaction tx = startTxWithEnlistedPartition(partId);
+ IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+ PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
Publisher<BinaryRow> publisher = internalTable.scan(
- 0,
- tx,
+ partId,
+ tx.id(),
+ recipient,
soredIndexId,
lowBound,
upperBound,
@@ -540,8 +561,9 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
kvView.put(null, Tuple.create().set("key", 9),
Tuple.create().set("valInt", 9).set("valStr", "New_9")));
Publisher<BinaryRow> publisher1 = internalTable.scan(
- 0,
- tx,
+ partId,
+ tx.id(),
+ recipient,
soredIndexId,
lowBound,
upperBound,
@@ -763,4 +785,26 @@ public class ItTableScanTest extends
AbstractBasicIntegrationTest {
return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
}
+
+ /**
+ * Starts an RW transaction and enlists the specified partition in it.
+ *
+ * @param partId Partition ID.
+ * @return Transaction.
+ */
+ private InternalTransaction startTxWithEnlistedPartition(int partId) {
+ Ignite ignite = CLUSTER_NODES.get(0);
+
+ InternalTransaction tx = (InternalTransaction)
ignite.transactions().begin();
+
+ InternalTable table = ((TableImpl)
ignite.tables().table(TABLE_NAME)).internalTable();
+ TablePartitionId tblPartId = new TablePartitionId(table.tableId(),
partId);
+ RaftGroupService raftSvc = table.partitionRaftGroupService(partId);
+ long term =
IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();
+
+ tx.assignCommitPartition(tblPartId);
+ tx.enlist(tblPartId, new
IgniteBiTuple<>(table.leaderAssignment(partId), term));
+
+ return tx;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 2e383ec123..ad699136f6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -45,7 +45,6 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.index.event.IndexEvent;
import org.apache.ignite.internal.index.event.IndexEventParameters;
@@ -74,6 +73,7 @@ import
org.apache.ignite.internal.sql.engine.session.SessionInfo;
import org.apache.ignite.internal.sql.engine.session.SessionManager;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.event.TableEvent;
@@ -399,22 +399,19 @@ public class SqlQueryProcessor implements QueryProcessor {
return nodes.get(0);
})
.thenCompose(sqlNode -> {
- final boolean rwOp = dataModificationOp(sqlNode);
- final HybridTimestamp txTime = outerTx != null ?
outerTx.readTimestamp() : rwOp ? null : clock.now();
+ boolean rwOp = dataModificationOp(sqlNode);
+ boolean useDistributedTraits = outerTx != null ?
outerTx.isReadOnly() : !rwOp;
BaseQueryContext ctx = BaseQueryContext.builder()
.frameworkConfig(
Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
.defaultSchema(schema)
- .traitDefs(rwOp || (outerTx !=
null && !outerTx.isReadOnly()) ? Commons.LOCAL_TRAITS_SET :
-
Commons.DISTRIBUTED_TRAITS_SET)
+ .traitDefs(useDistributedTraits ?
Commons.DISTRIBUTED_TRAITS_SET : Commons.LOCAL_TRAITS_SET)
.build()
)
.logger(LOG)
.cancel(queryCancel)
.parameters(params)
- .transaction(outerTx)
- .transactionTime(txTime)
.plannerTimeout(PLANNER_TIMEOUT)
.build();
@@ -425,17 +422,17 @@ public class SqlQueryProcessor implements QueryProcessor {
boolean implicitTxRequired = outerTx == null
&& rwOp;
- InternalTransaction implicitTx =
implicitTxRequired ? txManager.begin() : null;
+ InternalTransaction tx = implicitTxRequired ?
txManager.begin()
+ : outerTx != null ? outerTx : new
LocalTxAttributesHolder(null, clock.now());
- BaseQueryContext enrichedContext =
- implicitTxRequired ?
ctx.toBuilder().transaction(implicitTx).build() : ctx;
+ BaseQueryContext enrichedContext =
ctx.toBuilder().transaction(tx).build();
var dataCursor =
executionSrvc.executePlan(plan, enrichedContext);
return new AsyncSqlCursorImpl<>(
SqlQueryType.mapPlanTypeToSqlType(plan.type()),
plan.metadata(),
- implicitTx,
+ implicitTxRequired ? tx : null,
new AsyncCursor<List<Object>>() {
@Override
public
CompletableFuture<BatchedResult<List<Object>>> requestNextAsync(int rows) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 42fc639162..18eb033296 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -32,7 +32,6 @@ import java.util.function.Consumer;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.BinaryConverter;
@@ -84,9 +83,6 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
private final AtomicBoolean cancelFlag = new AtomicBoolean();
- /** Transaction. */
- private InternalTransaction tx;
-
/**
* Need to store timestamp, since SQL standard says that functions such as
CURRENT_TIMESTAMP return the same value throughout the
* query.
@@ -104,7 +100,6 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
* @param fragmentDesc Partitions information.
* @param handler Row handler.
* @param params Parameters.
- * @param tx Transaction.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public ExecutionContext(
@@ -115,8 +110,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
String originatingNodeName,
FragmentDescription fragmentDesc,
RowHandler<RowT> handler,
- Map<String, Object> params,
- InternalTransaction tx
+ Map<String, Object> params
) {
super(qctx);
@@ -128,7 +122,6 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
this.params = params;
this.localNode = localNode;
this.originatingNodeName = originatingNodeName;
- this.tx = tx;
expressionFactory = new ExpressionFactoryImpl<>(
this,
@@ -338,12 +331,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
/** Transaction for current context. */
public InternalTransaction transaction() {
- return tx;
- }
-
- /** Read only transaction time. */
- public HybridTimestamp transactionTime() {
- return qctx.transactionTime();
+ return qctx.transaction();
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 406447c379..bf80d7409d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -37,6 +37,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.calcite.tools.Frameworks;
@@ -56,24 +57,35 @@ import
org.apache.ignite.internal.sql.engine.message.QueryStartRequest;
import org.apache.ignite.internal.sql.engine.message.QueryStartResponse;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.metadata.MappingService;
import org.apache.ignite.internal.sql.engine.metadata.MappingServiceImpl;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
import org.apache.ignite.internal.sql.engine.metadata.RemoteException;
import org.apache.ignite.internal.sql.engine.prepare.DdlPlan;
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.FragmentPlan;
+import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
+import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
+import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
+import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
+import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
+import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -99,6 +111,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private final MessageService msgSrvc;
+ private final TopologyService topSrvc;
+
private final ClusterNode localNode;
private final SqlSchemaManager sqlSchemaManager;
@@ -142,8 +156,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
ExchangeService exchangeSrvc
) {
return new ExecutionServiceImpl<>(
- topSrvc.localMember(),
msgSrvc,
+ topSrvc,
new MappingServiceImpl(topSrvc),
sqlSchemaManager,
ddlCommandHandler,
@@ -160,11 +174,21 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
/**
- * Constructor. TODO Documentation
https://issues.apache.org/jira/browse/IGNITE-15859
+ * Constructor.
+ *
+ * @param msgSrvc Message service.
+ * @param topSrvc Topology service.
+ * @param mappingSrvc Nodes mapping calculation service.
+ * @param sqlSchemaManager Schema manager.
+ * @param ddlCmdHnd Handler of the DDL commands.
+ * @param taskExecutor Task executor.
+ * @param handler Row handler.
+ * @param exchangeSrvc Exchange service.
+ * @param implementorFactory Relational node implementor factory.
*/
public ExecutionServiceImpl(
- ClusterNode localNode,
MessageService msgSrvc,
+ TopologyService topSrvc,
MappingService mappingSrvc,
SqlSchemaManager sqlSchemaManager,
DdlCommandHandler ddlCmdHnd,
@@ -173,10 +197,11 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
ExchangeService exchangeSrvc,
ImplementorFactory<RowT> implementorFactory
) {
- this.localNode = localNode;
+ this.localNode = topSrvc.localMember();
this.handler = handler;
this.msgSrvc = msgSrvc;
this.mappingSrvc = mappingSrvc;
+ this.topSrvc = topSrvc;
this.sqlSchemaManager = sqlSchemaManager;
this.taskExecutor = taskExecutor;
this.exchangeSrvc = exchangeSrvc;
@@ -197,11 +222,9 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
BaseQueryContext ctx,
MultiStepPlan plan
) {
- DistributedQueryManager queryManager;
-
- InternalTransaction tx = ctx.transaction();
+ DistributedQueryManager queryManager = new
DistributedQueryManager(ctx, ctx.transaction());
- DistributedQueryManager old = queryManagerMap.put(ctx.queryId(),
queryManager = new DistributedQueryManager(ctx, tx));
+ DistributedQueryManager old = queryManagerMap.put(ctx.queryId(),
queryManager);
assert old == null;
@@ -210,7 +233,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return queryManager.execute(plan);
}
- private BaseQueryContext createQueryContext(UUID queryId, @Nullable String
schema, Object[] params, HybridTimestamp txTime) {
+ private BaseQueryContext createQueryContext(UUID queryId, @Nullable String
schema, Object[] params, HybridTimestamp txTime, UUID txId) {
return BaseQueryContext.builder()
.queryId(queryId)
.parameters(params)
@@ -220,7 +243,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.build()
)
.logger(LOG)
- .transactionTime(txTime)
+ .transaction(new LocalTxAttributesHolder(txId, txTime))
.build();
}
@@ -303,7 +326,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
assert nodeName != null && msg != null;
DistributedQueryManager queryManager =
queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
- BaseQueryContext ctx = createQueryContext(key, msg.schema(),
msg.parameters(), msg.txTime());
+ BaseQueryContext ctx = createQueryContext(key, msg.schema(),
msg.parameters(), msg.txTime(), msg.txId());
return new DistributedQueryManager(ctx);
});
@@ -433,7 +456,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.root(fragment.serialized())
.fragmentDescription(desc)
.parameters(ctx.parameters())
- .txTime(ctx.transactionTime())
+ .txTime(ctx.transaction().readTimestamp())
+ .txId(ctx.transaction().id())
.build();
var fut = new CompletableFuture<Void>();
@@ -538,8 +562,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
initiatorNodeName,
desc,
handler,
- Commons.parametersMap(ctx.parameters()),
- tx
+ Commons.parametersMap(ctx.parameters())
);
}
@@ -583,6 +606,10 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
// start remote execution
for (Fragment fragment : fragments) {
+ if (tx != null && !tx.isReadOnly()) {
+ enlistPartitions(fragment, tx);
+ }
+
if (fragment.rootFragment()) {
assert rootFragmentId == null;
@@ -630,6 +657,50 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
};
}
+ private void enlistPartitions(Fragment fragment, InternalTransaction
tx) {
+ // TODO IGNITE-17952 Next condition should be removed.
+ if (!TraitUtils.distributionEnabled(fragment.root())) {
+ return;
+ }
+
+ new IgniteRelShuttle() {
+ @Override
+ public IgniteRel visit(IgniteIndexScan rel) {
+ enlist(rel);
+
+ return super.visit(rel);
+ }
+
+ @Override
+ public IgniteRel visit(IgniteTableScan rel) {
+ enlist(rel);
+
+ return super.visit(rel);
+ }
+
+ private void enlist(SourceAwareIgniteRel rel) {
+ InternalIgniteTable tbl =
rel.getTable().unwrap(InternalIgniteTable.class);
+ ColocationGroup grp =
fragment.mapping().findGroup(rel.sourceId());
+
+ if (grp.assignments().isEmpty()) {
+ return;
+ }
+
+ int partsCnt = grp.assignments().size();
+
+ tx.assignCommitPartition(new TablePartitionId(tbl.id(),
ThreadLocalRandom.current().nextInt(partsCnt)));
+
+ for (int p = 0; p < partsCnt; p++) {
+ List<NodeWithTerm> assign = grp.assignments().get(p);
+ NodeWithTerm leaderWithTerm = assign.get(0);
+
+ tx.enlist(new TablePartitionId(tbl.id(), p),
+ new
IgniteBiTuple<>(topSrvc.getByConsistentId(leaderWithTerm.name()),
leaderWithTerm.term()));
+ }
+ }
+ }.visit(fragment.root());
+ }
+
private CompletableFuture<Void> close(boolean cancel) {
if (!cancelled.compareAndSet(false, true)) {
return cancelFut.thenApply(Function.identity());
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 5cb79e1cce..63ae7944e7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -367,7 +367,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType),
idx,
tbl,
- group.partitions(ctx.localNode().name()),
+ group.partitionsWithTerms(ctx.localNode().name()),
comp,
ranges,
filters,
@@ -404,7 +404,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
ctx,
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType),
tbl,
- group.partitions(ctx.localNode().name()),
+ group.partitionsWithTerms(ctx.localNode().name()),
filters,
prj,
requiredColumns == null ? null : requiredColumns.toBitSet()
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index ae53e7f8e3..80a1f90a38 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
-import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
-
-import java.util.Arrays;
import java.util.BitSet;
+import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.Flow.Publisher;
@@ -36,12 +34,16 @@ import
org.apache.ignite.internal.sql.engine.exec.RowConverter;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.SubscriptionUtils;
import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.Nullable;
@@ -57,7 +59,8 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
private final RowHandler.RowFactory<RowT> factory;
- private final int[] parts;
+ /** List of pairs containing the partition number to scan with the
corresponding primary replica term. */
+ private final Collection<PartitionWithTerm> partsWithTerms;
/** Participating columns. */
private final @Nullable BitSet requiredColumns;
@@ -72,7 +75,7 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
* @param ctx Execution context.
* @param rowFactory Row factory.
* @param schemaTable The table this node should scan.
- * @param parts Partition numbers to scan.
+ * @param partsWithTerms List of pairs containing the partition number to
scan with the corresponding primary replica term.
* @param comp Rows comparator.
* @param rangeConditions Range conditions.
* @param filters Optional filter to filter out rows.
@@ -84,7 +87,7 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
RowHandler.RowFactory<RowT> rowFactory,
IgniteIndex schemaIndex,
InternalIgniteTable schemaTable,
- int[] parts,
+ Collection<PartitionWithTerm> partsWithTerms,
@Nullable Comparator<RowT> comp,
@Nullable RangeIterable<RowT> rangeConditions,
@Nullable Predicate<RowT> filters,
@@ -93,11 +96,11 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
) {
super(ctx, rowFactory, schemaTable, filters, rowTransformer,
requiredColumns);
- assert !nullOrEmpty(parts);
+ assert partsWithTerms != null && !partsWithTerms.isEmpty();
assert rangeConditions == null || rangeConditions.size() > 0;
this.schemaIndex = schemaIndex;
- this.parts = parts;
+ this.partsWithTerms = partsWithTerms;
this.requiredColumns = requiredColumns;
this.rangeConditions = rangeConditions;
this.comp = comp;
@@ -110,16 +113,17 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
@Override
protected Publisher<RowT> scan() {
if (rangeConditions != null) {
- return SubscriptionUtils.concat(new
TransformingIterator<>(rangeConditions.iterator(), cond ->
indexPublisher(parts, cond)));
+ return SubscriptionUtils.concat(
+ new TransformingIterator<>(rangeConditions.iterator(),
cond -> indexPublisher(partsWithTerms, cond)));
} else {
- return indexPublisher(parts, null);
+ return indexPublisher(partsWithTerms, null);
}
}
- private Publisher<RowT> indexPublisher(int[] parts, @Nullable
RangeCondition<RowT> cond) {
+ private Publisher<RowT> indexPublisher(Collection<PartitionWithTerm>
partsWithTerms, @Nullable RangeCondition<RowT> cond) {
Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
- Arrays.stream(parts).iterator(),
- part -> partitionPublisher(part, cond)
+ partsWithTerms.iterator(),
+ partWithTerm -> partitionPublisher(partWithTerm, cond)
);
if (comp != null) {
@@ -129,9 +133,9 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
}
}
- private Publisher<RowT> partitionPublisher(int part, @Nullable
RangeCondition<RowT> cond) {
+ private Publisher<RowT> partitionPublisher(PartitionWithTerm partWithTerm,
@Nullable RangeCondition<RowT> cond) {
Publisher<BinaryRow> pub;
- boolean roTx = context().transactionTime() != null;
+ InternalTransaction tx = context().transaction();
if (schemaIndex.type() == Type.SORTED) {
int flags = 0;
@@ -148,20 +152,32 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
flags |= (cond.upperInclude()) ? SortedIndex.INCLUDE_RIGHT : 0;
}
- if (roTx) {
+ if (tx.isReadOnly()) {
pub = ((SortedIndex) schemaIndex.index()).scan(
- part,
- context().transactionTime(),
+ partWithTerm.partId(),
+ tx.readTimestamp(),
context().localNode(),
lower,
upper,
flags,
requiredColumns
);
+ } else if (!(tx instanceof LocalTxAttributesHolder)) {
+ // TODO IGNITE-17952 This block should be removed.
+ // Workaround to make RW scan work from tx coordinator.
+ pub = ((SortedIndex) schemaIndex.index()).scan(
+ partWithTerm.partId(),
+ tx,
+ lower,
+ upper,
+ flags,
+ requiredColumns
+ );
} else {
pub = ((SortedIndex) schemaIndex.index()).scan(
- part,
- context().transaction(),
+ partWithTerm.partId(),
+ tx.id(),
+ new PrimaryReplica(context().localNode(),
partWithTerm.term()),
lower,
upper,
flags,
@@ -174,18 +190,28 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
BinaryTuple key = toBinaryTuple(cond.lower());
- if (roTx) {
+ if (tx.isReadOnly()) {
pub = schemaIndex.index().lookup(
- part,
- context().transactionTime(),
+ partWithTerm.partId(),
+ tx.readTimestamp(),
context().localNode(),
key,
requiredColumns
);
+ } else if (!(tx instanceof LocalTxAttributesHolder)) {
+ // TODO IGNITE-17952 This block should be removed.
+ // Workaround to make RW lookup work from tx coordinator.
+ pub = schemaIndex.index().lookup(
+ partWithTerm.partId(),
+ tx,
+ key,
+ requiredColumns
+ );
} else {
pub = schemaIndex.index().lookup(
- part,
- context().transaction(),
+ partWithTerm.partId(),
+ tx.id(),
+ new PrimaryReplica(context().localNode(),
partWithTerm.term()),
key,
requiredColumns
);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
index 881e3a4f0f..4e2382107a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
@@ -80,7 +80,7 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
) {
super(ctx);
- assert context().transaction() != null || context().transactionTime()
!= null : "Transaction not initialized.";
+ assert ctx.transaction() != null : "Transaction not initialized.";
tableRowConverter = row -> schemaTable.toRow(context(), row,
rowFactory, requiredColumns);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index 57227042ec..5c332762e4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
-import static org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
-
-import java.util.Arrays;
import java.util.BitSet;
+import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
@@ -28,10 +26,14 @@ import java.util.function.Predicate;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.SubscriptionUtils;
import org.apache.ignite.internal.util.TransformingIterator;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.jetbrains.annotations.Nullable;
/**
@@ -42,7 +44,8 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
/** Table that provides access to underlying data. */
private final InternalTable physTable;
- private final int[] parts;
+ /** List of pairs containing the partition number to scan with the
corresponding primary replica term. */
+ private final Collection<PartitionWithTerm> partsWithTerms;
/**
* Constructor.
@@ -50,7 +53,7 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
* @param ctx Execution context.
* @param rowFactory Row factory.
* @param schemaTable The table this node should scan.
- * @param parts Partition numbers to scan.
+ * @param partsWithTerms List of pairs containing the partition number to
scan with the corresponding primary replica term.
* @param filters Optional filter to filter out rows.
* @param rowTransformer Optional projection function.
* @param requiredColumns Optional set of column of interest.
@@ -59,31 +62,37 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
ExecutionContext<RowT> ctx,
RowHandler.RowFactory<RowT> rowFactory,
InternalIgniteTable schemaTable,
- int[] parts,
+ Collection<PartitionWithTerm> partsWithTerms,
@Nullable Predicate<RowT> filters,
@Nullable Function<RowT, RowT> rowTransformer,
@Nullable BitSet requiredColumns
) {
super(ctx, rowFactory, schemaTable, filters, rowTransformer,
requiredColumns);
- assert !nullOrEmpty(parts);
+ assert partsWithTerms != null && !partsWithTerms.isEmpty();
this.physTable = schemaTable.table();
- this.parts = parts;
+ this.partsWithTerms = partsWithTerms;
}
/** {@inheritDoc} */
@Override
protected Publisher<RowT> scan() {
- boolean roTx = context().transactionTime() != null;
-
+ InternalTransaction tx = context().transaction();
Iterator<Publisher<? extends RowT>> it = new TransformingIterator<>(
- Arrays.stream(parts).iterator(), part -> {
+ partsWithTerms.iterator(), partWithTerm -> {
Publisher<BinaryRow> pub;
- if (roTx) {
- pub = physTable.scan(part, context().transactionTime(),
context().localNode());
+
+ if (tx.isReadOnly()) {
+ pub = physTable.scan(partWithTerm.partId(),
tx.readTimestamp(), context().localNode());
+ } else if (!(tx instanceof LocalTxAttributesHolder)) {
+ // TODO IGNITE-17952 This block should be removed.
+ // Workaround to make RW scan work from tx coordinator.
+ pub = physTable.scan(partWithTerm.partId(), tx);
} else {
- pub = physTable.scan(part, context().transaction());
+ PrimaryReplica recipient = new
PrimaryReplica(context().localNode(), partWithTerm.term());
+
+ pub = physTable.scan(partWithTerm.partId(), tx.id(),
recipient, null, null, null, 0, null);
}
return convertPublisher(pub);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index 73d85c97a7..d18edca3f6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.message;
+import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.network.annotations.Marshallable;
@@ -55,4 +56,10 @@ public interface QueryStartRequest extends
ExecutionContextAwareMessage {
*/
@Marshallable
@Nullable HybridTimestamp txTime();
+
+ /**
+ * Transaction id.
+ */
+ @Marshallable
+ UUID txId();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
index 4b153cffab..fc33780504 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.sql.engine.metadata;
-import static org.apache.ignite.internal.util.ArrayUtils.asList;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
@@ -27,11 +26,13 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.util.IgniteIntList;
import org.jetbrains.annotations.NotNull;
/**
@@ -39,14 +40,11 @@ import org.jetbrains.annotations.NotNull;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class ColocationGroup implements Serializable {
- private static final int SYNTHETIC_PARTITIONS_COUNT = 512;
- // TODO:
IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT",
512);
-
private final List<Long> sourceIds;
private final List<String> nodeNames;
- private final List<List<String>> assignments;
+ private final List<List<NodeWithTerm>> assignments;
/**
* ForNodes.
@@ -60,7 +58,7 @@ public class ColocationGroup implements Serializable {
* ForAssignments.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public static ColocationGroup forAssignments(List<List<String>>
assignments) {
+ public static ColocationGroup forAssignments(List<List<NodeWithTerm>>
assignments) {
return new ColocationGroup(null, null, assignments);
}
@@ -76,7 +74,7 @@ public class ColocationGroup implements Serializable {
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- private ColocationGroup(List<Long> sourceIds, List<String> nodeNames,
List<List<String>> assignments) {
+ private ColocationGroup(List<Long> sourceIds, List<String> nodeNames,
List<List<NodeWithTerm>> assignments) {
this.sourceIds = sourceIds;
this.nodeNames = nodeNames;
this.assignments = assignments;
@@ -100,7 +98,7 @@ public class ColocationGroup implements Serializable {
* Get list of partitions (index) and nodes (items) having an appropriate
partition in OWNING state, calculated for
* distributed tables, involved in query execution.
*/
- public List<List<String>> assignments() {
+ public List<List<NodeWithTerm>> assignments() {
return assignments == null ? Collections.emptyList() : assignments;
}
@@ -151,18 +149,20 @@ public class ColocationGroup implements Serializable {
+ "Replicated query parts are not co-located on all
nodes");
}
- List<List<String>> assignments;
+ List<List<NodeWithTerm>> assignments;
+ Set<String> nodeNamesSet = nodeNames == null ? null : new
HashSet<>(nodeNames);
+ Predicate<String> nodeNamesFilter = nodeNames == null ? v -> true :
nodeNamesSet::contains;
+
if (this.assignments == null || other.assignments == null) {
assignments = firstNotNull(this.assignments, other.assignments);
- if (assignments != null && nodeNames != null) {
- Set<String> filter = new HashSet<>(nodeNames);
- List<List<String>> assignments0 = new
ArrayList<>(assignments.size());
+ if (assignments != null && nodeNamesSet != null) {
+ List<List<NodeWithTerm>> assignments0 = new
ArrayList<>(assignments.size());
for (int i = 0; i < assignments.size(); i++) {
- List<String> assignment = Commons.intersect(filter,
assignments.get(i));
+ List<NodeWithTerm> assignment =
filterByNodeNames(assignments.get(i), nodeNamesFilter);
- if (assignment.isEmpty()) { // TODO check with partition
filters
+ if (assignment.isEmpty()) {
throw new ColocationMappingException("Failed to map
fragment to location. "
+ "Partition mapping is empty [part=" + i +
"]");
}
@@ -174,17 +174,17 @@ public class ColocationGroup implements Serializable {
}
} else {
assert this.assignments.size() == other.assignments.size();
+
assignments = new ArrayList<>(this.assignments.size());
- Set<String> filter = nodeNames == null ? null : new
HashSet<>(nodeNames);
- for (int i = 0; i < this.assignments.size(); i++) {
- List<String> assignment =
Commons.intersect(this.assignments.get(i), other.assignments.get(i));
- if (filter != null) {
- assignment.retainAll(filter);
- }
+ for (int p = 0; p < this.assignments.size(); p++) {
+ List<NodeWithTerm> assignment0 = this.assignments.get(p);
+ List<NodeWithTerm> assignment1 = other.assignments.get(p);
+
+ List<NodeWithTerm> assignment = intersect(assignment0,
assignment1, nodeNamesFilter, p);
- if (assignment.isEmpty()) { // TODO check with partition
filters
- throw new ColocationMappingException("Failed to map
fragment to location. Partition mapping is empty [part=" + i + "]");
+ if (assignment.isEmpty()) {
+ throw new ColocationMappingException("Failed to map
fragment to location. Partition mapping is empty [part=" + p + "]");
}
assignments.add(assignment);
@@ -194,22 +194,92 @@ public class ColocationGroup implements Serializable {
return new ColocationGroup(sourceIds, nodeNames, assignments);
}
+ private List<NodeWithTerm> intersect(
+ List<NodeWithTerm> assignment0,
+ List<NodeWithTerm> assignment1,
+ Predicate<String> filter,
+ int p
+ ) throws ColocationMappingException {
+ if (assignment0.size() == 1 && assignment1.size() == 1) {
+ NodeWithTerm first = assignment0.get(0);
+ NodeWithTerm second = assignment1.get(0);
+
+ if (filter.test(first.name()) && Objects.equals(first.name(),
second.name())) {
+ validateTerm(first, second, p);
+
+ return assignment0;
+ }
+
+ return Collections.emptyList();
+ }
+
+ if (assignment0.size() > assignment1.size()) {
+ List<NodeWithTerm> tmp = assignment0;
+ assignment0 = assignment1;
+ assignment1 = tmp;
+ }
+
+ List<NodeWithTerm> assignment = new ArrayList<>();
+
+ // Filter and hash a smaller list.
+ Map<String, NodeWithTerm> nameToAssignmentMapping =
assignment0.stream()
+ .filter(v -> filter.test(v.name()))
+ .collect(Collectors.toMap(NodeWithTerm::name, nodeWithTerm ->
nodeWithTerm));
+
+ // Iterate over a larger list.
+ for (NodeWithTerm first : assignment1) {
+ NodeWithTerm second = nameToAssignmentMapping.get(first.name());
+
+ if (second == null) {
+ continue;
+ }
+
+ validateTerm(first, second, p);
+
+ assignment.add(first);
+ }
+
+ return assignment;
+ }
+
+ private void validateTerm(NodeWithTerm first, NodeWithTerm second, int
partId) throws ColocationMappingException {
+ if (first.term() != second.term()) {
+ throw new ColocationMappingException("Primary replica term has
been changed during mapping ["
+ + "node=" + first.name()
+ + ", expectedTerm=" + first.term()
+ + ", actualTerm=" + second.term()
+ + ", part=" + partId
+ + ']');
+ }
+ }
+
+ private List<NodeWithTerm> filterByNodeNames(List<NodeWithTerm>
assignment, Predicate<String> filter) {
+ List<NodeWithTerm> res = new ArrayList<>(assignment.size());
+
+ for (NodeWithTerm nodeWithTerm : assignment) {
+ if (!filter.test(nodeWithTerm.name())) {
+ continue;
+ }
+
+ res.add(nodeWithTerm);
+ }
+
+ return res;
+ }
+
/**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Creates a new colocation group using only primary assignments.
+ *
+ * @return Colocation group with primary assignments.
*/
public ColocationGroup complete() {
- if (assignments == null && nodeNames == null) {
- return this;
- }
-
if (assignments != null) {
- List<List<String>> assignments = new
ArrayList<>(this.assignments.size());
+ List<List<NodeWithTerm>> assignments = new
ArrayList<>(this.assignments.size());
Set<String> nodes = new HashSet<>();
- for (List<String> assignment : this.assignments) {
- String first = first(assignment);
+ for (List<NodeWithTerm> assignment : this.assignments) {
+ NodeWithTerm first = first(assignment);
if (first != null) {
- nodes.add(first);
+ nodes.add(first.name());
}
assignments.add(first != null ?
Collections.singletonList(first) : Collections.emptyList());
}
@@ -217,7 +287,7 @@ public class ColocationGroup implements Serializable {
return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments);
}
- return forNodes0(nodeNames);
+ return mapToNodes(nodeNames);
}
/**
@@ -230,29 +300,30 @@ public class ColocationGroup implements Serializable {
@NotNull
private ColocationGroup forNodes0(List<String> nodeNames) {
- List<List<String>> assignments = new
ArrayList<>(SYNTHETIC_PARTITIONS_COUNT);
- for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++) {
- assignments.add(asList(nodeNames.get(i % nodeNames.size())));
- }
return new ColocationGroup(sourceIds, nodeNames, assignments);
}
/**
- * Returns List of partitions to scan on the given node.
+ * Returns list of pairs containing the partition number to scan on the
given node with the corresponding primary replica term.
*
- * @param nodeNames Cluster node consistent ID.
- * @return List of partitions to scan on the given node.
+ * @param nodeName Cluster node consistent ID.
+ * @return List of pairs containing the partition number to scan on the
given node with the corresponding primary replica term.
*/
- public int[] partitions(String nodeNames) {
- IgniteIntList parts = new IgniteIntList(assignments.size());
+ public List<PartitionWithTerm> partitionsWithTerms(String nodeName) {
+ List<PartitionWithTerm> partsWithTerms = new ArrayList<>();
+
+ for (int p = 0; p < assignments.size(); p++) {
+ List<NodeWithTerm> assignment = assignments.get(p);
+
+ NodeWithTerm nodeWithTerm = first(assignment);
+
+ assert nodeWithTerm != null : "part=" + p;
- for (int i = 0; i < assignments.size(); i++) {
- List<String> assignment = assignments.get(i);
- if (Objects.equals(nodeNames, first(assignment))) {
- parts.add(i);
+ if (Objects.equals(nodeName, nodeWithTerm.name())) {
+ partsWithTerms.add(new PartitionWithTerm(p,
nodeWithTerm.term()));
}
}
- return parts.array();
+ return partsWithTerms;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
index 33d9bf189d..c08820b7f9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
@@ -227,6 +227,7 @@ public class IgniteMdFragmentMapping implements
MetadataHandler<FragmentMappingM
private static FragmentMapping getFragmentMapping(long sourceId,
ProjectableFilterableTableScan rel, MappingQueryContext ctx) {
ColocationGroup group =
rel.getTable().unwrap(InternalIgniteTable.class).colocationGroup(ctx);
+ // TODO IGNITE-17952 The following block should be removed.
// This condition is kinda workaround to make transactional scan works.
//
// For now, scan should be invoked on the node that coordinates the
transaction.
@@ -234,10 +235,9 @@ public class IgniteMdFragmentMapping implements
MetadataHandler<FragmentMappingM
// will need to replace actual distribution with fake one where every
partition
// is owned by a local node.
if (!TraitUtils.distributionEnabled(rel)) {
- List<List<String>> fakeAssignments = new
ArrayList<>(group.assignments().size());
-
+ List<List<NodeWithTerm>> fakeAssignments = new
ArrayList<>(group.assignments().size());
for (int i = 0; i < group.assignments().size(); i++) {
- fakeAssignments.add(List.of(ctx.locNodeName()));
+ fakeAssignments.add(List.of(new
NodeWithTerm(ctx.locNodeName(), -1L)));
}
group = ColocationGroup.forAssignments(fakeAssignments);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java
new file mode 100644
index 0000000000..61e15efa5e
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/NodeWithTerm.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.metadata;
+
+import java.io.Serializable;
+
+/**
+ * Tuple representing primary replica node name with current term.
+ */
+public class NodeWithTerm implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Primary replica node name. */
+ private final String name;
+
+ /** Primary replica term. */
+ private final long term;
+
+ /**
+ * Constructor.
+ *
+ * @param name Primary replica node name.
+ * @param term Primary replica term.
+ */
+ public NodeWithTerm(String name, Long term) {
+ this.name = name;
+ this.term = term;
+ }
+
+ /**
+ * Gets primary replica node name.
+ *
+ * @return Primary replica node name.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Gets primary replica term.
+ *
+ * @return Primary replica term.
+ */
+ public long term() {
+ return term;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java
new file mode 100644
index 0000000000..3da423a687
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/PartitionWithTerm.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.metadata;
+
+/**
+ * Tuple representing the number of the partition with its current primary
replica term.
+ */
+public class PartitionWithTerm {
+ /** Partition number. */
+ private final int partId;
+
+ /** Primary replica term. */
+ private final long term;
+
+ /**
+ * Constructor.
+ *
+ * @param partId partition number
+ * @param term Primary replica term.
+ */
+ public PartitionWithTerm(int partId, Long term) {
+ this.partId = partId;
+ this.term = term;
+ }
+
+ /**
+ * Gets partition number.
+ *
+ * @return Partition number.
+ */
+ public int partId() {
+ return partId;
+ }
+
+ /**
+ * Gets primary replica term.
+ *
+ * @return Primary replica term.
+ */
+ public long term() {
+ return term;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 8993cf29bc..d27c178db1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -55,6 +55,7 @@ import
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
import
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalIndexScan;
import
org.apache.ignite.internal.sql.engine.rel.logical.IgniteLogicalTableScan;
@@ -481,7 +482,8 @@ public class IgniteTableImpl extends AbstractTable
implements InternalIgniteTabl
}
private ColocationGroup partitionedGroup() {
- List<List<String>> assignments = table.assignments().stream()
+ List<List<NodeWithTerm>> assignments = table.primaryReplicas().stream()
+ .map(primaryReplica -> new
NodeWithTerm(primaryReplica.node().name(), primaryReplica.term()))
.map(Collections::singletonList)
.collect(Collectors.toList());
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
index d9ebc797fb..a34258caad 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -223,7 +225,7 @@ public abstract class DistributionFunction {
public <RowT> Destination<RowT> destination(HashFunctionFactory<RowT>
hashFuncFactory, ColocationGroup m, ImmutableIntList k) {
assert m != null && !nullOrEmpty(m.assignments()) && !k.isEmpty();
- List<List<String>> assignments = m.assignments();
+ List<List<String>> assignments =
Commons.transform(m.assignments(), v -> Commons.transform(v,
NodeWithTerm::name));
if (IgniteUtils.assertionsEnabled()) {
for (List<String> assignment : assignments) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
index 2327fa28e2..2cd28e9225 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/BaseQueryContext.java
@@ -49,7 +49,6 @@ import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.QueryCancel;
@@ -162,8 +161,6 @@ public final class BaseQueryContext extends
AbstractQueryContext {
private final InternalTransaction tx;
- private final HybridTimestamp txTs;
-
private CalciteCatalogReader catalogReader;
private long plannerTimeout;
@@ -178,7 +175,6 @@ public final class BaseQueryContext extends
AbstractQueryContext {
Object[] parameters,
IgniteLogger log,
InternalTransaction tx,
- HybridTimestamp txTs,
long plannerTimeout
) {
super(Contexts.chain(cfg.getContext()));
@@ -191,7 +187,6 @@ public final class BaseQueryContext extends
AbstractQueryContext {
this.cancel = cancel;
this.parameters = parameters;
this.tx = tx;
- this.txTs = txTs;
this.plannerTimeout = plannerTimeout;
RelDataTypeSystem typeSys =
CALCITE_CONNECTION_CONFIG.typeSystem(RelDataTypeSystem.class,
cfg.getTypeSystem());
@@ -245,10 +240,6 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return tx;
}
- public HybridTimestamp transactionTime() {
- return txTs;
- }
-
public long plannerTimeout() {
return plannerTimeout;
}
@@ -290,8 +281,7 @@ public final class BaseQueryContext extends
AbstractQueryContext {
.logger(log)
.cancel(cancel)
.parameters(parameters)
- .transaction(tx)
- .transactionTime(txTs);
+ .transaction(tx);
}
/**
@@ -323,8 +313,6 @@ public final class BaseQueryContext extends
AbstractQueryContext {
private InternalTransaction tx;
- private HybridTimestamp txTs;
-
private long plannerTimeout;
public Builder frameworkConfig(FrameworkConfig frameworkCfg) {
@@ -357,18 +345,13 @@ public final class BaseQueryContext extends
AbstractQueryContext {
return this;
}
- public Builder transactionTime(HybridTimestamp txTs) {
- this.txTs = txTs;
- return this;
- }
-
public Builder plannerTimeout(long plannerTimeout) {
this.plannerTimeout = plannerTimeout;
return this;
}
public BaseQueryContext build() {
- return new BaseQueryContext(queryId, frameworkCfg, cancel,
parameters, log, tx, txTs, plannerTimeout);
+ return new BaseQueryContext(queryId, frameworkCfg, cancel,
parameters, log, tx, plannerTimeout);
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java
new file mode 100644
index 0000000000..232bfb72cf
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/LocalTxAttributesHolder.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Local holder of {@link InternalTransaction#id() id} and {@link
InternalTransaction#readTimestamp() readTimestamp} transaction attributes.
+ */
+// TODO IGNITE-17952 This class must not implement the transaction interface
and may be passed to the remote nodes.
+public class LocalTxAttributesHolder implements InternalTransaction {
+ /** Transaction id. */
+ private final UUID id;
+
+ /** Read timestamp. */
+ private final @Nullable HybridTimestamp readTimestamp;
+
+ /**
+ * Constructor.
+ *
+ * @param id Transaction id.
+ * @param readTimestamp Read timestamp.
+ */
+ public LocalTxAttributesHolder(UUID id, @Nullable HybridTimestamp
readTimestamp) {
+ this.readTimestamp = readTimestamp;
+ this.id = id;
+ }
+
+ @Override
+ public boolean isReadOnly() {
+ return readTimestamp != null;
+ }
+
+ @Override
+ public @Nullable HybridTimestamp readTimestamp() {
+ return readTimestamp;
+ }
+
+ @Override
+ public @NotNull UUID id() {
+ return id;
+ }
+
+ @Override
+ public void commit() throws TransactionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> commitAsync() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void rollback() throws TransactionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> rollbackAsync() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IgniteBiTuple<ClusterNode, Long>
enlistedNodeAndTerm(ReplicationGroupId replicationGroupId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TxState state() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean assignCommitPartition(ReplicationGroupId
replicationGroupId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ReplicationGroupId commitPartition() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public IgniteBiTuple<ClusterNode, Long> enlist(ReplicationGroupId
replicationGroupId, IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void enlistResultFuture(CompletableFuture<?> resultFuture) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index b23d82405c..9b1f98e566 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -36,6 +36,7 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -71,6 +72,7 @@ import
org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
@@ -134,6 +136,8 @@ public class StopCalciteModuleTest {
private final TestRevisionRegister testRevisionRegister = new
TestRevisionRegister();
+ private final ClusterNode localNode = new ClusterNode("mock-node-id",
NODE_NAME, null);
+
/**
* Before.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -142,9 +146,7 @@ public class StopCalciteModuleTest {
public void before(TestInfo testInfo) {
when(clusterSrvc.messagingService()).thenReturn(msgSrvc);
when(clusterSrvc.topologyService()).thenReturn(topologySrvc);
-
- ClusterNode node = new ClusterNode("mock-node-id", NODE_NAME, null);
- when(topologySrvc.localMember()).thenReturn(node);
+ when(topologySrvc.localMember()).thenReturn(localNode);
SchemaDescriptor schemaDesc = new SchemaDescriptor(
1,
@@ -221,6 +223,7 @@ public class StopCalciteModuleTest {
);
when(tbl.tableId()).thenReturn(UUID.randomUUID());
+ when(tbl.primaryReplicas()).thenReturn(List.of(new
PrimaryReplica(localNode, -1L)));
when(txManager.begin()).thenReturn(mock(InternalTransaction.class));
when(tbl.storage()).thenReturn(mock(MvTableStorage.class));
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 71b5ca1af5..c29818745b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -83,6 +83,7 @@ import
org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
import org.apache.ignite.internal.sql.engine.util.HashFunctionFactoryImpl;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -90,6 +91,7 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TopologyService;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -383,12 +385,17 @@ public class ExecutionServiceImplTest {
var schemaManagerMock = mock(SqlSchemaManager.class);
+ var clusterNode = new ClusterNode(UUID.randomUUID().toString(),
nodeName, NetworkAddress.from("127.0.0.1:1111"));
+
+ var topologyService = mock(TopologyService.class);
+
+ when(topologyService.localMember()).thenReturn(clusterNode);
+
when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
- var clusterNode = new ClusterNode(UUID.randomUUID().toString(),
nodeName, NetworkAddress.from("127.0.0.1:1111"));
var executionService = new ExecutionServiceImpl<>(
- clusterNode,
messageService,
+ topologyService,
(single, filter) -> single ?
List.of(nodeNames.get(ThreadLocalRandom.current().nextInt(nodeNames.size()))) :
nodeNames,
schemaManagerMock,
mock(DdlCommandHandler.class),
@@ -413,6 +420,7 @@ public class ExecutionServiceImplTest {
.defaultSchema(wrap(schema))
.build()
)
+ .transaction(new LocalTxAttributesHolder(UUID.randomUUID(),
null))
.logger(LOG)
.build();
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index af6d1daf2b..125760b94c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -117,8 +117,7 @@ public class RuntimeSortedIndexTest extends
IgniteAbstractTest {
"fake-test-node",
null,
ArrayRowHandler.INSTANCE,
- Map.of(),
- null
+ Map.of()
),
RelCollations.of(ImmutableIntList.copyOf(idxCols)),
(o1, o2) -> {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 7e81eb4ede..dfa70fe72a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -17,9 +17,6 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.util.ArrayDeque;
import java.util.Deque;
@@ -43,12 +40,12 @@ import
org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.sql.engine.util.LocalTxAttributesHolder;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.thread.LogUncaughtExceptionHandler;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
-import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
@@ -103,11 +100,9 @@ public class AbstractExecutionTest extends
IgniteAbstractTest {
FragmentDescription fragmentDesc = new FragmentDescription(0, null,
null, Long2ObjectMaps.emptyMap());
- InternalTransaction tx = mock(InternalTransaction.class);
-
when(tx.rollbackAsync()).thenReturn(CompletableFuture.completedFuture(null));
-
return new ExecutionContext<>(
BaseQueryContext.builder()
+ .transaction(new
LocalTxAttributesHolder(UUID.randomUUID(), null))
.logger(log)
.build(),
taskExecutor,
@@ -116,8 +111,7 @@ public class AbstractExecutionTest extends
IgniteAbstractTest {
"fake-test-node",
fragmentDesc,
ArrayRowHandler.INSTANCE,
- Map.of(),
- tx
+ Map.of()
);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
index 91526fc9ef..8e0850a577 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNodeExecutionTest.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ThreadLocalRandom;
@@ -59,6 +60,7 @@ import
org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
import org.apache.ignite.internal.sql.engine.exec.exp.RexImpTable;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
@@ -337,13 +339,13 @@ public class IndexScanNodeExecutionTest extends
AbstractExecutionTest {
//CHECKSTYLE:OFF:Indentation
Mockito.doAnswer(invocation -> {
if (key != null) {
- validateBound(indexDescriptor, schemaDescriptor,
invocation.getArgument(2));
+ validateBound(indexDescriptor, schemaDescriptor,
invocation.getArgument(3));
}
return dummyPublisher(partitionData(tableData,
schemaDescriptor, invocation.getArgument(0)));
})
.when(hashIndexMock)
- .lookup(Mockito.anyInt(), any(), any(), any());
+ .lookup(Mockito.anyInt(), (UUID) any(), any(), any(), any());
//CHECKSTYLE:ON:Indentation
IgniteIndex indexMock = mock(IgniteIndex.class);
@@ -382,15 +384,15 @@ public class IndexScanNodeExecutionTest extends
AbstractExecutionTest {
//CHECKSTYLE:OFF:Indentation
Mockito.doAnswer(invocation -> {
if (lowerBound != null) {
- validateBoundPrefix(indexDescriptor, schemaDescriptor,
invocation.getArgument(2));
+ validateBoundPrefix(indexDescriptor, schemaDescriptor,
invocation.getArgument(3));
}
if (upperBound != null) {
- validateBoundPrefix(indexDescriptor, schemaDescriptor,
invocation.getArgument(3));
+ validateBoundPrefix(indexDescriptor, schemaDescriptor,
invocation.getArgument(4));
}
return dummyPublisher(partitionData(tableData,
schemaDescriptor, invocation.getArgument(0)));
}).when(sortedIndexMock)
- .scan(Mockito.anyInt(), any(), any(), any(), Mockito.anyInt(),
any());
+ .scan(Mockito.anyInt(), (UUID) any(), any(), any(), any(),
Mockito.anyInt(), any());
//CHECKSTYLE:ON:Indentation
IgniteIndex indexMock = mock(IgniteIndex.class);
@@ -437,7 +439,7 @@ public class IndexScanNodeExecutionTest extends
AbstractExecutionTest {
ectx.rowHandler().factory(ectx.getTypeFactory(), rowType),
index,
new TestTable(rowType, schemaDescriptor),
- new int[]{0, 2},
+ List.of(new PartitionWithTerm(0, -1L), new
PartitionWithTerm(2, -1L)),
index.type() == Type.SORTED ? comp : null,
rangeIterable,
null,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index ee4680ed19..d5c781c7d6 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -488,7 +488,7 @@ public class MergeJoinExecutionTest extends
AbstractExecutionTest {
ExecutionContext<Object[]> ectx =
new
ExecutionContext<>(BaseQueryContext.builder().logger(log).build(), null, null,
null,
- null, null, ArrayRowHandler.INSTANCE, null, null);
+ null, null, ArrayRowHandler.INSTANCE, null);
ExpressionFactoryImpl<Object[]> expFactory = new
ExpressionFactoryImpl<>(ectx, typeFactory, SqlConformanceEnum.DEFAULT);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index d927e2e61a..6069a9edd5 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -21,23 +21,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collection;
+import java.util.List;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.metadata.PartitionWithTerm;
import org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest;
import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -48,12 +50,11 @@ import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -72,7 +73,9 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
int inBufSize = Commons.IN_BUFFER_SIZE;
- int[] parts = {0, 1, 2};
+ List<PartitionWithTerm> partsWithTerms = Stream.of(0, 1, 2)
+ .map(p -> new PartitionWithTerm(p, -1L))
+ .collect(Collectors.toList());
int probingCnt = 50;
@@ -93,7 +96,7 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
dataAmount = size;
- TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, tbl, parts, null, null, null);
+ TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, tbl, partsWithTerms, null, null, null);
RootNode<Object[]> root = new RootNode<>(ctx);
@@ -106,7 +109,7 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
++cnt;
}
- assertEquals(sizes[i++] * parts.length, cnt);
+ assertEquals(sizes[i++] * partsWithTerms.size(), cnt);
}
}
@@ -161,29 +164,39 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
}
@Override
- protected CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
- @NotNull InternalTransaction tx,
+ public Publisher<BinaryRow> scan(
int partId,
- long scanId,
- int batchSize,
+ UUID txId,
+ PrimaryReplica recipient,
@Nullable UUID indexId,
- @Nullable BinaryTuple exactKey,
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
int flags,
@Nullable BitSet columnsToInclude
) {
- int fillAmount = Math.min(dataAmount - processedPerPart[partId],
batchSize);
-
- Collection<BinaryRow> out = new ArrayList<>(fillAmount);
-
- for (int i = 0; i < fillAmount; ++i) {
- out.add(bbRow);
- }
-
- processedPerPart[partId] += fillAmount;
-
- return CompletableFuture.completedFuture(out);
+ return s -> {
+ s.onSubscribe(new Subscription() {
+ @Override
+ public void request(long n) {
+ int fillAmount = Math.min(dataAmount -
processedPerPart[partId], (int) n);
+
+ processedPerPart[partId] += fillAmount;
+
+ for (int i = 0; i < fillAmount; ++i) {
+ s.onNext(bbRow);
+ }
+
+ if (processedPerPart[partId] == dataAmount) {
+ s.onComplete();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // No-op.
+ }
+ });
+ };
}
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 7885aa5b5e..be091a1c24 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.framework;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
-import static org.mockito.Mockito.mock;
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.util.ArrayList;
@@ -45,7 +44,6 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.network.ClusterNode;
/**
@@ -208,8 +206,7 @@ public class TestBuilders {
node.name(),
description,
ArrayRowHandler.INSTANCE,
- Map.of(),
- mock(InternalTransaction.class)
+ Map.of()
);
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index 8968f50fde..94277e04a4 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -109,8 +109,8 @@ public class TestNode implements LifecycleAware {
));
executionService = registerService(new ExecutionServiceImpl<>(
- topologyService.localMember(),
messageService,
+ topologyService,
new MappingServiceImpl(topologyService),
schemaManager,
mock(DdlCommandHandler.class),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
index 6022b24811..4bfad4f49b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AbstractPlannerTest.java
@@ -130,6 +130,7 @@ import
org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -1229,6 +1230,13 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
throw new AssertionError("Should not be called");
}
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> lookup(int partId, UUID txId,
PrimaryReplica recipient, BinaryTuple key,
+ @Nullable BitSet columns) {
+ throw new AssertionError("Should not be called");
+ }
+
/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(int partId, HybridTimestamp
timestamp, ClusterNode recipient, BinaryTuple key, BitSet columns) {
@@ -1247,6 +1255,12 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
@Nullable BinaryTuplePrefix leftBound, @Nullable
BinaryTuplePrefix rightBound, int flags, BitSet columnsToInclude) {
throw new AssertionError("Should not be called");
}
+
+ @Override
+ public Publisher<BinaryRow> scan(int partId, UUID txId, PrimaryReplica
recipient, @Nullable BinaryTuplePrefix leftBound,
+ @Nullable BinaryTuplePrefix rightBound, int flags, @Nullable
BitSet columnsToInclude) {
+ throw new AssertionError("Should not be called");
+ }
}
static class TestHashIndex implements Index<IndexDescriptor> {
@@ -1296,6 +1310,13 @@ public abstract class AbstractPlannerTest extends
IgniteAbstractTest {
throw new AssertionError("Should not be called");
}
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> lookup(int partId, UUID txId,
PrimaryReplica recipient, BinaryTuple key,
+ @Nullable BitSet columns) {
+ throw new AssertionError("Should not be called");
+ }
+
/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> lookup(int partId, HybridTimestamp
timestamp, ClusterNode recipient, BinaryTuple key, BitSet columns) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
index b4f081d56f..b008e65d44 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
@@ -45,6 +45,7 @@ import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Util;
import org.apache.ignite.internal.index.ColumnCollation;
import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.metadata.NodeWithTerm;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.sql.engine.prepare.IgnitePlanner;
import org.apache.ignite.internal.sql.engine.prepare.MappingQueryContext;
@@ -82,6 +83,8 @@ import org.junit.jupiter.api.condition.OS;
public class PlannerTest extends AbstractPlannerTest {
private static List<String> NODES;
+ private static List<NodeWithTerm> NODES_WITH_TERM;
+
/**
* Init.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -89,9 +92,13 @@ public class PlannerTest extends AbstractPlannerTest {
@BeforeAll
public static void init() {
NODES = new ArrayList<>(4);
+ NODES_WITH_TERM = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
- NODES.add(UUID.randomUUID().toString());
+ String nodeName = UUID.randomUUID().toString();
+
+ NODES.add(nodeName);
+ NODES_WITH_TERM.add(new NodeWithTerm(nodeName, 0L));
}
}
@@ -113,11 +120,11 @@ public class PlannerTest extends AbstractPlannerTest {
@Override
public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
- select(NODES, 0, 1),
- select(NODES, 1, 2),
- select(NODES, 2, 0),
- select(NODES, 0, 1),
- select(NODES, 1, 2)
+ select(NODES_WITH_TERM, 0, 1),
+ select(NODES_WITH_TERM, 1, 2),
+ select(NODES_WITH_TERM, 2, 0),
+ select(NODES_WITH_TERM, 0, 1),
+ select(NODES_WITH_TERM, 1, 2)
));
}
@@ -134,11 +141,11 @@ public class PlannerTest extends AbstractPlannerTest {
.build(), "PROJECT") {
@Override public ColocationGroup
colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
- select(NODES, 0, 1),
- select(NODES, 1, 2),
- select(NODES, 2, 0),
- select(NODES, 0, 1),
- select(NODES, 1, 2)));
+ select(NODES_WITH_TERM, 0, 1),
+ select(NODES_WITH_TERM, 1, 2),
+ select(NODES_WITH_TERM, 2, 0),
+ select(NODES_WITH_TERM, 0, 1),
+ select(NODES_WITH_TERM, 1, 2)));
}
@Override public IgniteDistribution distribution() {
@@ -295,10 +302,10 @@ public class PlannerTest extends AbstractPlannerTest {
@Override
public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
- select(NODES, 1, 2),
- select(NODES, 2, 3),
- select(NODES, 3, 0),
- select(NODES, 0, 1)
+ select(NODES_WITH_TERM, 1, 2),
+ select(NODES_WITH_TERM, 2, 3),
+ select(NODES_WITH_TERM, 3, 0),
+ select(NODES_WITH_TERM, 0, 1)
));
}
@@ -376,9 +383,9 @@ public class PlannerTest extends AbstractPlannerTest {
@Override
public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
- select(NODES, 0),
- select(NODES, 1),
- select(NODES, 2)
+ select(NODES_WITH_TERM, 0),
+ select(NODES_WITH_TERM, 1),
+ select(NODES_WITH_TERM, 2)
));
}
@@ -521,9 +528,9 @@ public class PlannerTest extends AbstractPlannerTest {
@Override
public ColocationGroup colocationGroup(MappingQueryContext ctx) {
return ColocationGroup.forAssignments(Arrays.asList(
- select(NODES, 1),
- select(NODES, 2),
- select(NODES, 3)
+ select(NODES_WITH_TERM, 1),
+ select(NODES_WITH_TERM, 2),
+ select(NODES_WITH_TERM, 3)
));
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 2a0b44b60c..b67ecd4c26 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.distributed;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -37,6 +38,7 @@ import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -61,6 +63,7 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TxState;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -87,7 +90,7 @@ public abstract class ItAbstractInternalTableScanTest extends
IgniteAbstractTest
private MvPartitionStorage mockStorage;
/** Internal table to test. */
- protected InternalTable internalTbl;
+ DummyInternalTableImpl internalTbl;
private final HybridClock clock = new HybridClockImpl();
@@ -196,7 +199,9 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
return cursor;
});
- scan(0, null).subscribe(new Subscriber<>() {
+ InternalTransaction tx = startTx();
+
+ scan(0, tx).subscribe(new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
@@ -212,6 +217,9 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
public void onError(Throwable throwable) {
gotException.set(throwable);
subscriberFinishedLatch.countDown();
+
+ // Rollback the transaction manually, because only ID of the
explicit transaction is passed to the internal table.
+ tx.rollback();
}
@Override
@@ -220,9 +228,13 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
}
});
- subscriberFinishedLatch.await();
+ assertTrue(subscriberFinishedLatch.await(10, TimeUnit.SECONDS),
"count=" + subscriberFinishedLatch.getCount());
assertEquals(gotException.get().getCause().getClass(),
NoSuchElementException.class);
+
+ if (tx != null) {
+ assertEquals(TxState.ABORTED, tx.state());
+ }
}
/**
@@ -237,7 +249,9 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
when(mockStorage.scan(any(HybridTimestamp.class))).thenThrow(new
StorageException("Some storage exception"));
- scan(0, null).subscribe(new Subscriber<>() {
+ InternalTransaction tx = startTx();
+
+ scan(0, tx).subscribe(new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
@@ -253,6 +267,9 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
public void onError(Throwable throwable) {
gotException.set(throwable);
gotExceptionLatch.countDown();
+
+ // Rollback the transaction manually, because only ID of the
explicit transaction is passed to the internal table.
+ tx.rollback();
}
@Override
@@ -264,8 +281,11 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
gotExceptionLatch.await();
assertEquals(gotException.get().getCause().getClass(),
StorageException.class);
- }
+ if (tx != null) {
+ assertEquals(TxState.ABORTED, tx.state());
+ }
+ }
/**
* Checks that {@link IllegalArgumentException} is thrown in case of
invalid partition.
@@ -398,7 +418,9 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
// The latch that allows to await Subscriber.onError() before
asserting test invariants.
CountDownLatch subscriberAllDataAwaitLatch = new CountDownLatch(1);
- scan(0, null).subscribe(new Subscriber<>() {
+ InternalTransaction tx = startTx();
+
+ scan(0, tx).subscribe(new Subscriber<>() {
private Subscription subscription;
@Override
@@ -438,6 +460,19 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
for (int i = 0; i < expItems.size(); i++) {
assertArrayEquals(expItems.get(i), gotItems.get(i));
}
+
+ if (tx != null) {
+ tx.commit();
+ }
+ }
+
+ /**
+ * Start transaction if needed.
+ *
+ * @return Started transaction or {@code null}.
+ */
+ protected InternalTransaction startTx() {
+ return null;
}
/**
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 3f6f15cf13..3f1fd642cc 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -18,9 +18,15 @@
package org.apache.ignite.distributed;
import java.util.concurrent.Flow.Publisher;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
/**
* Tests for {@link InternalTable#scan(int,
org.apache.ignite.internal.tx.InternalTransaction)}.
@@ -29,6 +35,28 @@ public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableSca
/** {@inheritDoc} */
@Override
protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
- return internalTbl.scan(part, tx);
+ if (tx == null) {
+ return internalTbl.scan(part, null);
+ }
+
+ IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(internalTbl.tableId(), part));
+ PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+
+ return internalTbl.scan(part, tx.id(), recipient, null, null, null, 0,
null);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected InternalTransaction startTx() {
+ InternalTransaction tx = internalTbl.txManager().begin();
+
+ TablePartitionId tblPartId = new
TablePartitionId(internalTbl.tableId(), ((TablePartitionId)
internalTbl.groupId()).partitionId());
+ RaftGroupService raftSvc =
internalTbl.partitionRaftGroupService(tblPartId.partitionId());
+ long term =
IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();
+
+ tx.assignCommitPartition(tblPartId);
+ tx.enlist(tblPartId, new
IgniteBiTuple<>(internalTbl.leaderAssignment(tblPartId.partitionId()), term));
+
+ return tx;
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index cb248a0cf7..8c4a67e7f0 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -34,6 +34,7 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.NotNull;
@@ -296,6 +297,29 @@ public interface InternalTable extends ManuallyCloseable {
@Nullable BitSet columnsToInclude
);
+ /**
+ * Scans given partition index, providing {@link Publisher} that
reactively notifies about partition rows.
+ *
+ * @param partId The partition.
+ * @param txId Transaction id.
+ * @param recipient Primary replica that will handle given get request.
+ * @param lowerBound Lower search bound.
+ * @param upperBound Upper search bound.
+ * @param flags Control flags. See {@link
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
+ * @param columnsToInclude Row projection.
+ * @return {@link Publisher} that reactively notifies about partition rows.
+ */
+ Publisher<BinaryRow> scan(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ @Nullable UUID indexId,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ int flags,
+ @Nullable BitSet columnsToInclude
+ );
+
/**
* Scans given partition index, providing {@link Publisher} that
reactively notifies about partition rows.
*
@@ -348,7 +372,9 @@ public interface InternalTable extends ManuallyCloseable {
* @param key Key to search.
* @param columnsToInclude Row projection.
* @return {@link Publisher} that reactively notifies about partition rows.
+ * @deprecated IGNITE-17952 Use {@link #lookup(int, UUID, PrimaryReplica,
UUID, BinaryTuple, BitSet)} instead.
*/
+ @Deprecated
Publisher<BinaryRow> lookup(
int partId,
@Nullable InternalTransaction tx,
@@ -357,6 +383,27 @@ public interface InternalTable extends ManuallyCloseable {
@Nullable BitSet columnsToInclude
);
+ /**
+ * Lookup rows corresponding to the given key given partition index,
providing {@link Publisher}
+ * that reactively notifies about partition rows.
+ *
+ * @param partId The partition.
+ * @param txId Transaction id.
+ * @param recipient Primary replica that will handle given get request.
+ * @param indexId Index id.
+ * @param key Key to search.
+ * @param columnsToInclude Row projection.
+ * @return {@link Publisher} that reactively notifies about partition rows.
+ */
+ Publisher<BinaryRow> lookup(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ UUID indexId,
+ BinaryTuple key,
+ @Nullable BitSet columnsToInclude
+ );
+
/**
* Gets a count of partitions of the table.
*
@@ -374,6 +421,13 @@ public interface InternalTable extends ManuallyCloseable {
*/
List<String> assignments();
+ /**
+ * Gets a list of current primary replicas for each partition.
+ *
+ * @return List of current primary replicas for each partition.
+ */
+ List<PrimaryReplica> primaryReplicas();
+
/**
* Returns cluster node that is the leader of the corresponding partition
group or throws an exception if
* it cannot be found.
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 554498ac9d..9dc4315734 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteFiveFunction;
@@ -350,7 +351,7 @@ public class InternalTableImpl implements InternalTable {
* @param columnsToInclude Row projection.
* @return Batch of retrieved rows.
*/
- protected CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
+ private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
@NotNull InternalTransaction tx,
int partId,
long scanId,
@@ -853,6 +854,19 @@ public class InternalTableImpl implements InternalTable {
return scan(partId, tx, indexId, key, null, null, 0, columnsToInclude);
}
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> lookup(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ UUID indexId,
+ BinaryTuple key,
+ @Nullable BitSet columnsToInclude
+ ) {
+ return scan(partId, txId, recipient, indexId, key, null, null, 0,
columnsToInclude);
+ }
+
/** {@inheritDoc} */
@Override
public Publisher<BinaryRow> scan(
@@ -967,6 +981,57 @@ public class InternalTableImpl implements InternalTable {
);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Publisher<BinaryRow> scan(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ @Nullable UUID indexId,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ int flags,
+ @Nullable BitSet columnsToInclude
+ ) {
+ return scan(partId, txId, recipient, indexId, null, lowerBound,
upperBound, flags, columnsToInclude);
+ }
+
+ private Publisher<BinaryRow> scan(
+ int partId,
+ UUID txId,
+ PrimaryReplica recipient,
+ @Nullable UUID indexId,
+ @Nullable BinaryTuple exactKey,
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ int flags,
+ @Nullable BitSet columnsToInclude
+ ) {
+ return new PartitionScanPublisher(
+ (scanId, batchSize) -> {
+ ReplicationGroupId partGroupId =
partitionMap.get(partId).groupId();
+
+ ReadWriteScanRetrieveBatchReplicaRequest request =
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
+ .groupId(partGroupId)
+ .transactionId(txId)
+ .scanId(scanId)
+ .indexToUse(indexId)
+ .exactKey(exactKey)
+ .lowerBound(lowerBound)
+ .upperBound(upperBound)
+ .flags(flags)
+ .columnsToInclude(columnsToInclude)
+ .batchSize(batchSize)
+ .term(recipient.term())
+ .build();
+
+ return replicaSvc.invoke(recipient.node(), request);
+ },
+ // TODO: IGNITE-17666 Close cursor tx finish.
+ Function.identity());
+ }
+
/**
* Validates partition index.
*
@@ -1015,6 +1080,30 @@ public class InternalTableImpl implements InternalTable {
.collect(Collectors.toList());
}
+ /** {@inheritDoc} */
+ @Override
+ public List<PrimaryReplica> primaryReplicas() {
+ List<Entry<RaftGroupService>> entries = new
ArrayList<>(partitionMap.int2ObjectEntrySet());
+ List<CompletableFuture<LeaderWithTerm>> futs = new ArrayList<>();
+
+ entries.sort(Comparator.comparingInt(Entry::getIntKey));
+
+ for (Entry<RaftGroupService> e : entries) {
+ futs.add(e.getValue().refreshAndGetLeaderWithTerm());
+ }
+
+ List<PrimaryReplica> primaryReplicas = new ArrayList<>(entries.size());
+
+ for (CompletableFuture<LeaderWithTerm> fut : futs) {
+ LeaderWithTerm leaderWithTerm = fut.join();
+ ClusterNode primaryNode =
clusterNodeResolver.apply(leaderWithTerm.leader().consistentId());
+
+ primaryReplicas.add(new PrimaryReplica(primaryNode,
leaderWithTerm.term()));
+ }
+
+ return primaryReplicas;
+ }
+
@Override
public ClusterNode leaderAssignment(int partition) {
awaitLeaderInitialization();
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
similarity index 50%
copy from
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
copy to
modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
index 3f6f15cf13..e0114ea1bb 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/utils/PrimaryReplica.java
@@ -15,20 +15,46 @@
* limitations under the License.
*/
-package org.apache.ignite.distributed;
+package org.apache.ignite.internal.utils;
-import java.util.concurrent.Flow.Publisher;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
/**
- * Tests for {@link InternalTable#scan(int,
org.apache.ignite.internal.tx.InternalTransaction)}.
+ * Tuple representing primary replica node with current term.
*/
-public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableScanTest {
- /** {@inheritDoc} */
- @Override
- protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
- return internalTbl.scan(part, tx);
+public class PrimaryReplica {
+ /** Primary replica node. */
+ private final ClusterNode node;
+
+ /** Replica term. */
+ private final long term;
+
+ /**
+ * Constructor.
+ *
+ * @param node Primary replica node.
+ * @param term Replica term.
+ */
+ public PrimaryReplica(ClusterNode node, long term) {
+ this.node = node;
+ this.term = term;
+ }
+
+ /**
+ * Gets primary replica node.
+ *
+ * @return Primary replica node.
+ */
+ public ClusterNode node() {
+ return node;
+ }
+
+ /**
+ * Gets replica term.
+ *
+ * @return Replica term.
+ */
+ public long term() {
+ return term;
}
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3346feb172..f8e4846776 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -317,6 +317,15 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
return groupId;
}
+ /**
+ * Gets the transaction manager that is bound to the table.
+ *
+ * @return Transaction manager.
+ */
+ public TxManager txManager() {
+ return txManager;
+ }
+
/** {@inheritDoc} */
@Override
public @NotNull String name() {