This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-26913
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 38c190969d802d83a185fd07deb4a1ef76740e33
Author: amashenkov <[email protected]>
AuthorDate: Mon Nov 3 16:55:14 2025 +0300

    wip
---
 .../ignite/client/fakes/FakeInternalTable.java     |  67 +---
 .../testframework/matchers/DelegatingMatcher.java  |  55 ++++
 .../ItPrimaryReplicaChoiceTest.java                |  27 +-
 .../ItTruncateRaftLogAndRestartNodesTest.java      |   6 +-
 .../ignite/internal/table/ItInternalTableTest.java |   4 +-
 .../ignite/internal/table/ItTableScanTest.java     | 144 ++++-----
 .../sql/engine/exec/ScannableTableImpl.java        | 139 +++-----
 .../engine/exec/rel/ScannableTableSelfTest.java    | 352 +++++++--------------
 .../exec/rel/TableScanNodeExecutionTest.java       |  14 +-
 .../sql/engine/framework/NoOpTransaction.java      |   5 +-
 .../ItInternalTableReadOnlyScanTest.java           |   4 +-
 .../ItInternalTableReadWriteScanTest.java          |  12 +-
 .../ignite/internal/table/IndexScanCriteria.java   |  90 ++++++
 .../ignite/internal/table/InternalTable.java       | 135 ++------
 .../ignite/internal/table/OperationContext.java    |  60 ++++
 .../apache/ignite/internal/table/TxContext.java    | 197 ++++++++++++
 .../distributed/storage/InternalTableImpl.java     | 248 +++++++--------
 .../distributed/storage/InternalTableImplTest.java |   3 +-
 .../ignite/internal/table/TxAbstractTest.java      |  21 +-
 .../tx/distributed/ItTransactionRecoveryTest.java  |   6 +-
 20 files changed, 822 insertions(+), 767 deletions(-)

diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 381b8de9bca..a7d0ac8319c 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -52,16 +52,16 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.schema.ColumnsExtractor;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.table.DataStreamerReceiverDescriptor;
 import org.apache.ignite.table.QualifiedName;
@@ -405,12 +405,7 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
     @Override
     public Publisher<BinaryRow> scan(
             int partId,
-            @Nullable InternalTransaction tx,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            BitSet columnsToInclude
+            @Nullable InternalTransaction tx
     ) {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
@@ -418,15 +413,9 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
     @Override
     public Publisher<BinaryRow> scan(
             int partId,
-            UUID txId,
-            ReplicationGroupId commitPartition,
-            UUID txCoordinatorId,
-            PrimaryReplica recipient,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude
+            @Nullable InternalTransaction tx,
+            int indexId,
+            IndexScanCriteria.Range criteria
     ) {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
@@ -434,55 +423,21 @@ public class FakeInternalTable implements InternalTable, 
StreamerReceiverRunner
     @Override
     public Publisher<BinaryRow> scan(
             int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
-            InternalClusterNode recipientNode,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude,
-            UUID txCoordinatorId) {
-        throw new IgniteInternalException(new 
OperationNotSupportedException());
-    }
-
-    @Override
-    public Publisher<BinaryRow> scan(
-            int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
             InternalClusterNode recipientNode,
-            UUID txCoordinatorId
-    ) {
-        return null;
-    }
-
-    @Override
-    public Publisher<BinaryRow> lookup(
-            int partId,
-            UUID txId,
-            ReplicationGroupId commitPartition,
-            UUID txCoordinatorId,
-            PrimaryReplica recipient,
             int indexId,
-            BinaryTuple key,
-            @Nullable BitSet columnsToInclude
+            IndexScanCriteria criteria,
+            OperationContext operationContext
     ) {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
 
     @Override
-    public Publisher<BinaryRow> lookup(
+    public Publisher<BinaryRow> scan(
             int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
             InternalClusterNode recipientNode,
-            int indexId,
-            BinaryTuple key,
-            @Nullable BitSet columnsToInclude,
-            UUID txCoordinatorId
+            OperationContext operationContext
     ) {
-        throw new IgniteInternalException(new 
OperationNotSupportedException());
+        return null;
     }
 
     @Override public TxStateStorage txStateStorage() {
diff --git 
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java
 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java
new file mode 100644
index 00000000000..fad7c53915b
--- /dev/null
+++ 
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import java.util.function.Function;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matches an object against a delegate matcher, transforming the object
+ * before matching.
+ *
+ * <p>The text produced by {@code describeTo} is the delegate's own description;
+ * the transformation step itself is not mentioned in it.
+ *
+ * <p>NOTE(review): the base class is Hamcrest's {@code TypeSafeMatcher}; with a generic
+ * {@code O} its runtime type check presumably cannot reject items of another type - confirm.
+ *
+ * @param <O> Object type.
+ * @param <T> Transformed object type.
+ */
+public class DelegatingMatcher<O, T> extends TypeSafeMatcher<O> {
+    /**
+     * Creates a new matcher instance.
+     *
+     * @param transformer Function applied to the matched object to obtain the value passed to {@code delegate}.
+     * @param delegate Matcher the transformed value is checked against.
+     * @return New matcher wrapping the given transformer and delegate.
+     */
+    public static <O, T> DelegatingMatcher<O, T> has(Function<O, T> 
transformer, Matcher<T> delegate) {
+        return new DelegatingMatcher<>(transformer, delegate);
+    }
+
+    private final Matcher<T> delegate; // Matcher applied to the transformed value.
+
+    private final Function<O, T> transformer; // Converts the matched object into the value given to the delegate.
+
+    private DelegatingMatcher(Function<O, T> transformer, Matcher<T> delegate) 
{
+        this.delegate = delegate;
+        this.transformer = transformer;
+    }
+
+    @Override
+    protected boolean matchesSafely(O item) {
+        return delegate.matches(transformer.apply(item)); // The delegate only ever sees the transformed value.
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        delegate.describeTo(description); // Description is forwarded to the delegate unchanged.
+    }
+}
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index c5c6fb7fbda..e93028865c9 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -68,13 +68,15 @@ import 
org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import 
org.apache.ignite.internal.storage.impl.schema.TestProfileConfigurationSchema;
 import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
 import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.flow.TestFlowUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
-import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.internal.wrapper.Wrappers;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -340,19 +342,19 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
                     node(0).cluster().nodes().stream().filter(node -> 
node.id().equals(primaryId)).findAny().get()
             );
 
+            OperationContext opCtx = 
OperationContext.create(TxContext.readOnly(tx));
+
             if (idxId == null) {
-                publisher = internalTable.scan(PART_ID, tx.id(), 
tx.readTimestamp(), primaryNode, tx.coordinatorId());
+                publisher = internalTable.scan(PART_ID, primaryNode, opCtx);
             } else if (exactKey == null) {
-                publisher = internalTable.scan(PART_ID, tx.id(), 
tx.readTimestamp(), primaryNode, idxId, null, null, 0, null,
-                        tx.coordinatorId());
+                publisher = internalTable.scan(PART_ID, primaryNode, idxId, 
IndexScanCriteria.unbounded(), opCtx);
             } else {
-                publisher = internalTable.lookup(PART_ID, tx.id(), 
tx.readTimestamp(), primaryNode, idxId, exactKey, null,
-                        tx.coordinatorId());
+                publisher = internalTable.scan(PART_ID, primaryNode, idxId, 
IndexScanCriteria.lookup(exactKey), opCtx);
             }
         } else if (idxId == null) {
             publisher = unwrappedTable.internalTable().scan(PART_ID, tx);
         } else if (exactKey == null) {
-            publisher = unwrappedTable.internalTable().scan(PART_ID, tx, 
idxId, null, null, 0, null);
+            publisher = unwrappedTable.internalTable().scan(PART_ID, tx, 
idxId, IndexScanCriteria.unbounded());
         } else {
             ReadWriteTransactionImpl rwTx = Wrappers.unwrap(tx, 
ReadWriteTransactionImpl.class);
 
@@ -371,15 +373,12 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
                     node(0).cluster().nodes().stream().filter(node -> 
node.id().equals(primaryId)).findAny().get()
             );
 
-            publisher = unwrappedTable.internalTable().lookup(
+            publisher = unwrappedTable.internalTable().scan(
                     PART_ID,
-                    rwTx.id(),
-                    rwTx.commitPartition(),
-                    rwTx.coordinatorId(),
-                    new PrimaryReplica(primaryNode, 
primaryReplicaFut.get().getStartTime().longValue()),
+                    primaryNode,
                     idxId,
-                    exactKey,
-                    null
+                    IndexScanCriteria.lookup(exactKey),
+                    OperationContext.create(TxContext.readWrite(rwTx, 
primaryReplicaFut.get().getStartTime().longValue()))
             );
         }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
index 193a4f55bf6..955b83a8c36 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java
@@ -60,7 +60,9 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
@@ -339,10 +341,8 @@ public class ItTruncateRaftLogAndRestartNodesTest extends 
ClusterPerTestIntegrat
 
         return internalTableImpl.scan(
                 partitionId,
-                roTx.id(),
-                roTx.readTimestamp(),
                 recipientNode,
-                roTx.coordinatorId()
+                OperationContext.create(TxContext.readOnly(roTx))
         );
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
index 437957138a2..e2bfb980af6 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java
@@ -549,8 +549,10 @@ public class ItInternalTableTest extends 
ClusterPerClassIntegrationTest {
         InternalTransaction roTx =
                 (InternalTransaction) node.transactions().begin(new 
TransactionOptions().readOnly(true));
 
+        OperationContext operationContext = 
OperationContext.create(TxContext.readOnly(roTx));
+
         for (int i = 0; i < parts; i++) {
-            Publisher<BinaryRow> res = internalTable.scan(i, roTx.id(), 
node.clock().now(), node.node(), roTx.coordinatorId());
+            Publisher<BinaryRow> res = internalTable.scan(i, node.node(), 
operationContext);
 
             res.subscribe(new Subscriber<>() {
                 @Override
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index c6f160be999..1cb99c0204e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -81,7 +81,6 @@ import 
org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.ErrorGroups.Transactions;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.KeyValueView;
@@ -233,21 +232,18 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
 
         List<BinaryRow> scannedRows = new ArrayList<>();
 
-        PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx1);
+        ReplicationGroupId replicationGroupId = replicationGroup(PART_ID);
+        PendingTxPartitionEnlistment enlistment = 
tx1.enlistedPartition(replicationGroupId);
+        InternalClusterNode recipient = 
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
 
         Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
                 tx1,
                 internalTable.scan(
                         PART_ID,
-                        tx1.id(),
-                        tx1.commitPartition(),
-                        tx1.coordinatorId(),
                         recipient,
                         sortedIndexId,
-                        null,
-                        null,
-                        0,
-                        null
+                        IndexScanCriteria.unbounded(),
+                        OperationContext.create(TxContext.readWrite(tx1, 
enlistment.consistencyToken()))
                 )
         );
 
@@ -293,7 +289,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         List<BinaryRow> scannedRows = new ArrayList<>();
 
-        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, 
sortedIndexId, null, null, 0, null);
+        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, 
sortedIndexId, IndexScanCriteria.unbounded());
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -469,7 +465,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         List<BinaryRow> scannedRows = new ArrayList<>();
 
-        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, 
null, null, null, 0, null);
+        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null);
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -505,7 +501,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest 
{
 
         assertEquals(ROW_IDS.size(), scannedRows.size());
 
-        var pub = internalTable.scan(PART_ID, null, null, null, null, 0, null);
+        var pub = internalTable.scan(PART_ID, null);
 
         assertEquals(ROW_IDS.size() + txOpFut.get(), scanAllRows(pub).size());
     }
@@ -520,21 +516,18 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
 
         InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
 
-        PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+        ReplicationGroupId replicationGroupId = replicationGroup(PART_ID);
+        PendingTxPartitionEnlistment enlistment = 
tx.enlistedPartition(replicationGroupId);
+        InternalClusterNode recipient = 
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
 
         Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
                 tx,
                 internalTable.scan(
                         PART_ID,
-                        tx.id(),
-                        tx.commitPartition(),
-                        tx.coordinatorId(),
                         recipient,
                         sortedIndexId,
-                        null,
-                        null,
-                        0,
-                        null
+                        IndexScanCriteria.unbounded(),
+                        OperationContext.create(TxContext.readWrite(tx, 
enlistment.consistencyToken()))
                 )
         );
 
@@ -569,15 +562,10 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
                 tx,
                 internalTable.scan(
                         PART_ID,
-                        tx.id(),
-                        tx.commitPartition(),
-                        tx.coordinatorId(),
                         recipient,
                         sortedIndexId,
-                        null,
-                        null,
-                        0,
-                        null
+                        IndexScanCriteria.unbounded(),
+                        OperationContext.create(TxContext.readWrite(tx, 
enlistment.consistencyToken()))
                 )
         );
 
@@ -594,31 +582,29 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
     public void testScanWithUpperBound() throws Exception {
         KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
 
-        BinaryTuplePrefix lowBound = BinaryTuplePrefix.fromBinaryTuple(
+        BinaryTuplePrefix lowerBound = BinaryTuplePrefix.fromBinaryTuple(
                 new BinaryTuple(1, new 
BinaryTupleBuilder(1).appendInt(5).build())
         );
         BinaryTuplePrefix upperBound = BinaryTuplePrefix.fromBinaryTuple(
                 new BinaryTuple(1, new 
BinaryTupleBuilder(1).appendInt(9).build())
         );
 
-        int soredIndexId = getSortedIndexId();
+        int sortedIndexId = getSortedIndexId();
 
         InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false, 
LONG_RUNNING_TX_TIMEOUT_MILLIS);
-        PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+
+        ReplicationGroupId replicationGroupId = replicationGroup(PART_ID);
+        PendingTxPartitionEnlistment enlistment = 
tx.enlistedPartition(replicationGroupId);
+        InternalClusterNode recipient = 
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
 
         Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
                 tx,
                 internalTable.scan(
                         PART_ID,
-                        tx.id(),
-                        tx.commitPartition(),
-                        tx.coordinatorId(),
                         recipient,
-                        soredIndexId,
-                        lowBound,
-                        upperBound,
-                        LESS_OR_EQUAL | GREATER_OR_EQUAL,
-                        null
+                        sortedIndexId,
+                        IndexScanCriteria.range(lowerBound, upperBound, 
LESS_OR_EQUAL | GREATER_OR_EQUAL),
+                        OperationContext.create(TxContext.readWrite(tx, 
enlistment.consistencyToken()))
                 )
         );
 
@@ -644,15 +630,10 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
                 tx,
                 internalTable.scan(
                         PART_ID,
-                        tx.id(),
-                        tx.commitPartition(),
-                        tx.coordinatorId(),
                         recipient,
-                        soredIndexId,
-                        lowBound,
-                        upperBound,
-                        LESS_OR_EQUAL | GREATER_OR_EQUAL,
-                        null
+                        sortedIndexId,
+                        IndexScanCriteria.range(lowerBound, upperBound, 
LESS_OR_EQUAL | GREATER_OR_EQUAL),
+                        OperationContext.create(TxContext.readWrite(tx, 
enlistment.consistencyToken()))
                 )
         );
 
@@ -669,11 +650,8 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
         Publisher<BinaryRow> publisher2 = internalTable.scan(
                 PART_ID,
                 null,
-                soredIndexId,
-                lowBound,
-                upperBound,
-                LESS_OR_EQUAL | GREATER_OR_EQUAL,
-                null
+                sortedIndexId,
+                IndexScanCriteria.range(lowerBound, upperBound, LESS_OR_EQUAL 
| GREATER_OR_EQUAL)
         );
 
         List<BinaryRow> scannedRows2 = scanAllRows(publisher2);
@@ -697,21 +675,18 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
             InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, 
false);
 
             try {
-                PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+                ReplicationGroupId replicationGroupId = 
replicationGroup(PART_ID);
+                PendingTxPartitionEnlistment enlistment = 
tx.enlistedPartition(replicationGroupId);
+                InternalClusterNode recipient = 
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
 
                 Publisher<BinaryRow> publisher = new 
RollbackTxOnErrorPublisher<>(
                         tx,
                         internalTable.scan(
                                 PART_ID,
-                                tx.id(),
-                                tx.commitPartition(),
-                                tx.coordinatorId(),
                                 recipient,
                                 sortedIndexId,
-                                null,
-                                null,
-                                0,
-                                null
+                                IndexScanCriteria.unbounded(),
+                                
OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken()))
                         )
                 );
 
@@ -740,15 +715,10 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
                         tx,
                         internalTable.scan(
                                 PART_ID,
-                                tx.id(),
-                                tx.commitPartition(),
-                                tx.coordinatorId(),
                                 recipient,
                                 sortedIndexId,
-                                null,
-                                null,
-                                0,
-                                null
+                                IndexScanCriteria.unbounded(),
+                                
OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken()))
                         )
                 );
 
@@ -803,13 +773,13 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
 
             tx = (InternalTransaction) 
CLUSTER.aliveNode().transactions().begin(new 
TransactionOptions().readOnly(true));
 
-            publisher = internalTable.scan(PART_ID, tx.id(), 
ignite.clock().now(), recipientNode, tx.coordinatorId());
+            publisher = internalTable.scan(PART_ID, recipientNode, 
OperationContext.create(TxContext.readOnly(tx)));
         } else {
             if (!implicit) {
                 tx = (InternalTransaction) 
CLUSTER.aliveNode().transactions().begin();
             }
 
-            publisher = internalTable.scan(PART_ID, tx, null, null, null, 0, 
null);
+            publisher = internalTable.scan(PART_ID, tx);
         }
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -866,25 +836,25 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
                         .map(ClusterNodeImpl::fromPublicClusterNode)
                         .orElseThrow();
 
+                OperationContext operationContext = 
OperationContext.create(TxContext.readOnly(tx));
+
                 //noinspection DataFlowIssue
-                publisher = internalTable.scan(PART_ID, tx.id(), 
tx.readTimestamp(), node0, sortedIndexId, null, null, 0, null,
-                        tx.coordinatorId());
+                publisher = internalTable.scan(PART_ID, node0, sortedIndexId, 
IndexScanCriteria.unbounded(), operationContext);
             } else {
-                PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
+                ReplicationGroupId replicationGroupId = 
replicationGroup(PART_ID);
+                PendingTxPartitionEnlistment enlistment = 
tx.enlistedPartition(replicationGroupId);
+                InternalClusterNode recipient = 
getNodeByConsistentId(enlistment.primaryNodeConsistentId());
+
+                OperationContext operationContext = 
OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken()));
 
                 publisher = new RollbackTxOnErrorPublisher<>(
                         tx,
                         internalTable.scan(
                                 PART_ID,
-                                tx.id(),
-                                tx.commitPartition(),
-                                tx.coordinatorId(),
                                 recipient,
                                 sortedIndexId,
-                                null,
-                                null,
-                                0,
-                                null
+                                IndexScanCriteria.unbounded(),
+                                operationContext
                         )
                 );
             }
@@ -898,22 +868,20 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
         }
     }
 
-    private PrimaryReplica getPrimaryReplica(int partId, InternalTransaction 
tx) {
-        ReplicationGroupId replicationGroupId = colocationEnabled()
-                ? new ZonePartitionId(table.zoneId(), partId)
-                : new TablePartitionId(table.tableId(), partId);
-
-        PendingTxPartitionEnlistment enlistment = 
tx.enlistedPartition(replicationGroupId);
-
+    private static InternalClusterNode getNodeByConsistentId(String 
nodeConsistentId) {
         IgniteImpl ignite = unwrapIgniteImpl(CLUSTER.aliveNode());
 
-        InternalClusterNode primaryNode = ignite.cluster().nodes().stream()
-                .filter(n -> 
n.name().equals(enlistment.primaryNodeConsistentId()))
+        return ignite.cluster().nodes().stream()
+                .filter(n -> n.name().equals(nodeConsistentId))
                 .map(ClusterNodeImpl::fromPublicClusterNode)
                 .findAny()
                 .orElseThrow();
+    }
 
-        return new PrimaryReplica(primaryNode, enlistment.consistencyToken());
+    private ReplicationGroupId replicationGroup(int partId) {
+        return colocationEnabled()
+                ? new ZonePartitionId(table.zoneId(), partId)
+                : new TablePartitionId(table.tableId(), partId);
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
index 76010181f03..2256e7047fe 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java
@@ -18,23 +18,28 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
+import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
+import org.apache.ignite.internal.table.IndexScanCriteria;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.subscription.TransformingPublisher;
-import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -56,35 +61,15 @@ public class ScannableTableImpl implements ScannableTable {
     @Override
     public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, 
PartitionWithConsistencyToken partWithConsistencyToken,
             RowFactory<RowT> rowFactory, int @Nullable [] requiredColumns) {
-
-        Publisher<BinaryRow> pub;
-        TxAttributes txAttributes = ctx.txAttributes();
+        TxContext txContext = transactionalContextFrom(ctx.txAttributes(), 
partWithConsistencyToken.enlistmentConsistencyToken());
 
         int partId = partWithConsistencyToken.partId();
 
-        if (txAttributes.readOnly()) {
-            HybridTimestamp readTime = txAttributes.time();
-
-            assert readTime != null;
-
-            pub = internalTable.scan(partId, txAttributes.id(), readTime, 
ctx.localNode(),
-                    txAttributes.coordinatorId());
-        } else {
-            PrimaryReplica recipient = new PrimaryReplica(ctx.localNode(), 
partWithConsistencyToken.enlistmentConsistencyToken());
-
-            pub = internalTable.scan(
-                    partId,
-                    txAttributes.id(),
-                    txAttributes.commitPartition(),
-                    txAttributes.coordinatorId(),
-                    recipient,
-                    null,
-                    null,
-                    null,
-                    0,
-                    null
-            );
-        }
+        Publisher<BinaryRow> pub = internalTable.scan(
+                partId,
+                ctx.localNode(),
+                OperationContext.create(txContext)
+        );
 
         TableRowConverter rowConverter = 
converterFactory.create(requiredColumns, partId);
 
@@ -102,10 +87,10 @@ public class ScannableTableImpl implements ScannableTable {
             @Nullable RangeCondition<RowT> cond,
             int @Nullable [] requiredColumns
     ) {
-        TxAttributes txAttributes = ctx.txAttributes();
+        TxContext txContext = transactionalContextFrom(ctx.txAttributes(), 
partWithConsistencyToken.enlistmentConsistencyToken());
+
         RowHandler<RowT> handler = rowFactory.handler();
 
-        Publisher<BinaryRow> pub;
         BinaryTuplePrefix lower;
         BinaryTuplePrefix upper;
 
@@ -119,43 +104,19 @@ public class ScannableTableImpl implements ScannableTable 
{
             lower = toBinaryTuplePrefix(columns.size(), handler, cond.lower());
             upper = toBinaryTuplePrefix(columns.size(), handler, cond.upper());
 
-            flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : 0;
-            flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : 0;
+            flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : GREATER;
+            flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : LESS;
         }
 
         int partId = partWithConsistencyToken.partId();
 
-        if (txAttributes.readOnly()) {
-            HybridTimestamp readTime = txAttributes.time();
-
-            assert readTime != null;
-
-            pub = internalTable.scan(
-                    partId,
-                    txAttributes.id(),
-                    readTime,
-                    ctx.localNode(),
-                    indexId,
-                    lower,
-                    upper,
-                    flags,
-                    null,
-                    txAttributes.coordinatorId()
-            );
-        } else {
-            pub = internalTable.scan(
-                    partId,
-                    txAttributes.id(),
-                    txAttributes.commitPartition(),
-                    txAttributes.coordinatorId(),
-                    new PrimaryReplica(ctx.localNode(), 
partWithConsistencyToken.enlistmentConsistencyToken()),
-                    indexId,
-                    lower,
-                    upper,
-                    flags,
-                    null
-            );
-        }
+        Publisher<BinaryRow> pub = internalTable.scan(
+                partId,
+                ctx.localNode(),
+                indexId,
+                IndexScanCriteria.range(lower, upper, flags),
+                OperationContext.create(txContext)
+        );
 
         TableRowConverter rowConverter = 
converterFactory.create(requiredColumns, partId);
 
@@ -173,9 +134,9 @@ public class ScannableTableImpl implements ScannableTable {
             RowT key,
             int @Nullable [] requiredColumns
     ) {
-        TxAttributes txAttributes = ctx.txAttributes();
+        TxContext txContext = transactionalContextFrom(ctx.txAttributes(), 
partWithConsistencyToken.enlistmentConsistencyToken());
+
         RowHandler<RowT> handler = rowFactory.handler();
-        Publisher<BinaryRow> pub;
 
         BinaryTuple keyTuple = handler.toBinaryTuple(key);
 
@@ -184,33 +145,13 @@ public class ScannableTableImpl implements ScannableTable 
{
 
         int partId = partWithConsistencyToken.partId();
 
-        if (txAttributes.readOnly()) {
-            HybridTimestamp readTime = txAttributes.time();
-
-            assert readTime != null;
-
-            pub = internalTable.lookup(
-                    partId,
-                    txAttributes.id(),
-                    readTime,
-                    ctx.localNode(),
-                    indexId,
-                    keyTuple,
-                    null,
-                    txAttributes.coordinatorId()
-            );
-        } else {
-            pub = internalTable.lookup(
-                    partId,
-                    txAttributes.id(),
-                    txAttributes.commitPartition(),
-                    txAttributes.coordinatorId(),
-                    new PrimaryReplica(ctx.localNode(), 
partWithConsistencyToken.enlistmentConsistencyToken()),
-                    indexId,
-                    keyTuple,
-                    null
-            );
-        }
+        Publisher<BinaryRow> pub = internalTable.scan(
+                partId,
+                ctx.localNode(),
+                indexId,
+                IndexScanCriteria.lookup(keyTuple),
+                OperationContext.create(txContext)
+        );
 
         TableRowConverter rowConverter = 
converterFactory.create(requiredColumns, partId);
 
@@ -259,4 +200,20 @@ public class ScannableTableImpl implements ScannableTable {
 
         return BinaryTuplePrefix.fromBinaryTuple(searchBoundSize, 
handler.toBinaryTuple(prefix));
     }
+
+    private static TxContext transactionalContextFrom(TxAttributes 
txAttributes, long enlistmentConsistencyToken) {
+        if (txAttributes.readOnly()) {
+            HybridTimestamp timestamp = txAttributes.time();
+
+            assert timestamp != null;
+
+            return TxContext.readOnly(txAttributes.id(), 
txAttributes.coordinatorId(), timestamp);
+        }
+
+        ReplicationGroupId commitPartition = txAttributes.commitPartition();
+
+        assert commitPartition != null;
+
+        return TxContext.readWrite(txAttributes.id(), 
txAttributes.coordinatorId(), commitPartition, enlistmentConsistencyToken);
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
index f43ab415b21..66ca760ffdb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java
@@ -20,7 +20,12 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
 import static 
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
+import static 
org.apache.ignite.internal.testframework.matchers.DelegatingMatcher.has;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -28,8 +33,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
@@ -40,7 +43,6 @@ import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
@@ -54,9 +56,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory.Builder;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -75,10 +75,13 @@ import 
org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.table.IndexScanCriteria;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.internal.type.NativeTypes;
-import org.apache.ignite.internal.utils.PrimaryReplica;
+import org.hamcrest.Matchers;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Named;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -123,27 +126,14 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
 
         ResultCollector collector = tester.tableScan(partitionId, 
consistencyToken, tx);
 
-        if (tx.isReadOnly()) {
-            HybridTimestamp timestamp = tx.readTimestamp();
-            InternalClusterNode clusterNode = tx.clusterNode();
-
-            verify(internalTable).scan(partitionId, tx.id(), timestamp, 
clusterNode, tx.coordinatorId());
-        } else {
-            InternalClusterNode clusterNode = tx.clusterNode();
-
-            verify(internalTable).scan(
-                    partitionId,
-                    tx.id(),
-                    tx.commitPartition(),
-                    tx.coordinatorId(),
-                    new PrimaryReplica(clusterNode, consistencyToken),
-                    null,
-                    null,
-                    null,
-                    0,
-                    null
-            );
-        }
+        TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
+        InternalClusterNode clusterNode = tx.clusterNode();
+
+        verify(internalTable).scan(
+                partitionId,
+                clusterNode,
+                OperationContext.create(txContext)
+        );
 
         data.sendRows();
         data.done();
@@ -197,48 +187,32 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
         condition.setLower(lower, lowerValue);
         condition.setUpper(upper, upperValue);
 
-        int flags = condition.toFlags();
-
         ResultCollector collector = tester.indexScan(partitionId, 
consistencyToken, tx, indexId, condition);
 
-        if (tx.isReadOnly()) {
-            HybridTimestamp timestamp = tx.readTimestamp();
-            InternalClusterNode clusterNode = tx.clusterNode();
+        InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : 
ctx.localNode();
+        TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
 
-            verify(internalTable).scan(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(timestamp),
-                    eq(clusterNode),
-                    eq(indexId),
-                    condition.lowerValue != null ? 
any(BinaryTuplePrefix.class) : isNull(),
-                    condition.upperValue != null ? 
any(BinaryTuplePrefix.class) : isNull(),
-                    eq(flags),
-                    isNull(),
-                    eq(tx.coordinatorId())
-            );
-        } else {
-            PrimaryReplica primaryReplica = new 
PrimaryReplica(ctx.localNode(), consistencyToken);
-
-            verify(internalTable).scan(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(tx.commitPartition()),
-                    any(UUID.class),
-                    eq(primaryReplica),
-                    eq(indexId),
-                    condition.lowerValue != null ? 
any(BinaryTuplePrefix.class) : isNull(),
-                    condition.upperValue != null ? 
any(BinaryTuplePrefix.class) : isNull(),
-                    eq(flags),
-                    isNull()
-            );
-        }
+        ArgumentCaptor<IndexScanCriteria.Range> criteriaCaptor = 
ArgumentCaptor.forClass(IndexScanCriteria.Range.class);
+
+        verify(internalTable).scan(
+                eq(partitionId),
+                eq(clusterNode),
+                eq(indexId),
+                criteriaCaptor.capture(),
+                eq(OperationContext.create(txContext))
+        );
 
         input.sendRows();
         input.done();
 
         collector.expectRow(binaryRow);
         collector.expectCompleted();
+
+        assertThat(criteriaCaptor.getValue(), Matchers.allOf(
+                has(IndexScanCriteria.Range::lowerBound, (lowerValue == null) 
? nullValue() : instanceOf(BinaryTuplePrefix.class)),
+                has(IndexScanCriteria.Range::upperBound, (upperValue == null) 
? nullValue() : instanceOf(BinaryTuplePrefix.class)),
+                has(IndexScanCriteria.Range::flags, 
Matchers.is(condition.toFlags()))
+        ));
     }
 
     private static Stream<Arguments> indexScanParameters() {
@@ -276,38 +250,18 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
 
         ResultCollector collector = tester.indexScan(partitionId, 
consistencyToken, tx, indexId, condition);
 
-        if (tx.isReadOnly()) {
-            HybridTimestamp timestamp = tx.readTimestamp();
-            InternalClusterNode clusterNode = tx.clusterNode();
+        InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : 
ctx.localNode();
+        TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
 
-            verify(internalTable).scan(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(timestamp),
-                    eq(clusterNode),
-                    eq(indexId),
-                    nullable(BinaryTuplePrefix.class),
-                    nullable(BinaryTuplePrefix.class),
-                    anyInt(),
-                    isNull(),
-                    eq(tx.coordinatorId())
-            );
-        } else {
-            PrimaryReplica primaryReplica = new 
PrimaryReplica(ctx.localNode(), consistencyToken);
-
-            verify(internalTable).scan(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(tx.commitPartition()),
-                    any(UUID.class),
-                    eq(primaryReplica),
-                    eq(indexId),
-                    nullable(BinaryTuplePrefix.class),
-                    nullable(BinaryTuplePrefix.class),
-                    anyInt(),
-                    isNull()
-            );
-        }
+        OperationContext operationContext = OperationContext.create(txContext);
+
+        verify(internalTable).scan(
+                eq(partitionId),
+                eq(clusterNode),
+                eq(indexId),
+                any(IndexScanCriteria.Range.class),
+                eq(operationContext)
+        );
 
         input.sendRows();
         input.done();
@@ -393,50 +347,32 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
         TestRangeCondition<Object[]> condition = new TestRangeCondition<>();
         condition.setLower(Bound.INCLUSIVE, new Object[]{1, 2});
 
-        ArgumentCaptor<BinaryTuplePrefix> prefix = 
ArgumentCaptor.forClass(BinaryTuplePrefix.class);
+        ArgumentCaptor<IndexScanCriteria.Range> criteriaCaptor = 
ArgumentCaptor.forClass(IndexScanCriteria.Range.class);
 
         ResultCollector collector = tester.indexScan(partitionId, 
consistencyToken, tx, indexId, condition);
 
-        if (tx.isReadOnly()) {
-            HybridTimestamp timestamp = tx.readTimestamp();
-            InternalClusterNode clusterNode = tx.clusterNode();
+        InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : 
ctx.localNode();
+        TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
 
-            verify(internalTable).scan(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(timestamp),
-                    eq(clusterNode),
-                    eq(indexId),
-                    prefix.capture(),
-                    nullable(BinaryTuplePrefix.class),
-                    anyInt(),
-                    isNull(),
-                    eq(tx.coordinatorId())
-            );
-        } else {
-            PrimaryReplica primaryReplica = new 
PrimaryReplica(ctx.localNode(), consistencyToken);
-
-            verify(internalTable).scan(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(tx.commitPartition()),
-                    any(UUID.class),
-                    eq(primaryReplica),
-                    eq(indexId),
-                    prefix.capture(),
-                    nullable(BinaryTuplePrefix.class),
-                    anyInt(),
-                    isNull()
-            );
-        }
+        verify(internalTable).scan(
+                eq(partitionId),
+                eq(clusterNode),
+                eq(indexId),
+                criteriaCaptor.capture(),
+                eq(OperationContext.create(txContext))
+        );
 
         input.sendRows();
         input.done();
 
         collector.expectCompleted();
 
-        BinaryTuplePrefix lowerBound = prefix.getValue();
+        BinaryTuplePrefix lowerBound = criteriaCaptor.getValue().lowerBound();
+        assertNotNull(lowerBound);
         assertEquals(2, lowerBound.elementCount());
+
+        assertNull(criteriaCaptor.getValue().upperBound());
+        assertEquals(GREATER_OR_EQUAL, criteriaCaptor.getValue().flags());
     }
 
     /**
@@ -455,39 +391,30 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
         int indexId = 3;
         Object[] key = {1};
 
+        ArgumentCaptor<IndexScanCriteria.Lookup> criteriaCaptor = 
ArgumentCaptor.forClass(IndexScanCriteria.Lookup.class);
+
         ResultCollector collector = tester.indexLookUp(partitionId, 
consistencyToken, tx, indexId, key);
 
-        if (tx.isReadOnly()) {
-            verify(internalTable).lookup(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(tx.readTimestamp()),
-                    eq(tx.clusterNode()),
-                    eq(indexId),
-                    any(BinaryTuple.class),
-                    isNull(),
-                    eq(tx.coordinatorId())
-            );
-        } else {
-            PrimaryReplica primaryReplica = new 
PrimaryReplica(ctx.localNode(), consistencyToken);
-
-            verify(internalTable).lookup(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    any(),
-                    any(UUID.class),
-                    eq(primaryReplica),
-                    eq(indexId),
-                    any(BinaryTuple.class),
-                    isNull()
-            );
-        }
+        InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : 
ctx.localNode();
+        TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
+
+        verify(internalTable).scan(
+                eq(partitionId),
+                eq(clusterNode),
+                eq(indexId),
+                criteriaCaptor.capture(),
+                eq(OperationContext.create(txContext))
+        );
 
         input.sendRows();
         input.done();
 
         collector.expectRow(binaryRow);
         collector.expectCompleted();
+
+        BinaryTuple exactKey = criteriaCaptor.getValue().key();
+        assertNotNull(exactKey);
+        assertEquals(1, exactKey.elementCount());
     }
 
     /**
@@ -509,31 +436,16 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
 
         ResultCollector collector = tester.indexLookUp(partitionId, 
consistencyToken, tx, indexId, key);
 
-        if (tx.isReadOnly()) {
-            verify(internalTable).lookup(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    eq(tx.readTimestamp()),
-                    eq(tx.clusterNode()),
-                    eq(indexId),
-                    any(BinaryTuple.class),
-                    eq(null),
-                    eq(tx.coordinatorId())
-            );
-        } else {
-            PrimaryReplica primaryReplica = new 
PrimaryReplica(ctx.localNode(), consistencyToken);
-
-            verify(internalTable).lookup(
-                    eq(partitionId),
-                    eq(tx.id()),
-                    any(),
-                    any(UUID.class),
-                    eq(primaryReplica),
-                    eq(indexId),
-                    any(BinaryTuple.class),
-                    eq(null)
-            );
-        }
+        InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : 
ctx.localNode();
+        TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
+
+        verify(internalTable).scan(
+                eq(partitionId),
+                eq(clusterNode),
+                eq(indexId),
+                any(IndexScanCriteria.Lookup.class),
+                eq(OperationContext.create(txContext))
+        );
 
         input.sendRows();
         input.done();
@@ -596,30 +508,23 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
             when(ctx.localNode()).thenReturn(tx.clusterNode());
 
-            if (tx.isReadOnly()) {
-                doAnswer(invocation -> input.publisher).when(internalTable)
-                        .scan(anyInt(), any(UUID.class), 
any(HybridTimestamp.class), any(InternalClusterNode.class), any(UUID.class));
-            } else {
-                doAnswer(invocation -> 
input.publisher).when(internalTable).scan(
-                        anyInt(),
-                        any(UUID.class),
-                        any(TablePartitionId.class),
-                        any(UUID.class),
-                        any(PrimaryReplica.class),
-                        isNull(),
-                        isNull(),
-                        isNull(),
-                        eq(0),
-                        isNull()
-                );
-            }
+            InternalClusterNode clusterNode = tx.isReadOnly() ? 
tx.clusterNode() : ctx.localNode();
+            TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
+
+            doAnswer(invocation -> input.publisher).when(internalTable).scan(
+                    anyInt(),
+                    eq(clusterNode),
+                    eq(OperationContext.create(txContext))
+            );
 
             RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
             RowFactory<Object[]> rowFactory = 
rowHandler.factory(input.rowSchema);
 
             Publisher<Object[]> publisher = scannableTable.scan(
                     ctx,
-                    new PartitionWithConsistencyToken(partitionId, 
consistencyToken), rowFactory, null
+                    new PartitionWithConsistencyToken(partitionId, 
consistencyToken),
+                    rowFactory,
+                    null
             );
 
             return new ResultCollector(publisher, rowConverter);
@@ -636,31 +541,15 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
             when(ctx.localNode()).thenReturn(tx.clusterNode());
 
-            if (tx.isReadOnly()) {
-                doAnswer(i -> input.publisher).when(internalTable).scan(
-                        anyInt(),
-                        any(UUID.class),
-                        any(HybridTimestamp.class),
-                        any(InternalClusterNode.class),
-                        any(Integer.class),
-                        nullable(BinaryTuplePrefix.class),
-                        nullable(BinaryTuplePrefix.class),
-                        anyInt(),
-                        nullable(BitSet.class),
-                        any(UUID.class));
-            } else {
-                doAnswer(i -> input.publisher).when(internalTable).scan(
-                        anyInt(),
-                        any(UUID.class),
-                        any(TablePartitionId.class),
-                        any(UUID.class),
-                        any(PrimaryReplica.class),
-                        any(Integer.class),
-                        nullable(BinaryTuplePrefix.class),
-                        nullable(BinaryTuplePrefix.class),
-                        anyInt(),
-                        nullable(BitSet.class));
-            }
+            TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
+
+            doAnswer(i -> input.publisher).when(internalTable).scan(
+                    anyInt(),
+                    any(InternalClusterNode.class),
+                    anyInt(),
+                    any(IndexScanCriteria.Range.class),
+                    eq(OperationContext.create(txContext))
+            );
 
             RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
             RowFactory<Object[]> rowFactory = 
rowHandler.factory(input.rowSchema);
@@ -686,27 +575,14 @@ public class ScannableTableSelfTest extends 
BaseIgniteAbstractTest {
             when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx));
             when(ctx.localNode()).thenReturn(tx.clusterNode());
 
-            if (tx.isReadOnly()) {
-                doAnswer(i -> input.publisher).when(internalTable).lookup(
-                        anyInt(),
-                        any(UUID.class),
-                        any(HybridTimestamp.class),
-                        any(InternalClusterNode.class),
-                        any(Integer.class),
-                        nullable(BinaryTuple.class),
-                        isNull(),
-                        any(UUID.class));
-            } else {
-                doAnswer(i -> input.publisher).when(internalTable).lookup(
-                        anyInt(),
-                        any(UUID.class),
-                        any(TablePartitionId.class),
-                        any(UUID.class),
-                        any(PrimaryReplica.class),
-                        any(Integer.class),
-                        nullable(BinaryTuple.class),
-                        isNull());
-            }
+            TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : 
TxContext.readWrite(tx, consistencyToken);
+
+            doAnswer(i -> input.publisher).when(internalTable).scan(
+                    anyInt(),
+                    any(InternalClusterNode.class),
+                    any(Integer.class),
+                    any(IndexScanCriteria.Lookup.class),
+                    eq(OperationContext.create(txContext)));
 
             RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE;
             RowFactory<Object[]> rowFactory = 
rowHandler.factory(input.rowSchema);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 865be91c770..6bfb2cc29c1 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.util.BitSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Spliterator;
@@ -52,7 +51,6 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
@@ -69,7 +67,6 @@ import 
org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
@@ -86,6 +83,7 @@ import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
@@ -104,7 +102,6 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -354,15 +351,8 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
         @Override
         public Publisher<BinaryRow> scan(
                 int partId,
-                UUID txId,
-                HybridTimestamp readTime,
                 InternalClusterNode recipient,
-                @Nullable Integer indexId,
-                @Nullable BinaryTuplePrefix lowerBound,
-                @Nullable BinaryTuplePrefix upperBound,
-                int flags,
-                @Nullable BitSet columnsToInclude,
-                UUID txCoordinatorId
+                OperationContext opCtx
         ) {
             return s -> {
                 s.onSubscribe(new Subscription() {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
index 288a9dc373c..6a5f8518199 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java
@@ -32,6 +32,7 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.tx.TransactionException;
@@ -41,7 +42,7 @@ import org.apache.ignite.tx.TransactionException;
  */
 public final class NoOpTransaction implements InternalTransaction {
 
-    private final UUID id = randomUUID();
+    private final UUID id;
 
     private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
             .addPhysicalTime(System.currentTimeMillis());
@@ -94,6 +95,8 @@ public final class NoOpTransaction implements 
InternalTransaction {
         this.enlistment = new 
PendingTxPartitionEnlistment(enlistmentNode.name(), 1L, groupId.tableId());
         this.implicit = implicit;
         this.readOnly = readOnly;
+
+        this.id = readOnly ?  randomUUID() : 
TransactionIds.transactionId(hybridTimestamp, enlistmentNode.name().hashCode());
     }
 
     /** Node at which this transaction was start. */
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 796a5166c06..9fda77c3f9c 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -27,6 +27,8 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.jetbrains.annotations.Nullable;
@@ -48,7 +50,7 @@ public class ItInternalTableReadOnlyScanTest extends 
ItAbstractInternalTableScan
         InternalClusterNode node = mock(InternalClusterNode.class);
         lenient().when(node.name()).thenReturn("node");
 
-        return internalTbl.scan(part, tx.id(), internalTbl.CLOCK.now(), node, 
tx.coordinatorId());
+        return internalTbl.scan(part, node, 
OperationContext.create(TxContext.readOnly(tx)));
     }
 
     @Override
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
index 09280a2b74d..b89455202b9 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
@@ -28,12 +28,13 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.RollbackTxOnErrorPublisher;
+import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.InternalTxOptions;
 import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
@@ -54,14 +55,13 @@ public class ItInternalTableReadWriteScanTest extends 
ItAbstractInternalTableSca
         PendingTxPartitionEnlistment enlistment =
                 tx.enlistedPartition(targetReplicationGroupId(zoneId, part));
 
-        PrimaryReplica recipient = new PrimaryReplica(
-                
clusterNodeResolver.getByConsistentId(enlistment.primaryNodeConsistentId()),
-                enlistment.consistencyToken()
-        );
+        InternalClusterNode primaryNode = 
clusterNodeResolver.getByConsistentId(enlistment.primaryNodeConsistentId());
+
+        TxContext txContext = TxContext.readWrite(tx, 
enlistment.consistencyToken());
 
         return new RollbackTxOnErrorPublisher<>(
                 tx,
-                internalTbl.scan(part, tx.id(), tx.commitPartition(), 
tx.coordinatorId(), recipient, null, null, null, 0, null)
+                internalTbl.scan(part, primaryNode, 
OperationContext.create(txContext))
         );
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java
new file mode 100644
index 00000000000..08a658e07b0
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Index scan criteria: either a {@link Range} scan or a key {@link Lookup}.
+ */
+public interface IndexScanCriteria {
+    /** Creates range scan criteria; bounds may be {@code null}. */
+    static Range range(@Nullable BinaryTuplePrefix lowerBound, @Nullable 
BinaryTuplePrefix upperBound, int flags) {
+        return new Range(lowerBound, upperBound, flags);
+    }
+
+    /** Creates criteria for looking up rows by the given index key. */
+    static Lookup lookup(BinaryTuple key) {
+        return new Lookup(key);
+    }
+
+    /** Returns the shared unbounded range (null bounds, zero flags). */
+    static Range unbounded() {
+        return Range.UNBOUNDED;
+    }
+
+    /**
+     * Range scan criteria: optional lower/upper bounds plus control flags.
+     */
+    class Range implements IndexScanCriteria {
+        private static final Range UNBOUNDED = new Range(null, null, 0);
+
+        private final @Nullable BinaryTuplePrefix lowerBound;
+        private final @Nullable BinaryTuplePrefix upperBound;
+        private final int flags;
+
+        private Range(@Nullable BinaryTuplePrefix lowerBound, @Nullable 
BinaryTuplePrefix upperBound, int flags) {
+            this.lowerBound = lowerBound;
+            this.upperBound = upperBound;
+            this.flags = flags;
+        }
+
+        /** Returns the lower search bound, {@code null} if not set. */
+        public @Nullable BinaryTuplePrefix lowerBound() {
+            return lowerBound;
+        }
+
+        /** Returns the upper search bound, {@code null} if not set. */
+        public @Nullable BinaryTuplePrefix upperBound() {
+            return upperBound;
+        }
+
+        /** Returns control flags; see {@code SortedIndexStorage} constants. */
+        public int flags() {
+            return flags;
+        }
+    }
+
+    /**
+     * Lookup criteria: selects rows corresponding to a single index key.
+     */
+    class Lookup implements IndexScanCriteria {
+        private final BinaryTuple key;
+
+        private Lookup(BinaryTuple key) {
+            this.key = key;
+        }
+
+        /** Returns the key to search for. */
+        public BinaryTuple key() {
+            return key;
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1f9ab422675..bef61054490 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -30,18 +30,16 @@ import 
org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.BinaryTuple;
-import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Internal table facade provides low-level methods for table operations. The 
facade hides TX/replication protocol over table storage
@@ -327,84 +325,44 @@ public interface InternalTable extends ManuallyCloseable {
      * @return {@link Publisher} that reactively notifies about partition rows.
      * @throws IllegalArgumentException If proposed partition index {@code p} 
is out of bounds.
      */
-    default Publisher<BinaryRow> scan(int partId, @Nullable 
InternalTransaction tx) {
-        return scan(partId, tx, null, null, null, 0, null);
-    }
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-26846 Drop the 
method.
+    @TestOnly
+    @Deprecated(forRemoval = true)
+    Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx);
 
     /**
-     * Scans given partition with the proposed read timestamp, providing 
{@link Publisher} that reactively notifies about partition rows.
+     * Scans given partition, providing {@link Publisher} that reactively 
notifies about partition rows.
      *
      * @param partId The partition.
-     * @param readTimestamp Read timestamp.
      * @param recipientNode Cluster node that will handle given get request.
-     * @param txCoordinatorId Transaction coordinator inconsistent id.
+     * @param operationContext Operation context.
      * @return {@link Publisher} that reactively notifies about partition rows.
      * @throws IllegalArgumentException If proposed partition index {@code p} 
is out of bounds.
      * @throws TransactionException If proposed {@code tx} is read-write. 
Transaction itself won't be automatically rolled back.
      */
-    default Publisher<BinaryRow> scan(
+    Publisher<BinaryRow> scan(
             int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
             InternalClusterNode recipientNode,
-            UUID txCoordinatorId
-    ) {
-        return scan(partId, txId, readTimestamp, recipientNode, null, null, 
null, 0, null, txCoordinatorId);
-    }
+            OperationContext operationContext
+    );
 
     /**
-     * Lookup rows corresponding to the given key given partition index, 
providing {@link Publisher}
+     * Scans given partition index, providing {@link Publisher}
      * that reactively notifies about partition rows.
      *
      * @param partId The partition.
-     * @param readTimestamp Read timestamp.
      * @param recipientNode Cluster node that will handle given get request.
      * @param indexId Index id.
-     * @param lowerBound Lower search bound.
-     * @param upperBound Upper search bound.
-     * @param flags Control flags. See {@link 
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
-     * @param columnsToInclude Row projection.
-     * @param txCoordinatorId Transaction coordinator inconsistent id.
+     * @param criteria Index scan criteria.
+     * @param operationContext Operation context.
      * @return {@link Publisher} that reactively notifies about partition rows.
      */
     Publisher<BinaryRow> scan(
             int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
             InternalClusterNode recipientNode,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude,
-            UUID txCoordinatorId
-    );
-
-    /**
-     * Scans given partition index, providing {@link Publisher} that 
reactively notifies about partition rows.
-     *
-     * @param partId The partition.
-     * @param txId Transaction id.
-     * @param commitPartition Commit partition id.
-     * @param txCoordinatorId Transaction coordinator id.
-     * @param recipient Primary replica that will handle given get request.
-     * @param lowerBound Lower search bound.
-     * @param upperBound Upper search bound.
-     * @param flags Control flags. See {@link 
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
-     * @param columnsToInclude Row projection.
-     * @return {@link Publisher} that reactively notifies about partition rows.
-     */
-    Publisher<BinaryRow> scan(
-            int partId,
-            UUID txId,
-            ReplicationGroupId commitPartition,
-            UUID txCoordinatorId,
-            PrimaryReplica recipient,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude
+            int indexId,
+            IndexScanCriteria criteria,
+            OperationContext operationContext
     );
 
     /**
@@ -413,68 +371,17 @@ public interface InternalTable extends ManuallyCloseable {
      * @param partId The partition.
      * @param tx The transaction.
      * @param indexId Index id.
-     * @param lowerBound Lower search bound.
-     * @param upperBound Upper search bound.
-     * @param flags Control flags. See {@link 
org.apache.ignite.internal.storage.index.SortedIndexStorage} constants.
-     * @param columnsToInclude Row projection.
+     * @param criteria Index scan criteria.
      * @return {@link Publisher} that reactively notifies about partition rows.
      */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-26846 Drop the 
method.
+    @TestOnly
+    @Deprecated(forRemoval = true)
     Publisher<BinaryRow> scan(
             int partId,
             @Nullable InternalTransaction tx,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude
-    );
-
-    /**
-     * Scans given partition index, providing {@link Publisher} that 
reactively notifies about partition rows.
-     *
-     * @param partId The partition.
-     * @param readTimestamp Read timestamp.
-     * @param recipientNode Cluster node that will handle given get request.
-     * @param indexId Index id.
-     * @param key Key to search.
-     * @param columnsToInclude Row projection.
-     * @param txCoordinatorId Transaction coordinator id.
-     * @return {@link Publisher} that reactively notifies about partition rows.
-     */
-    Publisher<BinaryRow> lookup(
-            int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
-            InternalClusterNode recipientNode,
-            int indexId,
-            BinaryTuple key,
-            @Nullable BitSet columnsToInclude,
-            UUID txCoordinatorId
-    );
-
-    /**
-     * Lookup rows corresponding to the given key given partition index, 
providing {@link Publisher}
-     * that reactively notifies about partition rows.
-     *
-     * @param partId The partition.
-     * @param txId Transaction id.
-     * @param commitPartition Commit partition id.
-     * @param txCoordinatorId Transaction coordinator id.
-     * @param recipient Primary replica that will handle given get request.
-     * @param indexId Index id.
-     * @param key Key to search.
-     * @param columnsToInclude Row projection.
-     * @return {@link Publisher} that reactively notifies about partition rows.
-     */
-    Publisher<BinaryRow> lookup(
-            int partId,
-            UUID txId,
-            ReplicationGroupId commitPartition,
-            UUID txCoordinatorId,
-            PrimaryReplica recipient,
             int indexId,
-            BinaryTuple key,
-            @Nullable BitSet columnsToInclude
+            IndexScanCriteria.Range criteria
     );
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java
new file mode 100644
index 00000000000..d375a393e89
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.Objects;
+
+/**
+ * Context of a table/index partition scan or lookup, mostly used by SQL.
+ */
+public class OperationContext {
+    /** Creates a new context wrapping the given transactional context. */
+    public static OperationContext create(TxContext txContext) {
+        return new OperationContext(txContext);
+    }
+
+    /** Transactional context, never {@code null}. */
+    private final TxContext txContext;
+
+    private OperationContext(TxContext txContext) {
+        this.txContext = Objects.requireNonNull(txContext, "txContext");
+    }
+
+    /**
+     * Returns the transactional context.
+     *
+     * @return The transactional context, never {@code null}.
+     */
+    public TxContext txContext() {
+        return txContext;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        return o != null && getClass() == o.getClass()
+                && txContext.equals(((OperationContext) o).txContext);
+    }
+
+    @Override
+    public int hashCode() {
+        return txContext.hashCode();
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
new file mode 100644
index 00000000000..117b058b88e
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.TransactionIds;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Transactional operation context used in scan/lookup table operations.
+ *
+ * @see InternalTable
+ */
+public abstract class TxContext {
+    /** Creates transactional context for an RO transaction. */
+    public static TxContext readOnly(UUID txId, UUID txCoordinatorId, 
HybridTimestamp readTimestamp) {
+        return new ReadOnly(txId, txCoordinatorId, readTimestamp);
+    }
+
+    /** Creates context from the given RO transaction. For tests only. */
+    @TestOnly
+    public static TxContext readOnly(InternalTransaction tx) {
+        assert tx.isReadOnly();
+
+        HybridTimestamp readTimestamp = tx.readTimestamp();
+
+        assert readTimestamp != null;
+
+        return new ReadOnly(tx.id(), tx.coordinatorId(), readTimestamp);
+    }
+
+    /** Creates transactional context for an RW transaction. */
+    public static TxContext readWrite(
+            UUID txId,
+            UUID txCoordinatorId,
+            ReplicationGroupId commitPartition,
+            long enlistmentConsistencyToken
+    ) {
+        return new ReadWrite(txId, txCoordinatorId, commitPartition, 
enlistmentConsistencyToken);
+    }
+
+    /** Creates context from the given RW transaction. For tests only. */
+    @TestOnly
+    public static TxContext readWrite(InternalTransaction tx, long 
enlistmentConsistencyToken) {
+        assert !tx.isReadOnly();
+
+        return new ReadWrite(tx.id(), tx.coordinatorId(), 
tx.commitPartition(), enlistmentConsistencyToken);
+    }
+
+    protected final UUID txId;
+    protected final UUID coordinatorId;
+
+    protected TxContext(UUID txId, UUID coordinatorId) {
+        Objects.requireNonNull(txId, "Transaction id is mandatory");
+        Objects.requireNonNull(coordinatorId, "Transaction coordinator id is 
mandatory");
+
+        this.txId = txId;
+        this.coordinatorId = coordinatorId;
+    }
+
+    /** Returns {@code true} for read only transaction, {@code false} 
otherwise. */
+    public abstract boolean isReadOnly();
+
+    /** Returns transaction id. */
+    public UUID txId() {
+        return txId;
+    }
+
+    /** Returns transaction coordinator id. */
+    public UUID coordinatorId() {
+        return coordinatorId;
+    }
+
+    /** Read-only transaction context. */
+    public static class ReadOnly extends TxContext {
+        private final HybridTimestamp readTimestamp;
+
+        private ReadOnly(UUID txId, UUID txCoordinatorId, HybridTimestamp 
readTimestamp) {
+            super(txId, txCoordinatorId);
+            this.readTimestamp = Objects.requireNonNull(
+                    readTimestamp, "Read timestamp is mandatory for RO transaction");
+        }
+
+        @Override
+        public boolean isReadOnly() {
+            return true;
+        }
+
+        /** Returns transaction read timestamp, never {@code null}. */
+        public HybridTimestamp readTimestamp() {
+            return readTimestamp;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            ReadOnly txCtx = (ReadOnly) o;
+            return Objects.equals(txId, txCtx.txId)
+                    && Objects.equals(coordinatorId, txCtx.coordinatorId)
+                    && Objects.equals(readTimestamp, txCtx.readTimestamp);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(txId);
+        }
+
+        @Override
+        public String toString() {
+            return IgniteToStringBuilder.toString(this);
+        }
+    }
+
+    /** Read-write transaction context. */
+    public static class ReadWrite extends TxContext {
+        private final ReplicationGroupId commitPartition;
+        private final long enlistmentConsistencyToken;
+
+        private ReadWrite(UUID txId, UUID txCoordinatorId, ReplicationGroupId 
commitPartition, long enlistmentConsistencyToken) {
+            super(txId, txCoordinatorId);
+
+            Objects.requireNonNull(commitPartition, "Commit partition is 
mandatory for RW transaction");
+
+            if (enlistmentConsistencyToken < 0) {
+                throw new IllegalArgumentException("Enlistment consistency 
token is mandatory for RW transaction");
+            }
+
+            this.commitPartition = commitPartition;
+            this.enlistmentConsistencyToken = enlistmentConsistencyToken;
+        }
+
+        @Override
+        public boolean isReadOnly() {
+            return false;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ReadWrite opCtx = (ReadWrite) o;
+            return enlistmentConsistencyToken == 
opCtx.enlistmentConsistencyToken
+                    && Objects.equals(txId, opCtx.txId)
+                    && Objects.equals(coordinatorId, opCtx.coordinatorId)
+                    && Objects.equals(commitPartition, opCtx.commitPartition);
+        }
+
+        /** Returns transaction commit partition. */
+        public ReplicationGroupId commitPartition() {
+            return commitPartition;
+        }
+
+        /** Returns begin timestamp derived from the transaction id. */
+        public HybridTimestamp beginTimestamp() {
+            return TransactionIds.beginTimestamp(txId);
+        }
+
+        /** Returns enlistment consistency token. */
+        public long enlistmentConsistencyToken() {
+            return enlistmentConsistencyToken;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(txId);
+        }
+
+        @Override
+        public String toString() {
+            return IgniteToStringBuilder.toString(this);
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 530f397a6b7..ea666067e76 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -116,8 +116,11 @@ import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
+import org.apache.ignite.internal.table.TxContext;
 import 
org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
 import org.apache.ignite.internal.tx.InternalTransaction;
@@ -129,12 +132,12 @@ import 
org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
-import org.apache.ignite.internal.utils.PrimaryReplica;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.QualifiedNameHelper;
 import org.apache.ignite.tx.TransactionException;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Storage of table rows.
@@ -1562,139 +1565,125 @@ public class InternalTableImpl implements 
InternalTable {
     }
 
     @Override
-    public Publisher<BinaryRow> lookup(
+    public Publisher<BinaryRow> scan( // Scan without an index; picks the read-only or read-write path from the tx context.
             int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
             InternalClusterNode recipientNode,
-            int indexId,
-            BinaryTuple key,
-            @Nullable BitSet columnsToInclude,
-            UUID txCoordinatorId
+            OperationContext operationContext
     ) {
-        return readOnlyScan(partId, txId, readTimestamp, recipientNode, 
indexId, key, null, null, 0, columnsToInclude, txCoordinatorId);
-    }
+        validatePartitionIndex(partId);
 
-    @Override
-    public Publisher<BinaryRow> lookup(
-            int partId,
-            UUID txId,
-            ReplicationGroupId commitPartition,
-            UUID coordinatorId,
-            PrimaryReplica recipient,
-            int indexId,
-            BinaryTuple key,
-            @Nullable BitSet columnsToInclude
-    ) {
-        return readWriteScan(
-                partId,
-                txId,
-                commitPartition,
-                coordinatorId,
-                recipient,
-                indexId,
-                key,
-                null,
-                null,
-                0,
-                columnsToInclude
-        );
+        if (operationContext.txContext().isReadOnly()) { // Read-only tx context: use the read-only batch request path.
+            return readOnlyScan(
+                    partId,
+                    recipientNode,
+                    null, // indexId: no index is used by this overload.
+                    null, // criteria: none, since there is no index.
+                    null, // columnsToInclude: not specified by this overload.
+                    operationContext
+            );
+        } else { // Any other tx context is handled by the read-write path.
+            return readWriteScan(
+                    partId,
+                    recipientNode,
+                    null, // indexId: no index is used by this overload.
+                    null, // criteria: none, since there is no index.
+                    operationContext
+            );
+        }
     }
 
     @Override
     public Publisher<BinaryRow> scan(
             int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
             InternalClusterNode recipientNode,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude,
-            UUID txCoordinatorId
+            int indexId,
+            IndexScanCriteria criteria,
+            OperationContext operationContext
     ) {
-        return readOnlyScan(
-                partId,
-                txId,
-                readTimestamp,
-                recipientNode,
-                indexId,
-                null,
-                lowerBound,
-                upperBound,
-                flags,
-                columnsToInclude,
-                txCoordinatorId
-        );
+        validatePartitionIndex(partId);
+
+        if (operationContext.txContext().isReadOnly()) { // Read-only tx context: use the read-only batch request path.
+            return readOnlyScan(
+                    partId,
+                    recipientNode,
+                    indexId, // Forward the requested index; passing null here would silently degrade to an index-less scan.
+                    criteria, // Lookup key or range bounds for the index; unpacked by readOnlyScan.
+                    null, // columnsToInclude: not specified by this overload.
+                    operationContext
+            );
+        } else { // Any other tx context is handled by the read-write path.
+            return readWriteScan(
+                    partId,
+                    recipientNode,
+                    indexId, // Forward the requested index; passing null here would silently degrade to an index-less scan.
+                    criteria, // Lookup key or range bounds for the index; unpacked by readWriteScan.
+                    operationContext
+            );
+        }
     }
 
+    @TestOnly
     @Override
     public Publisher<BinaryRow> scan(
             int partId,
-            @Nullable InternalTransaction tx,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude
+            @Nullable InternalTransaction tx
     ) {
-        return readWriteScan(partId, tx, indexId, null, lowerBound, 
upperBound, flags, columnsToInclude);
+        validatePartitionIndex(partId);
+
+        InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx); // tx may be null; presumably an implicit RW tx is started then - confirm.
+
+        return readWriteScan(partId, actualTx, null, null); // indexId = null, criteria = null: scan without an index.
     }
 
+    @TestOnly
     @Override
     public Publisher<BinaryRow> scan(
             int partId,
-            UUID txId,
-            ReplicationGroupId commitPartition,
-            UUID coordinatorId,
-            PrimaryReplica recipient,
-            @Nullable Integer indexId,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude
+            @Nullable InternalTransaction tx,
+            int indexId,
+            IndexScanCriteria.Range criteria
     ) {
-        return readWriteScan(
-                partId,
-                txId,
-                commitPartition,
-                coordinatorId,
-                recipient,
-                indexId,
-                null,
-                lowerBound,
-                upperBound,
-                flags,
-                columnsToInclude
-        );
+
+        validatePartitionIndex(partId);
+
+        InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx); // tx may be null; presumably an implicit RW tx is started then - confirm.
+
+        assert !actualTx.isReadOnly(); // This overload only supports read-write transactions (checked with -ea only).
+
+        return readWriteScan(partId, actualTx, indexId, criteria); // Range scan over the given index within actualTx.
     }
 
     private Publisher<BinaryRow> readOnlyScan(
             int partId,
-            UUID txId,
-            HybridTimestamp readTimestamp,
             InternalClusterNode recipientNode,
             @Nullable Integer indexId,
-            @Nullable BinaryTuple exactKey,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
+            @Nullable IndexScanCriteria criteria,
             @Nullable BitSet columnsToInclude,
-            UUID txCoordinatorId
+            OperationContext opCtx
     ) {
-        validatePartitionIndex(partId);
+        assert opCtx.txContext().isReadOnly();
+
+        boolean rangeScan = criteria instanceof IndexScanCriteria.Range;
+        boolean lookup = criteria instanceof IndexScanCriteria.Lookup;
+
+        BinaryTuple exactKey = lookup ? ((IndexScanCriteria.Lookup) 
criteria).key() : null;
+        BinaryTuplePrefix lowerBound = rangeScan ? ((IndexScanCriteria.Range) 
criteria).lowerBound() : null;
+        BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range) 
criteria).upperBound() : null;
+        int flags = rangeScan ? ((IndexScanCriteria.Range) criteria).flags() : 
0;
+
+        TxContext.ReadOnly txContext = (TxContext.ReadOnly) opCtx.txContext();
 
         ReplicationGroupId replicationGroupId = 
targetReplicationGroupId(partId);
 
-        return new PartitionScanPublisher<>(new 
ReadOnlyInflightBatchRequestTracker(transactionInflights, txId)) {
+        return new PartitionScanPublisher<>(new 
ReadOnlyInflightBatchRequestTracker(transactionInflights, txContext.txId())) {
             @Override
             protected CompletableFuture<Collection<BinaryRow>> 
retrieveBatch(long scanId, int batchSize) {
                 ReadOnlyScanRetrieveBatchReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest()
                         
.groupId(serializeReplicationGroupId(replicationGroupId))
                         .tableId(tableId)
-                        .readTimestamp(readTimestamp)
-                        .transactionId(txId)
+                        .transactionId(txContext.txId())
+                        .coordinatorId(txContext.coordinatorId())
+                        .readTimestamp(txContext.readTimestamp())
                         .scanId(scanId)
                         .batchSize(batchSize)
                         .indexToUse(indexId)
@@ -1703,7 +1692,6 @@ public class InternalTableImpl implements InternalTable {
                         .upperBoundPrefix(binaryTupleMessage(upperBound))
                         .flags(flags)
                         .columnsToInclude(columnsToInclude)
-                        .coordinatorId(txCoordinatorId)
                         .build();
 
                 return replicaSvc.invoke(recipientNode, request);
@@ -1712,7 +1700,7 @@ public class InternalTableImpl implements InternalTable {
             @Override
             protected CompletableFuture<Void> onClose(boolean 
intentionallyClose, long scanId, @Nullable Throwable th) {
                 return completeScan(
-                        txId,
+                        txContext.txId(),
                         replicationGroupId,
                         scanId,
                         th,
@@ -1725,13 +1713,9 @@ public class InternalTableImpl implements InternalTable {
 
     private Publisher<BinaryRow> readWriteScan(
             int partId,
-            @Nullable InternalTransaction tx,
+            InternalTransaction tx,
             @Nullable Integer indexId,
-            @Nullable BinaryTuple exactKey,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude
+            @Nullable IndexScanCriteria.Range criteria
     ) {
         // Check whether proposed tx is read-only. Complete future 
exceptionally if true.
         // Attempting to enlist a read-only in a read-write transaction does 
not corrupt the transaction itself, thus read-write transaction
@@ -1745,22 +1729,26 @@ public class InternalTableImpl implements InternalTable 
{
 
         validatePartitionIndex(partId);
 
-        InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
+        assert indexId == null || criteria != null;
+
+        BinaryTuplePrefix lowerBound = indexId == null ? null : 
criteria.lowerBound();
+        BinaryTuplePrefix upperBound = indexId == null ? null : 
criteria.upperBound();
+        int flags = indexId == null ? 0 : criteria.flags();
 
         return new 
PartitionScanPublisher<>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
             @Override
             protected CompletableFuture<Collection<BinaryRow>> 
retrieveBatch(long scanId, int batchSize) {
                 return enlistCursorInTx(
-                        actualTx,
+                        tx,
                         partId,
                         scanId,
                         batchSize,
                         indexId,
-                        exactKey,
+                        null,
                         lowerBound,
                         upperBound,
                         flags,
-                        columnsToInclude
+                        null
                 );
             }
 
@@ -1768,7 +1756,7 @@ public class InternalTableImpl implements InternalTable {
             protected CompletableFuture<Void> onClose(boolean 
intentionallyClose, long scanId, @Nullable Throwable th) {
                 CompletableFuture<Void> opFut;
 
-                if (actualTx.implicit()) {
+                if (tx.implicit()) {
                     opFut = completedOrFailedFuture(null, th);
                 } else {
                     var replicationGrpId = targetReplicationGroupId(partId);
@@ -1787,26 +1775,33 @@ public class InternalTableImpl implements InternalTable 
{
                 return postEnlist(
                         opFut,
                         intentionallyClose,
-                        actualTx,
-                        actualTx.implicit() && !intentionallyClose
+                        tx,
+                        tx.implicit() && !intentionallyClose
                 );
             }
         };
     }
 
+    @TestOnly
     private Publisher<BinaryRow> readWriteScan(
             int partId,
-            UUID txId,
-            ReplicationGroupId commitPartition,
-            UUID coordinatorId,
-            PrimaryReplica recipient,
+            InternalClusterNode recipient,
             @Nullable Integer indexId,
-            @Nullable BinaryTuple exactKey,
-            @Nullable BinaryTuplePrefix lowerBound,
-            @Nullable BinaryTuplePrefix upperBound,
-            int flags,
-            @Nullable BitSet columnsToInclude
+            @Nullable IndexScanCriteria criteria,
+            OperationContext opCtx
     ) {
+        assert !opCtx.txContext().isReadOnly();
+
+        boolean rangeScan = criteria instanceof IndexScanCriteria.Range;
+        boolean lookup = criteria instanceof IndexScanCriteria.Lookup;
+
+        BinaryTuple exactKey = lookup ? ((IndexScanCriteria.Lookup) 
criteria).key() : null;
+        BinaryTuplePrefix lowerBound = rangeScan ? ((IndexScanCriteria.Range) 
criteria).lowerBound() : null;
+        BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range) 
criteria).upperBound() : null;
+        int flags = rangeScan ? ((IndexScanCriteria.Range) criteria).flags() : 
0;
+
+        TxContext.ReadWrite txContext = (TxContext.ReadWrite) 
opCtx.txContext();
+
         ReplicationGroupId replicationGroupId = 
targetReplicationGroupId(partId);
 
         return new 
PartitionScanPublisher<>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) {
@@ -1815,28 +1810,27 @@ public class InternalTableImpl implements InternalTable 
{
                 ReadWriteScanRetrieveBatchReplicaRequest request = 
TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest()
                         
.groupId(serializeReplicationGroupId(replicationGroupId))
                         .tableId(tableId)
-                        .timestamp(TransactionIds.beginTimestamp(txId))
-                        .transactionId(txId)
+                        .transactionId(txContext.txId())
+                        .coordinatorId(txContext.coordinatorId())
+                        .timestamp(txContext.beginTimestamp())
+                        
.commitPartitionId(serializeReplicationGroupId(txContext.commitPartition()))
+                        
.enlistmentConsistencyToken(txContext.enlistmentConsistencyToken())
                         .scanId(scanId)
                         .indexToUse(indexId)
                         .exactKey(binaryTupleMessage(exactKey))
                         .lowerBoundPrefix(binaryTupleMessage(lowerBound))
                         .upperBoundPrefix(binaryTupleMessage(upperBound))
                         .flags(flags)
-                        .columnsToInclude(columnsToInclude)
                         .batchSize(batchSize)
-                        
.enlistmentConsistencyToken(recipient.enlistmentConsistencyToken())
                         .full(false) // Set explicitly.
-                        
.commitPartitionId(serializeReplicationGroupId(commitPartition))
-                        .coordinatorId(coordinatorId)
                         .build();
 
-                return replicaSvc.invoke(recipient.node(), request);
+                return replicaSvc.invoke(recipient, request);
             }
 
             @Override
             protected CompletableFuture<Void> onClose(boolean 
intentionallyClose, long scanId, @Nullable Throwable th) {
-                return completeScan(txId, replicationGroupId, scanId, th, 
recipient.node().name(), intentionallyClose);
+                return completeScan(txContext.txId(), replicationGroupId, 
scanId, th, recipient.name(), intentionallyClose);
             }
         };
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 180be47cce7..f065f652747 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.schema.BinaryRowEx;
 import org.apache.ignite.internal.schema.NullBinaryRow;
 import org.apache.ignite.internal.sql.SqlCommon;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.table.IndexScanCriteria;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.metrics.TableMetricSource;
@@ -427,7 +428,7 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
         DELETE_ALL((table, tx) -> table.deleteAll(List.of(createBinaryRow()), 
tx)),
         DELETE_ALL_EXACT((table, tx) -> 
table.deleteAllExact(List.of(createBinaryRow()), tx)),
         SCAN_MV_STORAGE(adaptScan((table, tx) -> table.scan(0, tx))),
-        SCAN_INDEX(adaptScan((table, tx) -> table.scan(0, tx, 1, null, null, 
0, null)));
+        SCAN_INDEX(adaptScan((table, tx) -> table.scan(0, tx, 1, 
IndexScanCriteria.unbounded())));
 
         private final BiFunction<InternalTable, InternalTransaction, 
CompletableFuture<?>> action;
 
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 55c6281a3be..08b345b4939 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaTestUtils;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
@@ -118,6 +119,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
@@ -1465,15 +1467,13 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
                 ?
                 internalTable.scan(
                         0,
-                        internalTx.id(),
-                        internalTx.readTimestamp(),
                         ReplicaTestUtils.leaderAssignment(
                                 
txTestCluster.replicaManagers().get(txTestCluster.localNodeName()),
                                 
txTestCluster.clusterServices().get(txTestCluster.localNodeName()).topologyService(),
                                 colocationEnabled() ? internalTable.zoneId() : 
internalTable.tableId(),
                                 0
                         ),
-                        internalTx.coordinatorId()
+                        OperationContext.create(TxContext.readOnly(internalTx))
                 )
                 : internalTable.scan(0, internalTx);
 
@@ -2106,10 +2106,8 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertThrowsTxFinishedException(() -> {
             Flow.Publisher<BinaryRow> pub = accounts.internalTable().scan(
                     0,
-                    internalTx.id(),
-                    internalTx.readTimestamp(),
                     new ClusterNodeImpl(UUID.randomUUID(), "node", new 
NetworkAddress("localhost", 123)),
-                    internalTx.coordinatorId()
+                    OperationContext.create(TxContext.readOnly(internalTx))
             );
 
             AtomicReference<Throwable> errorRef = new AtomicReference<>();
@@ -2122,16 +2120,13 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         });
 
         assertThrowsTxFinishedException(() -> {
-            Flow.Publisher<BinaryRow> pub = accounts.internalTable().lookup(
+            Flow.Publisher<BinaryRow> pub = accounts.internalTable().scan(
                     0,
-                    internalTx.id(),
-                    internalTx.readTimestamp(),
                     new ClusterNodeImpl(UUID.randomUUID(), "node", new 
NetworkAddress("localhost", 123)),
                     0,
-                    // Binary tuple is null for testing purposes, assuming 
that it wouldn't be processed anyway.
-                    null,
-                    null,
-                    internalTx.coordinatorId()
+                    // We assume that BinaryTuple will never be accessed.
+                    IndexScanCriteria.lookup(Mockito.mock(BinaryTuple.class)),
+                    OperationContext.create(TxContext.readOnly(internalTx))
             );
 
             AtomicReference<Throwable> errorRef = new AtomicReference<>();
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
index 7bfeb9e7848..56bdf31dd4e 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
@@ -79,8 +79,10 @@ import 
org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaR
 import 
org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
@@ -1085,7 +1087,9 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
                     node(0).cluster().nodes().stream().filter(node -> 
node.name().equals(primary)).findAny().get()
             );
 
-            publisher = tbl.internalTable().scan(PART_ID, tx.id(), 
tx.readTimestamp(), primaryNode, tx.coordinatorId());
+            OperationContext operationContext = 
OperationContext.create(TxContext.readOnly(tx));
+
+            publisher = tbl.internalTable().scan(PART_ID, primaryNode, 
operationContext);
         } else {
             publisher = tbl.internalTable().scan(PART_ID, tx);
         }

Reply via email to