This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0fef619960e IGNITE-26913 Sql. InternalTable interface refactoring
(#6887)
0fef619960e is described below
commit 0fef619960e17fbcdd6dcf1246f7dfe8609da281
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Nov 3 19:48:46 2025 +0300
IGNITE-26913 Sql. InternalTable interface refactoring (#6887)
---
.../ignite/client/fakes/FakeInternalTable.java | 67 +---
.../testframework/matchers/DelegatingMatcher.java | 55 ++++
.../ItPrimaryReplicaChoiceTest.java | 27 +-
.../ItTruncateRaftLogAndRestartNodesTest.java | 6 +-
.../ignite/internal/table/ItInternalTableTest.java | 4 +-
.../ignite/internal/table/ItTableScanTest.java | 144 ++++-----
.../sql/engine/exec/ScannableTableImpl.java | 139 +++-----
.../engine/exec/rel/ScannableTableSelfTest.java | 352 +++++++--------------
.../exec/rel/TableScanNodeExecutionTest.java | 14 +-
.../sql/engine/framework/NoOpTransaction.java | 5 +-
.../ItInternalTableReadOnlyScanTest.java | 4 +-
.../ItInternalTableReadWriteScanTest.java | 12 +-
.../ignite/internal/table/IndexScanCriteria.java | 90 ++++++
.../ignite/internal/table/InternalTable.java | 135 ++------
.../ignite/internal/table/OperationContext.java | 60 ++++
.../apache/ignite/internal/table/TxContext.java | 197 ++++++++++++
.../distributed/storage/InternalTableImpl.java | 254 +++++++--------
.../distributed/storage/InternalTableImplTest.java | 3 +-
.../ignite/internal/table/TxAbstractTest.java | 21 +-
.../tx/distributed/ItTransactionRecoveryTest.java | 6 +-
20 files changed, 821 insertions(+), 774 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 381b8de9bca..a7d0ac8319c 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -52,16 +52,16 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.DataStreamerReceiverDescriptor;
import org.apache.ignite.table.QualifiedName;
@@ -405,12 +405,7 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
@Override
public Publisher<BinaryRow> scan(
int partId,
- @Nullable InternalTransaction tx,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- BitSet columnsToInclude
+ @Nullable InternalTransaction tx
) {
throw new IgniteInternalException(new
OperationNotSupportedException());
}
@@ -418,15 +413,9 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
@Override
public Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- ReplicationGroupId commitPartition,
- UUID txCoordinatorId,
- PrimaryReplica recipient,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
+ @Nullable InternalTransaction tx,
+ int indexId,
+ IndexScanCriteria.Range criteria
) {
throw new IgniteInternalException(new
OperationNotSupportedException());
}
@@ -434,55 +423,21 @@ public class FakeInternalTable implements InternalTable,
StreamerReceiverRunner
@Override
public Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
- InternalClusterNode recipientNode,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId) {
- throw new IgniteInternalException(new
OperationNotSupportedException());
- }
-
- @Override
- public Publisher<BinaryRow> scan(
- int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
InternalClusterNode recipientNode,
- UUID txCoordinatorId
- ) {
- return null;
- }
-
- @Override
- public Publisher<BinaryRow> lookup(
- int partId,
- UUID txId,
- ReplicationGroupId commitPartition,
- UUID txCoordinatorId,
- PrimaryReplica recipient,
int indexId,
- BinaryTuple key,
- @Nullable BitSet columnsToInclude
+ IndexScanCriteria criteria,
+ OperationContext operationContext
) {
throw new IgniteInternalException(new
OperationNotSupportedException());
}
@Override
- public Publisher<BinaryRow> lookup(
+ public Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
InternalClusterNode recipientNode,
- int indexId,
- BinaryTuple key,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId
+ OperationContext operationContext
) {
- throw new IgniteInternalException(new
OperationNotSupportedException());
+ return null;
}
@Override public TxStateStorage txStateStorage() {
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java
new file mode 100644
index 00000000000..fad7c53915b
--- /dev/null
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import java.util.function.Function;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matches an object against a delegate matcher, transforming the object
before matching.
+ *
+ * @param <O> Object type.
+ * @param <T> Transformed object type.
+ */
+public class DelegatingMatcher<O, T> extends TypeSafeMatcher<O> {
+ /** Creates a new matcher instance. */
+ public static <O, T> DelegatingMatcher<O, T> has(Function<O, T>
transformer, Matcher<T> delegate) {
+ return new DelegatingMatcher<>(transformer, delegate);
+ }
+
+ private final Matcher<T> delegate;
+
+ private final Function<O, T> transformer;
+
+ private DelegatingMatcher(Function<O, T> transformer, Matcher<T> delegate)
{
+ this.delegate = delegate;
+ this.transformer = transformer;
+ }
+
+ @Override
+ protected boolean matchesSafely(O item) {
+ return delegate.matches(transformer.apply(item));
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ delegate.describeTo(description);
+ }
+}
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index c5c6fb7fbda..e93028865c9 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -68,13 +68,15 @@ import
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import
org.apache.ignite.internal.storage.impl.schema.TestProfileConfigurationSchema;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.flow.TestFlowUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
-import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
@@ -340,19 +342,19 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
node(0).cluster().nodes().stream().filter(node ->
node.id().equals(primaryId)).findAny().get()
);
+ OperationContext opCtx =
OperationContext.create(TxContext.readOnly(tx));
+
if (idxId == null) {
- publisher = internalTable.scan(PART_ID, tx.id(),
tx.readTimestamp(), primaryNode, tx.coordinatorId());
+ publisher = internalTable.scan(PART_ID, primaryNode, opCtx);
} else if (exactKey == null) {
- publisher = internalTable.scan(PART_ID, tx.id(),
tx.readTimestamp(), primaryNode, idxId, null, null, 0, null,
- tx.coordinatorId());
+ publisher = internalTable.scan(PART_ID, primaryNode, idxId,
IndexScanCriteria.unbounded(), opCtx);
} else {
- publisher = internalTable.lookup(PART_ID, tx.id(),
tx.readTimestamp(), primaryNode, idxId, exactKey, null,
- tx.coordinatorId());
+ publisher = internalTable.scan(PART_ID, primaryNode, idxId,
IndexScanCriteria.lookup(exactKey), opCtx);
}
} else if (idxId == null) {
publisher = unwrappedTable.internalTable().scan(PART_ID, tx);
} else if (exactKey == null) {
- publisher = unwrappedTable.internalTable().scan(PART_ID, tx,
idxId, null, null, 0, null);
+ publisher = unwrappedTable.internalTable().scan(PART_ID, tx,
idxId, IndexScanCriteria.unbounded());
} else {
ReadWriteTransactionImpl rwTx = Wrappers.unwrap(tx,
ReadWriteTransactionImpl.class);
@@ -371,15 +373,12 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
node(0).cluster().nodes().stream().filter(node ->
node.id().equals(primaryId)).findAny().get()
);
- publisher = unwrappedTable.internalTable().lookup(
+ publisher = unwrappedTable.internalTable().scan(
PART_ID,
- rwTx.id(),
- rwTx.commitPartition(),
- rwTx.coordinatorId(),
- new PrimaryReplica(primaryNode,
primaryReplicaFut.get().getStartTime().longValue()),
+ primaryNode,
idxId,
- exactKey,
- null
+ IndexScanCriteria.lookup(exactKey),
+ OperationContext.create(TxContext.readWrite(rwTx,
primaryReplicaFut.get().getStartTime().longValue()))
);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
index 193a4f55bf6..955b83a8c36 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
@@ -60,7 +60,9 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -339,10 +341,8 @@ public class ItTruncateRaftLogAndRestartNodesTest extends
ClusterPerTestIntegrat
return internalTableImpl.scan(
partitionId,
- roTx.id(),
- roTx.readTimestamp(),
recipientNode,
- roTx.coordinatorId()
+ OperationContext.create(TxContext.readOnly(roTx))
);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
index 437957138a2..e2bfb980af6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
@@ -549,8 +549,10 @@ public class ItInternalTableTest extends
ClusterPerClassIntegrationTest {
InternalTransaction roTx =
(InternalTransaction) node.transactions().begin(new
TransactionOptions().readOnly(true));
+ OperationContext operationContext =
OperationContext.create(TxContext.readOnly(roTx));
+
for (int i = 0; i < parts; i++) {
- Publisher<BinaryRow> res = internalTable.scan(i, roTx.id(),
node.clock().now(), node.node(), roTx.coordinatorId());
+ Publisher<BinaryRow> res = internalTable.scan(i, node.node(),
operationContext);
res.subscribe(new Subscriber<>() {
@Override
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index c6f160be999..1cb99c0204e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -81,7 +81,6 @@ import
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.KeyValueView;
@@ -233,21 +232,18 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
List<BinaryRow> scannedRows = new ArrayList<>();
- PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx1);
+ ReplicationGroupId replicationGroupId = replicationGroup(PART_ID);
+ PendingTxPartitionEnlistment enlistment =
tx1.enlistedPartition(replicationGroupId);
+ InternalClusterNode recipient =
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx1,
internalTable.scan(
PART_ID,
- tx1.id(),
- tx1.commitPartition(),
- tx1.coordinatorId(),
recipient,
sortedIndexId,
- null,
- null,
- 0,
- null
+ IndexScanCriteria.unbounded(),
+ OperationContext.create(TxContext.readWrite(tx1,
enlistment.consistencyToken()))
)
);
@@ -293,7 +289,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
List<BinaryRow> scannedRows = new ArrayList<>();
- Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortedIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortedIndexId, IndexScanCriteria.unbounded());
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -469,7 +465,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
List<BinaryRow> scannedRows = new ArrayList<>();
- Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
null, null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -505,7 +501,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest
{
assertEquals(ROW_IDS.size(), scannedRows.size());
- var pub = internalTable.scan(PART_ID, null, null, null, null, 0, null);
+ var pub = internalTable.scan(PART_ID, null);
assertEquals(ROW_IDS.size() + txOpFut.get(), scanAllRows(pub).size());
}
@@ -520,21 +516,18 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
- PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+ ReplicationGroupId replicationGroupId = replicationGroup(PART_ID);
+ PendingTxPartitionEnlistment enlistment =
tx.enlistedPartition(replicationGroupId);
+ InternalClusterNode recipient =
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx,
internalTable.scan(
PART_ID,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
recipient,
sortedIndexId,
- null,
- null,
- 0,
- null
+ IndexScanCriteria.unbounded(),
+ OperationContext.create(TxContext.readWrite(tx,
enlistment.consistencyToken()))
)
);
@@ -569,15 +562,10 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
tx,
internalTable.scan(
PART_ID,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
recipient,
sortedIndexId,
- null,
- null,
- 0,
- null
+ IndexScanCriteria.unbounded(),
+ OperationContext.create(TxContext.readWrite(tx,
enlistment.consistencyToken()))
)
);
@@ -594,31 +582,29 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
public void testScanWithUpperBound() throws Exception {
KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
- BinaryTuplePrefix lowBound = BinaryTuplePrefix.fromBinaryTuple(
+ BinaryTuplePrefix lowerBound = BinaryTuplePrefix.fromBinaryTuple(
new BinaryTuple(1, new
BinaryTupleBuilder(1).appendInt(5).build())
);
BinaryTuplePrefix upperBound = BinaryTuplePrefix.fromBinaryTuple(
new BinaryTuple(1, new
BinaryTupleBuilder(1).appendInt(9).build())
);
- int soredIndexId = getSortedIndexId();
+ int sortedIndexId = getSortedIndexId();
InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false,
LONG_RUNNING_TX_TIMEOUT_MILLIS);
- PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+
+ ReplicationGroupId replicationGroupId = replicationGroup(PART_ID);
+ PendingTxPartitionEnlistment enlistment =
tx.enlistedPartition(replicationGroupId);
+ InternalClusterNode recipient =
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx,
internalTable.scan(
PART_ID,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
recipient,
- soredIndexId,
- lowBound,
- upperBound,
- LESS_OR_EQUAL | GREATER_OR_EQUAL,
- null
+ sortedIndexId,
+ IndexScanCriteria.range(lowerBound, upperBound,
LESS_OR_EQUAL | GREATER_OR_EQUAL),
+ OperationContext.create(TxContext.readWrite(tx,
enlistment.consistencyToken()))
)
);
@@ -644,15 +630,10 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
tx,
internalTable.scan(
PART_ID,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
recipient,
- soredIndexId,
- lowBound,
- upperBound,
- LESS_OR_EQUAL | GREATER_OR_EQUAL,
- null
+ sortedIndexId,
+ IndexScanCriteria.range(lowerBound, upperBound,
LESS_OR_EQUAL | GREATER_OR_EQUAL),
+ OperationContext.create(TxContext.readWrite(tx,
enlistment.consistencyToken()))
)
);
@@ -669,11 +650,8 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
Publisher<BinaryRow> publisher2 = internalTable.scan(
PART_ID,
null,
- soredIndexId,
- lowBound,
- upperBound,
- LESS_OR_EQUAL | GREATER_OR_EQUAL,
- null
+ sortedIndexId,
+ IndexScanCriteria.range(lowerBound, upperBound, LESS_OR_EQUAL
| GREATER_OR_EQUAL)
);
List<BinaryRow> scannedRows2 = scanAllRows(publisher2);
@@ -697,21 +675,18 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
InternalTransaction tx = startTxWithEnlistedPartition(PART_ID,
false);
try {
- PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+ ReplicationGroupId replicationGroupId =
replicationGroup(PART_ID);
+ PendingTxPartitionEnlistment enlistment =
tx.enlistedPartition(replicationGroupId);
+ InternalClusterNode recipient =
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
Publisher<BinaryRow> publisher = new
RollbackTxOnErrorPublisher<>(
tx,
internalTable.scan(
PART_ID,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
recipient,
sortedIndexId,
- null,
- null,
- 0,
- null
+ IndexScanCriteria.unbounded(),
+
OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken()))
)
);
@@ -740,15 +715,10 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
tx,
internalTable.scan(
PART_ID,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
recipient,
sortedIndexId,
- null,
- null,
- 0,
- null
+ IndexScanCriteria.unbounded(),
+
OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken()))
)
);
@@ -803,13 +773,13 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
tx = (InternalTransaction)
CLUSTER.aliveNode().transactions().begin(new
TransactionOptions().readOnly(true));
- publisher = internalTable.scan(PART_ID, tx.id(),
ignite.clock().now(), recipientNode, tx.coordinatorId());
+ publisher = internalTable.scan(PART_ID, recipientNode,
OperationContext.create(TxContext.readOnly(tx)));
} else {
if (!implicit) {
tx = (InternalTransaction)
CLUSTER.aliveNode().transactions().begin();
}
- publisher = internalTable.scan(PART_ID, tx, null, null, null, 0,
null);
+ publisher = internalTable.scan(PART_ID, tx);
}
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -866,25 +836,25 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
.map(ClusterNodeImpl::fromPublicClusterNode)
.orElseThrow();
+ OperationContext operationContext =
OperationContext.create(TxContext.readOnly(tx));
+
//noinspection DataFlowIssue
- publisher = internalTable.scan(PART_ID, tx.id(),
tx.readTimestamp(), node0, sortedIndexId, null, null, 0, null,
- tx.coordinatorId());
+ publisher = internalTable.scan(PART_ID, node0, sortedIndexId,
IndexScanCriteria.unbounded(), operationContext);
} else {
- PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+ ReplicationGroupId replicationGroupId =
replicationGroup(PART_ID);
+ PendingTxPartitionEnlistment enlistment =
tx.enlistedPartition(replicationGroupId);
+ InternalClusterNode recipient =
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
+
+ OperationContext operationContext =
OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken()));
publisher = new RollbackTxOnErrorPublisher<>(
tx,
internalTable.scan(
PART_ID,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
recipient,
sortedIndexId,
- null,
- null,
- 0,
- null
+ IndexScanCriteria.unbounded(),
+ operationContext
)
);
}
@@ -898,22 +868,20 @@ public class ItTableScanTest extends
BaseSqlIntegrationTest {
}
}
- private PrimaryReplica getPrimaryReplica(int partId, InternalTransaction
tx) {
- ReplicationGroupId replicationGroupId = colocationEnabled()
- ? new ZonePartitionId(table.zoneId(), partId)
- : new TablePartitionId(table.tableId(), partId);
-
- PendingTxPartitionEnlistment enlistment =
tx.enlistedPartition(replicationGroupId);
-
+ private static InternalClusterNode getNodeByConsistentId(String
nodeConsistentId) {
IgniteImpl ignite = unwrapIgniteImpl(CLUSTER.aliveNode());
- InternalClusterNode primaryNode = ignite.cluster().nodes().stream()
- .filter(n ->
n.name().equals(enlistment.primaryNodeConsistentId()))
+ return ignite.cluster().nodes().stream()
+ .filter(n -> n.name().equals(nodeConsistentId))
.map(ClusterNodeImpl::fromPublicClusterNode)
.findAny()
.orElseThrow();
+ }
- return new PrimaryReplica(primaryNode, enlistment.consistencyToken());
+ private ReplicationGroupId replicationGroup(int partId) {
+ return colocationEnabled()
+ ? new ZonePartitionId(table.zoneId(), partId)
+ : new TablePartitionId(table.tableId(), partId);
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index 76010181f03..2256e7047fe 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -18,23 +18,28 @@
package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER;
import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
+import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS;
import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
+import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.subscription.TransformingPublisher;
-import org.apache.ignite.internal.utils.PrimaryReplica;
import org.jetbrains.annotations.Nullable;
/**
@@ -56,35 +61,15 @@ public class ScannableTableImpl implements ScannableTable {
@Override
public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx,
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory, int @Nullable [] requiredColumns) {
-
- Publisher<BinaryRow> pub;
- TxAttributes txAttributes = ctx.txAttributes();
+ TxContext txContext = transactionalContextFrom(ctx.txAttributes(),
partWithConsistencyToken.enlistmentConsistencyToken());
int partId = partWithConsistencyToken.partId();
- if (txAttributes.readOnly()) {
- HybridTimestamp readTime = txAttributes.time();
-
- assert readTime != null;
-
- pub = internalTable.scan(partId, txAttributes.id(), readTime,
ctx.localNode(),
- txAttributes.coordinatorId());
- } else {
- PrimaryReplica recipient = new PrimaryReplica(ctx.localNode(),
partWithConsistencyToken.enlistmentConsistencyToken());
-
- pub = internalTable.scan(
- partId,
- txAttributes.id(),
- txAttributes.commitPartition(),
- txAttributes.coordinatorId(),
- recipient,
- null,
- null,
- null,
- 0,
- null
- );
- }
+ Publisher<BinaryRow> pub = internalTable.scan(
+ partId,
+ ctx.localNode(),
+ OperationContext.create(txContext)
+ );
TableRowConverter rowConverter =
converterFactory.create(requiredColumns, partId);
@@ -102,10 +87,10 @@ public class ScannableTableImpl implements ScannableTable {
@Nullable RangeCondition<RowT> cond,
int @Nullable [] requiredColumns
) {
- TxAttributes txAttributes = ctx.txAttributes();
+ TxContext txContext = transactionalContextFrom(ctx.txAttributes(),
partWithConsistencyToken.enlistmentConsistencyToken());
+
RowHandler<RowT> handler = rowFactory.handler();
- Publisher<BinaryRow> pub;
BinaryTuplePrefix lower;
BinaryTuplePrefix upper;
@@ -119,43 +104,19 @@ public class ScannableTableImpl implements ScannableTable
{
lower = toBinaryTuplePrefix(columns.size(), handler, cond.lower());
upper = toBinaryTuplePrefix(columns.size(), handler, cond.upper());
- flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : 0;
- flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : 0;
+ flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : GREATER;
+ flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : LESS;
}
int partId = partWithConsistencyToken.partId();
- if (txAttributes.readOnly()) {
- HybridTimestamp readTime = txAttributes.time();
-
- assert readTime != null;
-
- pub = internalTable.scan(
- partId,
- txAttributes.id(),
- readTime,
- ctx.localNode(),
- indexId,
- lower,
- upper,
- flags,
- null,
- txAttributes.coordinatorId()
- );
- } else {
- pub = internalTable.scan(
- partId,
- txAttributes.id(),
- txAttributes.commitPartition(),
- txAttributes.coordinatorId(),
- new PrimaryReplica(ctx.localNode(),
partWithConsistencyToken.enlistmentConsistencyToken()),
- indexId,
- lower,
- upper,
- flags,
- null
- );
- }
+ Publisher<BinaryRow> pub = internalTable.scan(
+ partId,
+ ctx.localNode(),
+ indexId,
+ IndexScanCriteria.range(lower, upper, flags),
+ OperationContext.create(txContext)
+ );
TableRowConverter rowConverter =
converterFactory.create(requiredColumns, partId);
@@ -173,9 +134,9 @@ public class ScannableTableImpl implements ScannableTable {
RowT key,
int @Nullable [] requiredColumns
) {
- TxAttributes txAttributes = ctx.txAttributes();
+ TxContext txContext = transactionalContextFrom(ctx.txAttributes(),
partWithConsistencyToken.enlistmentConsistencyToken());
+
RowHandler<RowT> handler = rowFactory.handler();
- Publisher<BinaryRow> pub;
BinaryTuple keyTuple = handler.toBinaryTuple(key);
@@ -184,33 +145,13 @@ public class ScannableTableImpl implements ScannableTable
{
int partId = partWithConsistencyToken.partId();
- if (txAttributes.readOnly()) {
- HybridTimestamp readTime = txAttributes.time();
-
- assert readTime != null;
-
- pub = internalTable.lookup(
- partId,
- txAttributes.id(),
- readTime,
- ctx.localNode(),
- indexId,
- keyTuple,
- null,
- txAttributes.coordinatorId()
- );
- } else {
- pub = internalTable.lookup(
- partId,
- txAttributes.id(),
- txAttributes.commitPartition(),
- txAttributes.coordinatorId(),
- new PrimaryReplica(ctx.localNode(),
partWithConsistencyToken.enlistmentConsistencyToken()),
- indexId,
- keyTuple,
- null
- );
- }
+ Publisher<BinaryRow> pub = internalTable.scan(
+ partId,
+ ctx.localNode(),
+ indexId,
+ IndexScanCriteria.lookup(keyTuple),
+ OperationContext.create(txContext)
+ );
TableRowConverter rowConverter =
converterFactory.create(requiredColumns, partId);
@@ -259,4 +200,20 @@ public class ScannableTableImpl implements ScannableTable {
return BinaryTuplePrefix.fromBinaryTuple(searchBoundSize,
handler.toBinaryTuple(prefix));
}
+
+ private static TxContext transactionalContextFrom(TxAttributes
txAttributes, long enlistmentConsistencyToken) {
+ if (txAttributes.readOnly()) {
+ HybridTimestamp timestamp = txAttributes.time();
+
+ assert timestamp != null;
+
+ return TxContext.readOnly(txAttributes.id(),
txAttributes.coordinatorId(), timestamp);
+ }
+
+ ReplicationGroupId commitPartition = txAttributes.commitPartition();
+
+ assert commitPartition != null;
+
+ return TxContext.readWrite(txAttributes.id(),
txAttributes.coordinatorId(), commitPartition, enlistmentConsistencyToken);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index f43ab415b21..66ca760ffdb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -20,7 +20,12 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
+import static
org.apache.ignite.internal.testframework.matchers.DelegatingMatcher.has;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -28,8 +33,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -40,7 +43,6 @@ import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
@@ -54,9 +56,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -75,10 +75,13 @@ import
org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.type.NativeTypes;
-import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.hamcrest.Matchers;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -123,27 +126,14 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
ResultCollector collector = tester.tableScan(partitionId,
consistencyToken, tx);
- if (tx.isReadOnly()) {
- HybridTimestamp timestamp = tx.readTimestamp();
- InternalClusterNode clusterNode = tx.clusterNode();
-
- verify(internalTable).scan(partitionId, tx.id(), timestamp,
clusterNode, tx.coordinatorId());
- } else {
- InternalClusterNode clusterNode = tx.clusterNode();
-
- verify(internalTable).scan(
- partitionId,
- tx.id(),
- tx.commitPartition(),
- tx.coordinatorId(),
- new PrimaryReplica(clusterNode, consistencyToken),
- null,
- null,
- null,
- 0,
- null
- );
- }
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
+ InternalClusterNode clusterNode = tx.clusterNode();
+
+ verify(internalTable).scan(
+ partitionId,
+ clusterNode,
+ OperationContext.create(txContext)
+ );
data.sendRows();
data.done();
@@ -197,48 +187,32 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
condition.setLower(lower, lowerValue);
condition.setUpper(upper, upperValue);
- int flags = condition.toFlags();
-
ResultCollector collector = tester.indexScan(partitionId,
consistencyToken, tx, indexId, condition);
- if (tx.isReadOnly()) {
- HybridTimestamp timestamp = tx.readTimestamp();
- InternalClusterNode clusterNode = tx.clusterNode();
+ InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() :
ctx.localNode();
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
- verify(internalTable).scan(
- eq(partitionId),
- eq(tx.id()),
- eq(timestamp),
- eq(clusterNode),
- eq(indexId),
- condition.lowerValue != null ?
any(BinaryTuplePrefix.class) : isNull(),
- condition.upperValue != null ?
any(BinaryTuplePrefix.class) : isNull(),
- eq(flags),
- isNull(),
- eq(tx.coordinatorId())
- );
- } else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
-
- verify(internalTable).scan(
- eq(partitionId),
- eq(tx.id()),
- eq(tx.commitPartition()),
- any(UUID.class),
- eq(primaryReplica),
- eq(indexId),
- condition.lowerValue != null ?
any(BinaryTuplePrefix.class) : isNull(),
- condition.upperValue != null ?
any(BinaryTuplePrefix.class) : isNull(),
- eq(flags),
- isNull()
- );
- }
+ ArgumentCaptor<IndexScanCriteria.Range> criteriaCaptor =
ArgumentCaptor.forClass(IndexScanCriteria.Range.class);
+
+ verify(internalTable).scan(
+ eq(partitionId),
+ eq(clusterNode),
+ eq(indexId),
+ criteriaCaptor.capture(),
+ eq(OperationContext.create(txContext))
+ );
input.sendRows();
input.done();
collector.expectRow(binaryRow);
collector.expectCompleted();
+
+ assertThat(criteriaCaptor.getValue(), Matchers.allOf(
+ has(IndexScanCriteria.Range::lowerBound, (lowerValue == null)
? nullValue() : instanceOf(BinaryTuplePrefix.class)),
+ has(IndexScanCriteria.Range::upperBound, (upperValue == null)
? nullValue() : instanceOf(BinaryTuplePrefix.class)),
+ has(IndexScanCriteria.Range::flags,
Matchers.is(condition.toFlags()))
+ ));
}
private static Stream<Arguments> indexScanParameters() {
@@ -276,38 +250,18 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
ResultCollector collector = tester.indexScan(partitionId,
consistencyToken, tx, indexId, condition);
- if (tx.isReadOnly()) {
- HybridTimestamp timestamp = tx.readTimestamp();
- InternalClusterNode clusterNode = tx.clusterNode();
+ InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() :
ctx.localNode();
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
- verify(internalTable).scan(
- eq(partitionId),
- eq(tx.id()),
- eq(timestamp),
- eq(clusterNode),
- eq(indexId),
- nullable(BinaryTuplePrefix.class),
- nullable(BinaryTuplePrefix.class),
- anyInt(),
- isNull(),
- eq(tx.coordinatorId())
- );
- } else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
-
- verify(internalTable).scan(
- eq(partitionId),
- eq(tx.id()),
- eq(tx.commitPartition()),
- any(UUID.class),
- eq(primaryReplica),
- eq(indexId),
- nullable(BinaryTuplePrefix.class),
- nullable(BinaryTuplePrefix.class),
- anyInt(),
- isNull()
- );
- }
+ OperationContext operationContext = OperationContext.create(txContext);
+
+ verify(internalTable).scan(
+ eq(partitionId),
+ eq(clusterNode),
+ eq(indexId),
+ any(IndexScanCriteria.Range.class),
+ eq(operationContext)
+ );
input.sendRows();
input.done();
@@ -393,50 +347,32 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
TestRangeCondition<Object[]> condition = new TestRangeCondition<>();
condition.setLower(Bound.INCLUSIVE, new Object[]{1, 2});
- ArgumentCaptor<BinaryTuplePrefix> prefix =
ArgumentCaptor.forClass(BinaryTuplePrefix.class);
+ ArgumentCaptor<IndexScanCriteria.Range> criteriaCaptor =
ArgumentCaptor.forClass(IndexScanCriteria.Range.class);
ResultCollector collector = tester.indexScan(partitionId,
consistencyToken, tx, indexId, condition);
- if (tx.isReadOnly()) {
- HybridTimestamp timestamp = tx.readTimestamp();
- InternalClusterNode clusterNode = tx.clusterNode();
+ InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() :
ctx.localNode();
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
- verify(internalTable).scan(
- eq(partitionId),
- eq(tx.id()),
- eq(timestamp),
- eq(clusterNode),
- eq(indexId),
- prefix.capture(),
- nullable(BinaryTuplePrefix.class),
- anyInt(),
- isNull(),
- eq(tx.coordinatorId())
- );
- } else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
-
- verify(internalTable).scan(
- eq(partitionId),
- eq(tx.id()),
- eq(tx.commitPartition()),
- any(UUID.class),
- eq(primaryReplica),
- eq(indexId),
- prefix.capture(),
- nullable(BinaryTuplePrefix.class),
- anyInt(),
- isNull()
- );
- }
+ verify(internalTable).scan(
+ eq(partitionId),
+ eq(clusterNode),
+ eq(indexId),
+ criteriaCaptor.capture(),
+ eq(OperationContext.create(txContext))
+ );
input.sendRows();
input.done();
collector.expectCompleted();
- BinaryTuplePrefix lowerBound = prefix.getValue();
+ BinaryTuplePrefix lowerBound = criteriaCaptor.getValue().lowerBound();
+ assertNotNull(lowerBound);
assertEquals(2, lowerBound.elementCount());
+
+ assertNull(criteriaCaptor.getValue().upperBound());
+ assertEquals(GREATER_OR_EQUAL, criteriaCaptor.getValue().flags());
}
/**
@@ -455,39 +391,30 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
int indexId = 3;
Object[] key = {1};
+ ArgumentCaptor<IndexScanCriteria.Lookup> criteriaCaptor =
ArgumentCaptor.forClass(IndexScanCriteria.Lookup.class);
+
ResultCollector collector = tester.indexLookUp(partitionId,
consistencyToken, tx, indexId, key);
- if (tx.isReadOnly()) {
- verify(internalTable).lookup(
- eq(partitionId),
- eq(tx.id()),
- eq(tx.readTimestamp()),
- eq(tx.clusterNode()),
- eq(indexId),
- any(BinaryTuple.class),
- isNull(),
- eq(tx.coordinatorId())
- );
- } else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
-
- verify(internalTable).lookup(
- eq(partitionId),
- eq(tx.id()),
- any(),
- any(UUID.class),
- eq(primaryReplica),
- eq(indexId),
- any(BinaryTuple.class),
- isNull()
- );
- }
+ InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() :
ctx.localNode();
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
+
+ verify(internalTable).scan(
+ eq(partitionId),
+ eq(clusterNode),
+ eq(indexId),
+ criteriaCaptor.capture(),
+ eq(OperationContext.create(txContext))
+ );
input.sendRows();
input.done();
collector.expectRow(binaryRow);
collector.expectCompleted();
+
+ BinaryTuple exactKey = criteriaCaptor.getValue().key();
+ assertNotNull(exactKey);
+ assertEquals(1, exactKey.elementCount());
}
/**
@@ -509,31 +436,16 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
ResultCollector collector = tester.indexLookUp(partitionId,
consistencyToken, tx, indexId, key);
- if (tx.isReadOnly()) {
- verify(internalTable).lookup(
- eq(partitionId),
- eq(tx.id()),
- eq(tx.readTimestamp()),
- eq(tx.clusterNode()),
- eq(indexId),
- any(BinaryTuple.class),
- eq(null),
- eq(tx.coordinatorId())
- );
- } else {
- PrimaryReplica primaryReplica = new
PrimaryReplica(ctx.localNode(), consistencyToken);
-
- verify(internalTable).lookup(
- eq(partitionId),
- eq(tx.id()),
- any(),
- any(UUID.class),
- eq(primaryReplica),
- eq(indexId),
- any(BinaryTuple.class),
- eq(null)
- );
- }
+ InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() :
ctx.localNode();
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
+
+ verify(internalTable).scan(
+ eq(partitionId),
+ eq(clusterNode),
+ eq(indexId),
+ any(IndexScanCriteria.Lookup.class),
+ eq(OperationContext.create(txContext))
+ );
input.sendRows();
input.done();
@@ -596,30 +508,23 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
when(ctx.localNode()).thenReturn(tx.clusterNode());
- if (tx.isReadOnly()) {
- doAnswer(invocation -> input.publisher).when(internalTable)
- .scan(anyInt(), any(UUID.class),
any(HybridTimestamp.class), any(InternalClusterNode.class), any(UUID.class));
- } else {
- doAnswer(invocation ->
input.publisher).when(internalTable).scan(
- anyInt(),
- any(UUID.class),
- any(TablePartitionId.class),
- any(UUID.class),
- any(PrimaryReplica.class),
- isNull(),
- isNull(),
- isNull(),
- eq(0),
- isNull()
- );
- }
+ InternalClusterNode clusterNode = tx.isReadOnly() ?
tx.clusterNode() : ctx.localNode();
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
+
+ doAnswer(invocation -> input.publisher).when(internalTable).scan(
+ anyInt(),
+ eq(clusterNode),
+ eq(OperationContext.create(txContext))
+ );
RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
RowFactory<Object[]> rowFactory =
rowHandler.factory(input.rowSchema);
Publisher<Object[]> publisher = scannableTable.scan(
ctx,
- new PartitionWithConsistencyToken(partitionId,
consistencyToken), rowFactory, null
+ new PartitionWithConsistencyToken(partitionId,
consistencyToken),
+ rowFactory,
+ null
);
return new ResultCollector(publisher, rowConverter);
@@ -636,31 +541,15 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
when(ctx.localNode()).thenReturn(tx.clusterNode());
- if (tx.isReadOnly()) {
- doAnswer(i -> input.publisher).when(internalTable).scan(
- anyInt(),
- any(UUID.class),
- any(HybridTimestamp.class),
- any(InternalClusterNode.class),
- any(Integer.class),
- nullable(BinaryTuplePrefix.class),
- nullable(BinaryTuplePrefix.class),
- anyInt(),
- nullable(BitSet.class),
- any(UUID.class));
- } else {
- doAnswer(i -> input.publisher).when(internalTable).scan(
- anyInt(),
- any(UUID.class),
- any(TablePartitionId.class),
- any(UUID.class),
- any(PrimaryReplica.class),
- any(Integer.class),
- nullable(BinaryTuplePrefix.class),
- nullable(BinaryTuplePrefix.class),
- anyInt(),
- nullable(BitSet.class));
- }
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
+
+ doAnswer(i -> input.publisher).when(internalTable).scan(
+ anyInt(),
+ any(InternalClusterNode.class),
+ anyInt(),
+ any(IndexScanCriteria.Range.class),
+ eq(OperationContext.create(txContext))
+ );
RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
RowFactory<Object[]> rowFactory =
rowHandler.factory(input.rowSchema);
@@ -686,27 +575,14 @@ public class ScannableTableSelfTest extends
BaseIgniteAbstractTest {
when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
when(ctx.localNode()).thenReturn(tx.clusterNode());
- if (tx.isReadOnly()) {
- doAnswer(i -> input.publisher).when(internalTable).lookup(
- anyInt(),
- any(UUID.class),
- any(HybridTimestamp.class),
- any(InternalClusterNode.class),
- any(Integer.class),
- nullable(BinaryTuple.class),
- isNull(),
- any(UUID.class));
- } else {
- doAnswer(i -> input.publisher).when(internalTable).lookup(
- anyInt(),
- any(UUID.class),
- any(TablePartitionId.class),
- any(UUID.class),
- any(PrimaryReplica.class),
- any(Integer.class),
- nullable(BinaryTuple.class),
- isNull());
- }
+ TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) :
TxContext.readWrite(tx, consistencyToken);
+
+ doAnswer(i -> input.publisher).when(internalTable).scan(
+ anyInt(),
+ any(InternalClusterNode.class),
+ any(Integer.class),
+ any(IndexScanCriteria.Lookup.class),
+ eq(OperationContext.create(txContext)));
RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
RowFactory<Object[]> rowFactory =
rowHandler.factory(input.rowSchema);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 865be91c770..6bfb2cc29c1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.BitSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;
@@ -52,7 +51,6 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
@@ -69,7 +67,6 @@ import
org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
@@ -86,6 +83,7 @@ import
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
@@ -104,7 +102,6 @@ import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -354,15 +351,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
@Override
public Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- HybridTimestamp readTime,
InternalClusterNode recipient,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId
+ OperationContext opCtx
) {
return s -> {
s.onSubscribe(new Subscription() {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 288a9dc373c..6a5f8518199 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -32,6 +32,7 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionIds;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.tx.TransactionException;
@@ -41,7 +42,7 @@ import org.apache.ignite.tx.TransactionException;
*/
public final class NoOpTransaction implements InternalTransaction {
- private final UUID id = randomUUID();
+ private final UUID id;
private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
.addPhysicalTime(System.currentTimeMillis());
@@ -94,6 +95,8 @@ public final class NoOpTransaction implements
InternalTransaction {
this.enlistment = new
PendingTxPartitionEnlistment(enlistmentNode.name(), 1L, groupId.tableId());
this.implicit = implicit;
this.readOnly = readOnly;
+
+ this.id = readOnly ? randomUUID() :
TransactionIds.transactionId(hybridTimestamp, enlistmentNode.name().hashCode());
}
/** Node at which this transaction was start. */
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 796a5166c06..9fda77c3f9c 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -27,6 +27,8 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.jetbrains.annotations.Nullable;
@@ -48,7 +50,7 @@ public class ItInternalTableReadOnlyScanTest extends
ItAbstractInternalTableScan
InternalClusterNode node = mock(InternalClusterNode.class);
lenient().when(node.name()).thenReturn("node");
- return internalTbl.scan(part, tx.id(), internalTbl.CLOCK.now(), node,
tx.coordinatorId());
+ return internalTbl.scan(part, node,
OperationContext.create(TxContext.readOnly(tx)));
}
@Override
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 09280a2b74d..b89455202b9 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -28,12 +28,13 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.RollbackTxOnErrorPublisher;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.utils.PrimaryReplica;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -54,14 +55,13 @@ public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableSca
PendingTxPartitionEnlistment enlistment =
tx.enlistedPartition(targetReplicationGroupId(zoneId, part));
- PrimaryReplica recipient = new PrimaryReplica(
-
clusterNodeResolver.getByConsistentId(enlistment.primaryNodeConsistentId()),
- enlistment.consistencyToken()
- );
+ InternalClusterNode primaryNode =
clusterNodeResolver.getByConsistentId(enlistment.primaryNodeConsistentId());
+
+ TxContext txContext = TxContext.readWrite(tx,
enlistment.consistencyToken());
return new RollbackTxOnErrorPublisher<>(
tx,
- internalTbl.scan(part, tx.id(), tx.commitPartition(),
tx.coordinatorId(), recipient, null, null, null, 0, null)
+ internalTbl.scan(part, primaryNode,
OperationContext.create(txContext))
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java
new file mode 100644
index 00000000000..08a658e07b0
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Index scan criteria for range scan and lookup index operations.
+ */
+public interface IndexScanCriteria {
+ /** Creates range scan criteria. */
+ static Range range(@Nullable BinaryTuplePrefix lowerBound, @Nullable
BinaryTuplePrefix upperBound, int flags) {
+ return new Range(lowerBound, upperBound, flags);
+ }
+
+ /** Creates lookup criteria. */
+ static Lookup lookup(BinaryTuple key) {
+ return new Lookup(key);
+ }
+
+ /** Shortcut to create unbounded range scan criteria. */
+ static Range unbounded() {
+ return Range.UNBOUNDED;
+ }
+
+ /**
+ * Range scan criteria.
+ */
+ class Range implements IndexScanCriteria {
+ private static final Range UNBOUNDED = new Range(null, null, 0);
+
+ private final @Nullable BinaryTuplePrefix lowerBound;
+ private final @Nullable BinaryTuplePrefix upperBound;
+ private final int flags;
+
+ private Range(@Nullable BinaryTuplePrefix lowerBound, @Nullable
BinaryTuplePrefix upperBound, int flags) {
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ this.flags = flags;
+ }
+
+ /** Get lower bound. */
+ public @Nullable BinaryTuplePrefix lowerBound() {
+ return lowerBound;
+ }
+
+ /** Get upper bound. */
+ public @Nullable BinaryTuplePrefix upperBound() {
+ return upperBound;
+ }
+
+ /** Get flags. */
+ public int flags() {
+ return flags;
+ }
+ }
+
+ /**
+ * Lookup criteria.
+ */
+ class Lookup implements IndexScanCriteria {
+ private final BinaryTuple key;
+
+ private Lookup(BinaryTuple key) {
+ this.key = key;
+ }
+
+ /** Get lookup key. */
+ public BinaryTuple key() {
+ return key;
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1f9ab422675..bef61054490 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -30,18 +30,16 @@ import
org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Internal table facade provides low-level methods for table operations. The
facade hides TX/replication protocol over table storage
@@ -327,84 +325,44 @@ public interface InternalTable extends ManuallyCloseable {
* @return {@link Publisher} that reactively notifies about partition rows.
* @throws IllegalArgumentException If proposed partition index {@code p}
is out of bounds.
*/
- default Publisher<BinaryRow> scan(int partId, @Nullable
InternalTransaction tx) {
- return scan(partId, tx, null, null, null, 0, null);
- }
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-26846 Drop the
method.
+ @TestOnly
+ @Deprecated(forRemoval = true)
+ Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx);
/**
- * Scans given partition with the proposed read timestamp, providing
{@link Publisher} that reactively notifies about partition rows.
+ * Scans given partition, providing {@link Publisher} that reactively
notifies about partition rows.
*
* @param partId The partition.
- * @param readTimestamp Read timestamp.
* @param recipientNode Cluster node that will handle given get request.
- * @param txCoordinatorId Transaction coordinator inconsistent id.
+ * @param operationContext Operation context.
* @return {@link Publisher} that reactively notifies about partition rows.
* @throws IllegalArgumentException If proposed partition index {@code p}
is out of bounds.
* @throws TransactionException If proposed {@code tx} is read-write.
Transaction itself won't be automatically rolled back.
*/
- default Publisher<BinaryRow> scan(
+ Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
InternalClusterNode recipientNode,
- UUID txCoordinatorId
- ) {
- return scan(partId, txId, readTimestamp, recipientNode, null, null,
null, 0, null, txCoordinatorId);
- }
+ OperationContext operationContext
+ );
/**
- * Lookup rows corresponding to the given key given partition index,
providing {@link Publisher}
+ * Scans given partition index, providing {@link Publisher}
* that reactively notifies about partition rows.
*
* @param partId The partition.
- * @param readTimestamp Read timestamp.
* @param recipientNode Cluster node that will handle given get request.
* @param indexId Index id.
- * @param lowerBound Lower search bound.
- * @param upperBound Upper search bound.
- * @param flags Control flags. See {@link
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
- * @param columnsToInclude Row projection.
- * @param txCoordinatorId Transaction coordinator inconsistent id.
+ * @param criteria Index scan criteria.
+ * @param operationContext Operation context.
* @return {@link Publisher} that reactively notifies about partition rows.
*/
Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
InternalClusterNode recipientNode,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId
- );
-
- /**
- * Scans given partition index, providing {@link Publisher} that
reactively notifies about partition rows.
- *
- * @param partId The partition.
- * @param txId Transaction id.
- * @param commitPartition Commit partition id.
- * @param txCoordinatorId Transaction coordinator id.
- * @param recipient Primary replica that will handle given get request.
- * @param lowerBound Lower search bound.
- * @param upperBound Upper search bound.
- * @param flags Control flags. See {@link
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
- * @param columnsToInclude Row projection.
- * @return {@link Publisher} that reactively notifies about partition rows.
- */
- Publisher<BinaryRow> scan(
- int partId,
- UUID txId,
- ReplicationGroupId commitPartition,
- UUID txCoordinatorId,
- PrimaryReplica recipient,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
+ int indexId,
+ IndexScanCriteria criteria,
+ OperationContext operationContext
);
/**
@@ -413,68 +371,17 @@ public interface InternalTable extends ManuallyCloseable {
* @param partId The partition.
* @param tx The transaction.
* @param indexId Index id.
- * @param lowerBound Lower search bound.
- * @param upperBound Upper search bound.
- * @param flags Control flags. See {@link
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
- * @param columnsToInclude Row projection.
+ * @param criteria Index scan criteria.
* @return {@link Publisher} that reactively notifies about partition rows.
*/
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-26846 Drop the
method.
+ @TestOnly
+ @Deprecated(forRemoval = true)
Publisher<BinaryRow> scan(
int partId,
@Nullable InternalTransaction tx,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
- );
-
- /**
- * Scans given partition index, providing {@link Publisher} that
reactively notifies about partition rows.
- *
- * @param partId The partition.
- * @param readTimestamp Read timestamp.
- * @param recipientNode Cluster node that will handle given get request.
- * @param indexId Index id.
- * @param key Key to search.
- * @param columnsToInclude Row projection.
- * @param txCoordinatorId Transaction coordinator id.
- * @return {@link Publisher} that reactively notifies about partition rows.
- */
- Publisher<BinaryRow> lookup(
- int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
- InternalClusterNode recipientNode,
- int indexId,
- BinaryTuple key,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId
- );
-
- /**
- * Lookup rows corresponding to the given key given partition index,
providing {@link Publisher}
- * that reactively notifies about partition rows.
- *
- * @param partId The partition.
- * @param txId Transaction id.
- * @param commitPartition Commit partition id.
- * @param txCoordinatorId Transaction coordinator id.
- * @param recipient Primary replica that will handle given get request.
- * @param indexId Index id.
- * @param key Key to search.
- * @param columnsToInclude Row projection.
- * @return {@link Publisher} that reactively notifies about partition rows.
- */
- Publisher<BinaryRow> lookup(
- int partId,
- UUID txId,
- ReplicationGroupId commitPartition,
- UUID txCoordinatorId,
- PrimaryReplica recipient,
int indexId,
- BinaryTuple key,
- @Nullable BitSet columnsToInclude
+ IndexScanCriteria.Range criteria
);
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java
new file mode 100644
index 00000000000..d375a393e89
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.Objects;
+
+/**
+ * Context for table/index partition scan/lookup operation mostly for SQL need.
+ */
+public class OperationContext {
+ /** Creates a new instance of {@link OperationContext}. */
+ public static OperationContext create(TxContext txContext) {
+ return new OperationContext(txContext);
+ }
+
+ /** Transactional context. */
+ private final TxContext txContext;
+
+ private OperationContext(TxContext txContext) {
+ this.txContext = Objects.requireNonNull(txContext);
+ }
+
+ /**
+ * Returns the transactional context.
+ *
+ * @return The transactional context.
+ */
+ public TxContext txContext() {
+ return txContext;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OperationContext that = (OperationContext) o;
+ return Objects.equals(txContext, that.txContext);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(txContext.hashCode());
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
new file mode 100644
index 00000000000..117b058b88e
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Transactional operation context used in scan\lookup table operations.
+ *
+ * @see InternalTable
+ */
+public abstract class TxContext {
+ /** Creates operation context from RO transaction. */
+ public static TxContext readOnly(UUID txId, UUID txCoordinatorId,
HybridTimestamp readTimestamp) {
+ return new ReadOnly(txId, txCoordinatorId, readTimestamp);
+ }
+
+ /** Creates operation context from RO transaction. For test purposes only.
*/
+ @TestOnly
+ public static TxContext readOnly(InternalTransaction tx) {
+ assert tx.isReadOnly();
+
+ HybridTimestamp readTimestamp = tx.readTimestamp();
+
+ assert readTimestamp != null;
+
+ return new ReadOnly(tx.id(), tx.coordinatorId(), readTimestamp);
+ }
+
+ /** Creates operation context from RW transaction. */
+ public static TxContext readWrite(
+ UUID txId,
+ UUID txCoordinatorId,
+ ReplicationGroupId commitPartition,
+ long enlistmentConsistencyToken
+ ) {
+ return new ReadWrite(txId, txCoordinatorId, commitPartition,
enlistmentConsistencyToken);
+ }
+
+ /** Creates operation context from RW transaction. For test purposes only.
*/
+ @TestOnly
+ public static TxContext readWrite(InternalTransaction tx, long
enlistmentConsistencyToken) {
+ assert !tx.isReadOnly();
+
+ return new ReadWrite(tx.id(), tx.coordinatorId(),
tx.commitPartition(), enlistmentConsistencyToken);
+ }
+
+ protected final UUID txId;
+ protected final UUID coordinatorId;
+
+ protected TxContext(UUID txId, UUID coordinatorId) {
+ Objects.requireNonNull(txId, "Transaction id is mandatory");
+ Objects.requireNonNull(coordinatorId, "Transaction coordinator id is
mandatory");
+
+ this.txId = txId;
+ this.coordinatorId = coordinatorId;
+ }
+
+ /** Returns {@code true} for read only transaction, {@code false}
otherwise. */
+ public abstract boolean isReadOnly();
+
+ /** Returns transaction id. */
+ public UUID txId() {
+ return txId;
+ }
+
+ /** Returns transaction coordinator id. */
+ public UUID coordinatorId() {
+ return coordinatorId;
+ }
+
+ /** Read-only transaction context. */
+ public static class ReadOnly extends TxContext {
+ private final HybridTimestamp readTimestamp;
+
+ private ReadOnly(UUID txId, UUID txCoordinatorId, HybridTimestamp
readTimestamp) {
+ super(txId, txCoordinatorId);
+
+ this.readTimestamp = readTimestamp;
+ }
+
+ @Override
+ public boolean isReadOnly() {
+ return true;
+ }
+
+ /** Returns transaction read timestamp. */
+ public HybridTimestamp readTimestamp() {
+ return readTimestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ReadOnly txCtx = (ReadOnly) o;
+ return Objects.equals(txId, txCtx.txId)
+ && Objects.equals(coordinatorId, txCtx.coordinatorId)
+ && Objects.equals(readTimestamp, txCtx.readTimestamp);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(txId);
+ }
+
+ @Override
+ public String toString() {
+ return IgniteToStringBuilder.toString(this);
+ }
+ }
+
+ /** Read-write transaction context. */
+ public static class ReadWrite extends TxContext {
+ private final ReplicationGroupId commitPartition;
+ private final long enlistmentConsistencyToken;
+
+ private ReadWrite(UUID txId, UUID txCoordinatorId, ReplicationGroupId
commitPartition, long enlistmentConsistencyToken) {
+ super(txId, txCoordinatorId);
+
+ Objects.requireNonNull(commitPartition, "Commit partition is
mandatory for RW transaction");
+
+ if (enlistmentConsistencyToken < 0) {
+ throw new IllegalArgumentException("Consistency token
partition is mandatory for RW transaction");
+ }
+
+ this.commitPartition = commitPartition;
+ this.enlistmentConsistencyToken = enlistmentConsistencyToken;
+ }
+
+ @Override
+ public boolean isReadOnly() {
+ return false;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReadWrite opCtx = (ReadWrite) o;
+ return enlistmentConsistencyToken ==
opCtx.enlistmentConsistencyToken
+ && Objects.equals(txId, opCtx.txId)
+ && Objects.equals(coordinatorId, opCtx.coordinatorId)
+ && Objects.equals(commitPartition, opCtx.commitPartition);
+ }
+
+ /** Returns transaction commit partition. */
+ public ReplicationGroupId commitPartition() {
+ return commitPartition;
+ }
+
+ /** Returns transaction begin timestamp. */
+ public HybridTimestamp beginTimestamp() {
+ return TransactionIds.beginTimestamp(txId);
+ }
+
+ /** Returns enlistment consistency token. */
+ public long enlistmentConsistencyToken() {
+ return enlistmentConsistencyToken;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(txId);
+ }
+
+ @Override
+ public String toString() {
+ return IgniteToStringBuilder.toString(this);
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 530f397a6b7..126c368b57a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -116,8 +116,11 @@ import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TxContext;
import
org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -129,12 +132,12 @@ import
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Storage of table rows.
@@ -544,7 +547,6 @@ public class InternalTableImpl implements InternalTable {
* @param lowerBound Lower search bound.
* @param upperBound Upper search bound.
* @param flags Control flags. See {@link
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
- * @param columnsToInclude Row projection.
* @return Batch of retrieved rows.
*/
private CompletableFuture<Collection<BinaryRow>> enlistCursorInTx(
@@ -556,8 +558,7 @@ public class InternalTableImpl implements InternalTable {
@Nullable BinaryTuple exactKey,
@Nullable BinaryTuplePrefix lowerBound,
@Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
+ int flags
) {
ReplicationGroupId replicationGroupId =
targetReplicationGroupId(partId);
@@ -577,7 +578,6 @@ public class InternalTableImpl implements InternalTable {
.lowerBoundPrefix(binaryTupleMessage(lowerBound))
.upperBoundPrefix(binaryTupleMessage(upperBound))
.flags(flags)
- .columnsToInclude(columnsToInclude)
.full(tx.implicit()) // Intent for one phase commit.
.batchSize(batchSize)
.enlistmentConsistencyToken(enlistmentConsistencyToken)
@@ -1562,139 +1562,122 @@ public class InternalTableImpl implements
InternalTable {
}
@Override
- public Publisher<BinaryRow> lookup(
+ public Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
InternalClusterNode recipientNode,
- int indexId,
- BinaryTuple key,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId
+ OperationContext operationContext
) {
- return readOnlyScan(partId, txId, readTimestamp, recipientNode,
indexId, key, null, null, 0, columnsToInclude, txCoordinatorId);
- }
+ validatePartitionIndex(partId);
- @Override
- public Publisher<BinaryRow> lookup(
- int partId,
- UUID txId,
- ReplicationGroupId commitPartition,
- UUID coordinatorId,
- PrimaryReplica recipient,
- int indexId,
- BinaryTuple key,
- @Nullable BitSet columnsToInclude
- ) {
- return readWriteScan(
- partId,
- txId,
- commitPartition,
- coordinatorId,
- recipient,
- indexId,
- key,
- null,
- null,
- 0,
- columnsToInclude
- );
+ if (operationContext.txContext().isReadOnly()) {
+ return readOnlyScan(
+ partId,
+ recipientNode,
+ null,
+ null,
+ operationContext
+ );
+ } else {
+ return readWriteScan(
+ partId,
+ recipientNode,
+ null,
+ null,
+ operationContext
+ );
+ }
}
@Override
public Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
InternalClusterNode recipientNode,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId
+ int indexId,
+ IndexScanCriteria criteria,
+ OperationContext operationContext
) {
- return readOnlyScan(
- partId,
- txId,
- readTimestamp,
- recipientNode,
- indexId,
- null,
- lowerBound,
- upperBound,
- flags,
- columnsToInclude,
- txCoordinatorId
- );
+ validatePartitionIndex(partId);
+
+ if (operationContext.txContext().isReadOnly()) {
+ return readOnlyScan(
+ partId,
+ recipientNode,
+ indexId,
+ criteria,
+ operationContext
+ );
+ } else {
+ return readWriteScan(
+ partId,
+ recipientNode,
+ indexId,
+ criteria,
+ operationContext
+ );
+ }
}
+ @TestOnly
@Override
public Publisher<BinaryRow> scan(
int partId,
- @Nullable InternalTransaction tx,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
+ @Nullable InternalTransaction tx
) {
- return readWriteScan(partId, tx, indexId, null, lowerBound,
upperBound, flags, columnsToInclude);
+ validatePartitionIndex(partId);
+
+ InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
+
+ return readWriteScan(partId, actualTx, null, null);
}
+ @TestOnly
@Override
public Publisher<BinaryRow> scan(
int partId,
- UUID txId,
- ReplicationGroupId commitPartition,
- UUID coordinatorId,
- PrimaryReplica recipient,
- @Nullable Integer indexId,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
+ @Nullable InternalTransaction tx,
+ int indexId,
+ IndexScanCriteria.Range criteria
) {
- return readWriteScan(
- partId,
- txId,
- commitPartition,
- coordinatorId,
- recipient,
- indexId,
- null,
- lowerBound,
- upperBound,
- flags,
- columnsToInclude
- );
+
+ validatePartitionIndex(partId);
+
+ InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
+
+ assert !actualTx.isReadOnly();
+
+ return readWriteScan(partId, actualTx, indexId, criteria);
}
private Publisher<BinaryRow> readOnlyScan(
int partId,
- UUID txId,
- HybridTimestamp readTimestamp,
InternalClusterNode recipientNode,
@Nullable Integer indexId,
- @Nullable BinaryTuple exactKey,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude,
- UUID txCoordinatorId
+ @Nullable IndexScanCriteria criteria,
+ OperationContext opCtx
) {
- validatePartitionIndex(partId);
+ assert opCtx.txContext().isReadOnly();
+
+ boolean rangeScan = criteria instanceof IndexScanCriteria.Range;
+ boolean lookup = criteria instanceof IndexScanCriteria.Lookup;
+
+ BinaryTuple exactKey = lookup ? ((IndexScanCriteria.Lookup)
criteria).key() : null;
+ BinaryTuplePrefix lowerBound = rangeScan ? ((IndexScanCriteria.Range)
criteria).lowerBound() : null;
+ BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range)
criteria).upperBound() : null;
+ int flags = rangeScan ? ((IndexScanCriteria.Range) criteria).flags() :
0;
+
+ TxContext.ReadOnly txContext = (TxContext.ReadOnly) opCtx.txContext();
ReplicationGroupId replicationGroupId =
targetReplicationGroupId(partId);
- return new PartitionScanPublisher<>(new
ReadOnlyInflightBatchRequestTracker(transactionInflights, txId)) {
+ return new PartitionScanPublisher<>(new
ReadOnlyInflightBatchRequestTracker(transactionInflights, txContext.txId())) {
@Override
protected CompletableFuture<Collection<BinaryRow>>
retrieveBatch(long scanId, int batchSize) {
ReadOnlyScanRetrieveBatchReplicaRequest request =
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(serializeReplicationGroupId(replicationGroupId))
.tableId(tableId)
- .readTimestamp(readTimestamp)
- .transactionId(txId)
+ .transactionId(txContext.txId())
+ .coordinatorId(txContext.coordinatorId())
+ .readTimestamp(txContext.readTimestamp())
.scanId(scanId)
.batchSize(batchSize)
.indexToUse(indexId)
@@ -1702,8 +1685,6 @@ public class InternalTableImpl implements InternalTable {
.lowerBoundPrefix(binaryTupleMessage(lowerBound))
.upperBoundPrefix(binaryTupleMessage(upperBound))
.flags(flags)
- .columnsToInclude(columnsToInclude)
- .coordinatorId(txCoordinatorId)
.build();
return replicaSvc.invoke(recipientNode, request);
@@ -1712,7 +1693,7 @@ public class InternalTableImpl implements InternalTable {
@Override
protected CompletableFuture<Void> onClose(boolean
intentionallyClose, long scanId, @Nullable Throwable th) {
return completeScan(
- txId,
+ txContext.txId(),
replicationGroupId,
scanId,
th,
@@ -1725,13 +1706,9 @@ public class InternalTableImpl implements InternalTable {
private Publisher<BinaryRow> readWriteScan(
int partId,
- @Nullable InternalTransaction tx,
+ InternalTransaction tx,
@Nullable Integer indexId,
- @Nullable BinaryTuple exactKey,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
+ @Nullable IndexScanCriteria.Range criteria
) {
// Check whether proposed tx is read-only. Complete future
exceptionally if true.
// Attempting to enlist a read-only in a read-write transaction does
not corrupt the transaction itself, thus read-write transaction
@@ -1745,22 +1722,25 @@ public class InternalTableImpl implements InternalTable
{
validatePartitionIndex(partId);
- InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
+ assert indexId == null || criteria != null;
+
+ BinaryTuplePrefix lowerBound = indexId == null ? null :
criteria.lowerBound();
+ BinaryTuplePrefix upperBound = indexId == null ? null :
criteria.upperBound();
+ int flags = indexId == null ? 0 : criteria.flags();
return new
PartitionScanPublisher<>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
@Override
protected CompletableFuture<Collection<BinaryRow>>
retrieveBatch(long scanId, int batchSize) {
return enlistCursorInTx(
- actualTx,
+ tx,
partId,
scanId,
batchSize,
indexId,
- exactKey,
+ null,
lowerBound,
upperBound,
- flags,
- columnsToInclude
+ flags
);
}
@@ -1768,7 +1748,7 @@ public class InternalTableImpl implements InternalTable {
protected CompletableFuture<Void> onClose(boolean
intentionallyClose, long scanId, @Nullable Throwable th) {
CompletableFuture<Void> opFut;
- if (actualTx.implicit()) {
+ if (tx.implicit()) {
opFut = completedOrFailedFuture(null, th);
} else {
var replicationGrpId = targetReplicationGroupId(partId);
@@ -1787,26 +1767,33 @@ public class InternalTableImpl implements InternalTable
{
return postEnlist(
opFut,
intentionallyClose,
- actualTx,
- actualTx.implicit() && !intentionallyClose
+ tx,
+ tx.implicit() && !intentionallyClose
);
}
};
}
+ @TestOnly
private Publisher<BinaryRow> readWriteScan(
int partId,
- UUID txId,
- ReplicationGroupId commitPartition,
- UUID coordinatorId,
- PrimaryReplica recipient,
+ InternalClusterNode recipient,
@Nullable Integer indexId,
- @Nullable BinaryTuple exactKey,
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags,
- @Nullable BitSet columnsToInclude
+ @Nullable IndexScanCriteria criteria,
+ OperationContext opCtx
) {
+ assert !opCtx.txContext().isReadOnly();
+
+ boolean rangeScan = criteria instanceof IndexScanCriteria.Range;
+ boolean lookup = criteria instanceof IndexScanCriteria.Lookup;
+
+ BinaryTuple exactKey = lookup ? ((IndexScanCriteria.Lookup)
criteria).key() : null;
+ BinaryTuplePrefix lowerBound = rangeScan ? ((IndexScanCriteria.Range)
criteria).lowerBound() : null;
+ BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range)
criteria).upperBound() : null;
+ int flags = rangeScan ? ((IndexScanCriteria.Range) criteria).flags() :
0;
+
+ TxContext.ReadWrite txContext = (TxContext.ReadWrite)
opCtx.txContext();
+
ReplicationGroupId replicationGroupId =
targetReplicationGroupId(partId);
return new
PartitionScanPublisher<>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
@@ -1815,28 +1802,27 @@ public class InternalTableImpl implements InternalTable
{
ReadWriteScanRetrieveBatchReplicaRequest request =
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
.groupId(serializeReplicationGroupId(replicationGroupId))
.tableId(tableId)
- .timestamp(TransactionIds.beginTimestamp(txId))
- .transactionId(txId)
+ .transactionId(txContext.txId())
+ .coordinatorId(txContext.coordinatorId())
+ .timestamp(txContext.beginTimestamp())
+
.commitPartitionId(serializeReplicationGroupId(txContext.commitPartition()))
+
.enlistmentConsistencyToken(txContext.enlistmentConsistencyToken())
.scanId(scanId)
.indexToUse(indexId)
.exactKey(binaryTupleMessage(exactKey))
.lowerBoundPrefix(binaryTupleMessage(lowerBound))
.upperBoundPrefix(binaryTupleMessage(upperBound))
.flags(flags)
- .columnsToInclude(columnsToInclude)
.batchSize(batchSize)
-
.enlistmentConsistencyToken(recipient.enlistmentConsistencyToken())
.full(false) // Set explicitly.
-
.commitPartitionId(serializeReplicationGroupId(commitPartition))
- .coordinatorId(coordinatorId)
.build();
- return replicaSvc.invoke(recipient.node(), request);
+ return replicaSvc.invoke(recipient, request);
}
@Override
protected CompletableFuture<Void> onClose(boolean
intentionallyClose, long scanId, @Nullable Throwable th) {
- return completeScan(txId, replicationGroupId, scanId, th,
recipient.node().name(), intentionallyClose);
+ return completeScan(txContext.txId(), replicationGroupId,
scanId, th, recipient.name(), intentionallyClose);
}
};
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 180be47cce7..f065f652747 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.NullBinaryRow;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
@@ -427,7 +428,7 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
DELETE_ALL((table, tx) -> table.deleteAll(List.of(createBinaryRow()),
tx)),
DELETE_ALL_EXACT((table, tx) ->
table.deleteAllExact(List.of(createBinaryRow()), tx)),
SCAN_MV_STORAGE(adaptScan((table, tx) -> table.scan(0, tx))),
- SCAN_INDEX(adaptScan((table, tx) -> table.scan(0, tx, 1, null, null,
0, null)));
+ SCAN_INDEX(adaptScan((table, tx) -> table.scan(0, tx, 1,
IndexScanCriteria.unbounded())));
private final BiFunction<InternalTable, InternalTransaction,
CompletableFuture<?>> action;
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 55c6281a3be..08b345b4939 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaTestUtils;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
@@ -118,6 +119,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@@ -1465,15 +1467,13 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
?
internalTable.scan(
0,
- internalTx.id(),
- internalTx.readTimestamp(),
ReplicaTestUtils.leaderAssignment(
txTestCluster.replicaManagers().get(txTestCluster.localNodeName()),
txTestCluster.clusterServices().get(txTestCluster.localNodeName()).topologyService(),
colocationEnabled() ? internalTable.zoneId() :
internalTable.tableId(),
0
),
- internalTx.coordinatorId()
+ OperationContext.create(TxContext.readOnly(internalTx))
)
: internalTable.scan(0, internalTx);
@@ -2106,10 +2106,8 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertThrowsTxFinishedException(() -> {
Flow.Publisher<BinaryRow> pub = accounts.internalTable().scan(
0,
- internalTx.id(),
- internalTx.readTimestamp(),
new ClusterNodeImpl(UUID.randomUUID(), "node", new
NetworkAddress("localhost", 123)),
- internalTx.coordinatorId()
+ OperationContext.create(TxContext.readOnly(internalTx))
);
AtomicReference<Throwable> errorRef = new AtomicReference<>();
@@ -2122,16 +2120,13 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
});
assertThrowsTxFinishedException(() -> {
- Flow.Publisher<BinaryRow> pub = accounts.internalTable().lookup(
+ Flow.Publisher<BinaryRow> pub = accounts.internalTable().scan(
0,
- internalTx.id(),
- internalTx.readTimestamp(),
new ClusterNodeImpl(UUID.randomUUID(), "node", new
NetworkAddress("localhost", 123)),
0,
- // Binary tuple is null for testing purposes, assuming
that it wouldn't be processed anyway.
- null,
- null,
- internalTx.coordinatorId()
+ // We assume that BinaryTuple will never be accessed.
+ IndexScanCriteria.lookup(Mockito.mock(BinaryTuple.class)),
+ OperationContext.create(TxContext.readOnly(internalTx))
);
AtomicReference<Throwable> errorRef = new AtomicReference<>();
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
index 7bfeb9e7848..56bdf31dd4e 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
@@ -79,8 +79,10 @@ import
org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaR
import
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -1085,7 +1087,9 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
node(0).cluster().nodes().stream().filter(node ->
node.name().equals(primary)).findAny().get()
);
- publisher = tbl.internalTable().scan(PART_ID, tx.id(),
tx.readTimestamp(), primaryNode, tx.coordinatorId());
+ OperationContext operationContext =
OperationContext.create(TxContext.readOnly(tx));
+
+ publisher = tbl.internalTable().scan(PART_ID, primaryNode,
operationContext);
} else {
publisher = tbl.internalTable().scan(PART_ID, tx);
}