This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-26913 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 38c190969d802d83a185fd07deb4a1ef76740e33 Author: amashenkov <[email protected]> AuthorDate: Mon Nov 3 16:55:14 2025 +0300 wip --- .../ignite/client/fakes/FakeInternalTable.java | 67 +--- .../testframework/matchers/DelegatingMatcher.java | 55 ++++ .../ItPrimaryReplicaChoiceTest.java | 27 +- .../ItTruncateRaftLogAndRestartNodesTest.java | 6 +- .../ignite/internal/table/ItInternalTableTest.java | 4 +- .../ignite/internal/table/ItTableScanTest.java | 144 ++++----- .../sql/engine/exec/ScannableTableImpl.java | 139 +++----- .../engine/exec/rel/ScannableTableSelfTest.java | 352 +++++++-------------- .../exec/rel/TableScanNodeExecutionTest.java | 14 +- .../sql/engine/framework/NoOpTransaction.java | 5 +- .../ItInternalTableReadOnlyScanTest.java | 4 +- .../ItInternalTableReadWriteScanTest.java | 12 +- .../ignite/internal/table/IndexScanCriteria.java | 90 ++++++ .../ignite/internal/table/InternalTable.java | 135 ++------ .../ignite/internal/table/OperationContext.java | 60 ++++ .../apache/ignite/internal/table/TxContext.java | 197 ++++++++++++ .../distributed/storage/InternalTableImpl.java | 248 +++++++-------- .../distributed/storage/InternalTableImplTest.java | 3 +- .../ignite/internal/table/TxAbstractTest.java | 21 +- .../tx/distributed/ItTransactionRecoveryTest.java | 6 +- 20 files changed, 822 insertions(+), 767 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index 381b8de9bca..a7d0ac8319c 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -52,16 +52,16 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.BinaryTuple; -import 
org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.schema.ColumnsExtractor; import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.table.IndexScanCriteria; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.StreamerReceiverRunner; import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.PendingComparableValuesTracker; -import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.table.DataStreamerReceiverDescriptor; import org.apache.ignite.table.QualifiedName; @@ -405,12 +405,7 @@ public class FakeInternalTable implements InternalTable, StreamerReceiverRunner @Override public Publisher<BinaryRow> scan( int partId, - @Nullable InternalTransaction tx, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - BitSet columnsToInclude + @Nullable InternalTransaction tx ) { throw new IgniteInternalException(new OperationNotSupportedException()); } @@ -418,15 +413,9 @@ public class FakeInternalTable implements InternalTable, StreamerReceiverRunner @Override public Publisher<BinaryRow> scan( int partId, - UUID txId, - ReplicationGroupId commitPartition, - UUID txCoordinatorId, - PrimaryReplica recipient, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude + @Nullable InternalTransaction tx, + int indexId, + IndexScanCriteria.Range criteria ) { throw new IgniteInternalException(new OperationNotSupportedException()); } @@ -434,55 +423,21 @@ public class FakeInternalTable 
implements InternalTable, StreamerReceiverRunner @Override public Publisher<BinaryRow> scan( int partId, - UUID txId, - HybridTimestamp readTimestamp, - InternalClusterNode recipientNode, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude, - UUID txCoordinatorId) { - throw new IgniteInternalException(new OperationNotSupportedException()); - } - - @Override - public Publisher<BinaryRow> scan( - int partId, - UUID txId, - HybridTimestamp readTimestamp, InternalClusterNode recipientNode, - UUID txCoordinatorId - ) { - return null; - } - - @Override - public Publisher<BinaryRow> lookup( - int partId, - UUID txId, - ReplicationGroupId commitPartition, - UUID txCoordinatorId, - PrimaryReplica recipient, int indexId, - BinaryTuple key, - @Nullable BitSet columnsToInclude + IndexScanCriteria criteria, + OperationContext operationContext ) { throw new IgniteInternalException(new OperationNotSupportedException()); } @Override - public Publisher<BinaryRow> lookup( + public Publisher<BinaryRow> scan( int partId, - UUID txId, - HybridTimestamp readTimestamp, InternalClusterNode recipientNode, - int indexId, - BinaryTuple key, - @Nullable BitSet columnsToInclude, - UUID txCoordinatorId + OperationContext operationContext ) { - throw new IgniteInternalException(new OperationNotSupportedException()); + return null; } @Override public TxStateStorage txStateStorage() { diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java new file mode 100644 index 00000000000..fad7c53915b --- /dev/null +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/DelegatingMatcher.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.testframework.matchers; + +import java.util.function.Function; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matches an object against a delegate matcher, transforming the object before matching. + * + * @param <O> Object type. + * @param <T> Transformed object type. + */ +public class DelegatingMatcher<O, T> extends TypeSafeMatcher<O> { + /** Creates a new matcher instance. 
*/ + public static <O, T> DelegatingMatcher<O, T> has(Function<O, T> transformer, Matcher<T> delegate) { + return new DelegatingMatcher<>(transformer, delegate); + } + + private final Matcher<T> delegate; + + private final Function<O, T> transformer; + + private DelegatingMatcher(Function<O, T> transformer, Matcher<T> delegate) { + this.delegate = delegate; + this.transformer = transformer; + } + + @Override + protected boolean matchesSafely(O item) { + return delegate.matches(transformer.apply(item)); + } + + @Override + public void describeTo(Description description) { + delegate.describeTo(description); + } +} diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java index c5c6fb7fbda..e93028865c9 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java @@ -68,13 +68,15 @@ import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; import org.apache.ignite.internal.storage.impl.schema.TestProfileConfigurationSchema; import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage; +import org.apache.ignite.internal.table.IndexScanCriteria; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.flow.TestFlowUtils; import org.apache.ignite.internal.tx.InternalTransaction; import 
org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl; -import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.internal.wrapper.Wrappers; import org.apache.ignite.table.Table; import org.apache.ignite.table.Tuple; @@ -340,19 +342,19 @@ public class ItPrimaryReplicaChoiceTest extends ClusterPerTestIntegrationTest { node(0).cluster().nodes().stream().filter(node -> node.id().equals(primaryId)).findAny().get() ); + OperationContext opCtx = OperationContext.create(TxContext.readOnly(tx)); + if (idxId == null) { - publisher = internalTable.scan(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, tx.coordinatorId()); + publisher = internalTable.scan(PART_ID, primaryNode, opCtx); } else if (exactKey == null) { - publisher = internalTable.scan(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, idxId, null, null, 0, null, - tx.coordinatorId()); + publisher = internalTable.scan(PART_ID, primaryNode, idxId, IndexScanCriteria.unbounded(), opCtx); } else { - publisher = internalTable.lookup(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, idxId, exactKey, null, - tx.coordinatorId()); + publisher = internalTable.scan(PART_ID, primaryNode, idxId, IndexScanCriteria.lookup(exactKey), opCtx); } } else if (idxId == null) { publisher = unwrappedTable.internalTable().scan(PART_ID, tx); } else if (exactKey == null) { - publisher = unwrappedTable.internalTable().scan(PART_ID, tx, idxId, null, null, 0, null); + publisher = unwrappedTable.internalTable().scan(PART_ID, tx, idxId, IndexScanCriteria.unbounded()); } else { ReadWriteTransactionImpl rwTx = Wrappers.unwrap(tx, ReadWriteTransactionImpl.class); @@ -371,15 +373,12 @@ public class ItPrimaryReplicaChoiceTest extends ClusterPerTestIntegrationTest { node(0).cluster().nodes().stream().filter(node -> node.id().equals(primaryId)).findAny().get() ); - publisher = unwrappedTable.internalTable().lookup( + publisher = unwrappedTable.internalTable().scan( PART_ID, - rwTx.id(), - rwTx.commitPartition(), - 
rwTx.coordinatorId(), - new PrimaryReplica(primaryNode, primaryReplicaFut.get().getStartTime().longValue()), + primaryNode, idxId, - exactKey, - null + IndexScanCriteria.lookup(exactKey), + OperationContext.create(TxContext.readWrite(rwTx, primaryReplicaFut.get().getStartTime().longValue())) ); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java index 193a4f55bf6..955b83a8c36 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java @@ -60,7 +60,9 @@ import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.Column; import org.apache.ignite.internal.schema.SchemaDescriptor; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tostring.IgniteToStringInclude; @@ -339,10 +341,8 @@ public class ItTruncateRaftLogAndRestartNodesTest extends ClusterPerTestIntegrat return internalTableImpl.scan( partitionId, - roTx.id(), - roTx.readTimestamp(), recipientNode, - roTx.coordinatorId() + OperationContext.create(TxContext.readOnly(roTx)) ); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java index 437957138a2..e2bfb980af6 100644 --- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItInternalTableTest.java @@ -549,8 +549,10 @@ public class ItInternalTableTest extends ClusterPerClassIntegrationTest { InternalTransaction roTx = (InternalTransaction) node.transactions().begin(new TransactionOptions().readOnly(true)); + OperationContext operationContext = OperationContext.create(TxContext.readOnly(roTx)); + for (int i = 0; i < parts; i++) { - Publisher<BinaryRow> res = internalTable.scan(i, roTx.id(), node.clock().now(), node.node(), roTx.coordinatorId()); + Publisher<BinaryRow> res = internalTable.scan(i, node.node(), operationContext); res.subscribe(new Subscriber<>() { @Override diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index c6f160be999..1cb99c0204e 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -81,7 +81,6 @@ import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; -import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.KeyValueView; @@ -233,21 +232,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { List<BinaryRow> scannedRows = new ArrayList<>(); - PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx1); + ReplicationGroupId replicationGroupId = replicationGroup(PART_ID); + 
PendingTxPartitionEnlistment enlistment = tx1.enlistedPartition(replicationGroupId); + InternalClusterNode recipient = getNodeByConsistentId(enlistment.primaryNodeConsistentId()); Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>( tx1, internalTable.scan( PART_ID, - tx1.id(), - tx1.commitPartition(), - tx1.coordinatorId(), recipient, sortedIndexId, - null, - null, - 0, - null + IndexScanCriteria.unbounded(), + OperationContext.create(TxContext.readWrite(tx1, enlistment.consistencyToken())) ) ); @@ -293,7 +289,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { List<BinaryRow> scannedRows = new ArrayList<>(); - Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, sortedIndexId, null, null, 0, null); + Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, sortedIndexId, IndexScanCriteria.unbounded()); CompletableFuture<Void> scanned = new CompletableFuture<>(); @@ -469,7 +465,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { List<BinaryRow> scannedRows = new ArrayList<>(); - Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null, null, null, null, 0, null); + Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null); CompletableFuture<Void> scanned = new CompletableFuture<>(); @@ -505,7 +501,7 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { assertEquals(ROW_IDS.size(), scannedRows.size()); - var pub = internalTable.scan(PART_ID, null, null, null, null, 0, null); + var pub = internalTable.scan(PART_ID, null); assertEquals(ROW_IDS.size() + txOpFut.get(), scanAllRows(pub).size()); } @@ -520,21 +516,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false); - PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx); + ReplicationGroupId replicationGroupId = replicationGroup(PART_ID); + PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(replicationGroupId); + 
InternalClusterNode recipient = getNodeByConsistentId(enlistment.primaryNodeConsistentId()); Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>( tx, internalTable.scan( PART_ID, - tx.id(), - tx.commitPartition(), - tx.coordinatorId(), recipient, sortedIndexId, - null, - null, - 0, - null + IndexScanCriteria.unbounded(), + OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken())) ) ); @@ -569,15 +562,10 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { tx, internalTable.scan( PART_ID, - tx.id(), - tx.commitPartition(), - tx.coordinatorId(), recipient, sortedIndexId, - null, - null, - 0, - null + IndexScanCriteria.unbounded(), + OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken())) ) ); @@ -594,31 +582,29 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { public void testScanWithUpperBound() throws Exception { KeyValueView<Tuple, Tuple> kvView = table.keyValueView(); - BinaryTuplePrefix lowBound = BinaryTuplePrefix.fromBinaryTuple( + BinaryTuplePrefix lowerBound = BinaryTuplePrefix.fromBinaryTuple( new BinaryTuple(1, new BinaryTupleBuilder(1).appendInt(5).build()) ); BinaryTuplePrefix upperBound = BinaryTuplePrefix.fromBinaryTuple( new BinaryTuple(1, new BinaryTupleBuilder(1).appendInt(9).build()) ); - int soredIndexId = getSortedIndexId(); + int sortedIndexId = getSortedIndexId(); InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false, LONG_RUNNING_TX_TIMEOUT_MILLIS); - PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx); + + ReplicationGroupId replicationGroupId = replicationGroup(PART_ID); + PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(replicationGroupId); + InternalClusterNode recipient = getNodeByConsistentId(enlistment.primaryNodeConsistentId()); Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>( tx, internalTable.scan( PART_ID, - tx.id(), - tx.commitPartition(), - tx.coordinatorId(), recipient, - soredIndexId, - 
lowBound, - upperBound, - LESS_OR_EQUAL | GREATER_OR_EQUAL, - null + sortedIndexId, + IndexScanCriteria.range(lowerBound, upperBound, LESS_OR_EQUAL | GREATER_OR_EQUAL), + OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken())) ) ); @@ -644,15 +630,10 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { tx, internalTable.scan( PART_ID, - tx.id(), - tx.commitPartition(), - tx.coordinatorId(), recipient, - soredIndexId, - lowBound, - upperBound, - LESS_OR_EQUAL | GREATER_OR_EQUAL, - null + sortedIndexId, + IndexScanCriteria.range(lowerBound, upperBound, LESS_OR_EQUAL | GREATER_OR_EQUAL), + OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken())) ) ); @@ -669,11 +650,8 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { Publisher<BinaryRow> publisher2 = internalTable.scan( PART_ID, null, - soredIndexId, - lowBound, - upperBound, - LESS_OR_EQUAL | GREATER_OR_EQUAL, - null + sortedIndexId, + IndexScanCriteria.range(lowerBound, upperBound, LESS_OR_EQUAL | GREATER_OR_EQUAL) ); List<BinaryRow> scannedRows2 = scanAllRows(publisher2); @@ -697,21 +675,18 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false); try { - PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx); + ReplicationGroupId replicationGroupId = replicationGroup(PART_ID); + PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(replicationGroupId); + InternalClusterNode recipient = getNodeByConsistentId(enlistment.primaryNodeConsistentId()); Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>( tx, internalTable.scan( PART_ID, - tx.id(), - tx.commitPartition(), - tx.coordinatorId(), recipient, sortedIndexId, - null, - null, - 0, - null + IndexScanCriteria.unbounded(), + OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken())) ) ); @@ -740,15 +715,10 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { 
tx, internalTable.scan( PART_ID, - tx.id(), - tx.commitPartition(), - tx.coordinatorId(), recipient, sortedIndexId, - null, - null, - 0, - null + IndexScanCriteria.unbounded(), + OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken())) ) ); @@ -803,13 +773,13 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { tx = (InternalTransaction) CLUSTER.aliveNode().transactions().begin(new TransactionOptions().readOnly(true)); - publisher = internalTable.scan(PART_ID, tx.id(), ignite.clock().now(), recipientNode, tx.coordinatorId()); + publisher = internalTable.scan(PART_ID, recipientNode, OperationContext.create(TxContext.readOnly(tx))); } else { if (!implicit) { tx = (InternalTransaction) CLUSTER.aliveNode().transactions().begin(); } - publisher = internalTable.scan(PART_ID, tx, null, null, null, 0, null); + publisher = internalTable.scan(PART_ID, tx); } CompletableFuture<Void> scanned = new CompletableFuture<>(); @@ -866,25 +836,25 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { .map(ClusterNodeImpl::fromPublicClusterNode) .orElseThrow(); + OperationContext operationContext = OperationContext.create(TxContext.readOnly(tx)); + //noinspection DataFlowIssue - publisher = internalTable.scan(PART_ID, tx.id(), tx.readTimestamp(), node0, sortedIndexId, null, null, 0, null, - tx.coordinatorId()); + publisher = internalTable.scan(PART_ID, node0, sortedIndexId, IndexScanCriteria.unbounded(), operationContext); } else { - PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx); + ReplicationGroupId replicationGroupId = replicationGroup(PART_ID); + PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(replicationGroupId); + InternalClusterNode recipient = getNodeByConsistentId(enlistment.primaryNodeConsistentId()); + + OperationContext operationContext = OperationContext.create(TxContext.readWrite(tx, enlistment.consistencyToken())); publisher = new RollbackTxOnErrorPublisher<>( tx, internalTable.scan( PART_ID, - 
tx.id(), - tx.commitPartition(), - tx.coordinatorId(), recipient, sortedIndexId, - null, - null, - 0, - null + IndexScanCriteria.unbounded(), + operationContext ) ); } @@ -898,22 +868,20 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { } } - private PrimaryReplica getPrimaryReplica(int partId, InternalTransaction tx) { - ReplicationGroupId replicationGroupId = colocationEnabled() - ? new ZonePartitionId(table.zoneId(), partId) - : new TablePartitionId(table.tableId(), partId); - - PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(replicationGroupId); - + private static InternalClusterNode getNodeByConsistentId(String nodeConsistentId) { IgniteImpl ignite = unwrapIgniteImpl(CLUSTER.aliveNode()); - InternalClusterNode primaryNode = ignite.cluster().nodes().stream() - .filter(n -> n.name().equals(enlistment.primaryNodeConsistentId())) + return ignite.cluster().nodes().stream() + .filter(n -> n.name().equals(nodeConsistentId)) .map(ClusterNodeImpl::fromPublicClusterNode) .findAny() .orElseThrow(); + } - return new PrimaryReplica(primaryNode, enlistment.consistencyToken()); + private ReplicationGroupId replicationGroup(int partId) { + return colocationEnabled() + ? 
new ZonePartitionId(table.zoneId(), partId) + : new TablePartitionId(table.tableId(), partId); } /** diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java index 76010181f03..2256e7047fe 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ScannableTableImpl.java @@ -18,23 +18,28 @@ package org.apache.ignite.internal.sql.engine.exec; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER; import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL; +import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS; import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory; import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition; +import org.apache.ignite.internal.table.IndexScanCriteria; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.tx.InternalTransaction; import 
org.apache.ignite.internal.util.subscription.TransformingPublisher; -import org.apache.ignite.internal.utils.PrimaryReplica; import org.jetbrains.annotations.Nullable; /** @@ -56,35 +61,15 @@ public class ScannableTableImpl implements ScannableTable { @Override public <RowT> Publisher<RowT> scan(ExecutionContext<RowT> ctx, PartitionWithConsistencyToken partWithConsistencyToken, RowFactory<RowT> rowFactory, int @Nullable [] requiredColumns) { - - Publisher<BinaryRow> pub; - TxAttributes txAttributes = ctx.txAttributes(); + TxContext txContext = transactionalContextFrom(ctx.txAttributes(), partWithConsistencyToken.enlistmentConsistencyToken()); int partId = partWithConsistencyToken.partId(); - if (txAttributes.readOnly()) { - HybridTimestamp readTime = txAttributes.time(); - - assert readTime != null; - - pub = internalTable.scan(partId, txAttributes.id(), readTime, ctx.localNode(), - txAttributes.coordinatorId()); - } else { - PrimaryReplica recipient = new PrimaryReplica(ctx.localNode(), partWithConsistencyToken.enlistmentConsistencyToken()); - - pub = internalTable.scan( - partId, - txAttributes.id(), - txAttributes.commitPartition(), - txAttributes.coordinatorId(), - recipient, - null, - null, - null, - 0, - null - ); - } + Publisher<BinaryRow> pub = internalTable.scan( + partId, + ctx.localNode(), + OperationContext.create(txContext) + ); TableRowConverter rowConverter = converterFactory.create(requiredColumns, partId); @@ -102,10 +87,10 @@ public class ScannableTableImpl implements ScannableTable { @Nullable RangeCondition<RowT> cond, int @Nullable [] requiredColumns ) { - TxAttributes txAttributes = ctx.txAttributes(); + TxContext txContext = transactionalContextFrom(ctx.txAttributes(), partWithConsistencyToken.enlistmentConsistencyToken()); + RowHandler<RowT> handler = rowFactory.handler(); - Publisher<BinaryRow> pub; BinaryTuplePrefix lower; BinaryTuplePrefix upper; @@ -119,43 +104,19 @@ public class ScannableTableImpl implements ScannableTable { lower = 
toBinaryTuplePrefix(columns.size(), handler, cond.lower()); upper = toBinaryTuplePrefix(columns.size(), handler, cond.upper()); - flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : 0; - flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : 0; + flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : GREATER; + flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : LESS; } int partId = partWithConsistencyToken.partId(); - if (txAttributes.readOnly()) { - HybridTimestamp readTime = txAttributes.time(); - - assert readTime != null; - - pub = internalTable.scan( - partId, - txAttributes.id(), - readTime, - ctx.localNode(), - indexId, - lower, - upper, - flags, - null, - txAttributes.coordinatorId() - ); - } else { - pub = internalTable.scan( - partId, - txAttributes.id(), - txAttributes.commitPartition(), - txAttributes.coordinatorId(), - new PrimaryReplica(ctx.localNode(), partWithConsistencyToken.enlistmentConsistencyToken()), - indexId, - lower, - upper, - flags, - null - ); - } + Publisher<BinaryRow> pub = internalTable.scan( + partId, + ctx.localNode(), + indexId, + IndexScanCriteria.range(lower, upper, flags), + OperationContext.create(txContext) + ); TableRowConverter rowConverter = converterFactory.create(requiredColumns, partId); @@ -173,9 +134,9 @@ public class ScannableTableImpl implements ScannableTable { RowT key, int @Nullable [] requiredColumns ) { - TxAttributes txAttributes = ctx.txAttributes(); + TxContext txContext = transactionalContextFrom(ctx.txAttributes(), partWithConsistencyToken.enlistmentConsistencyToken()); + RowHandler<RowT> handler = rowFactory.handler(); - Publisher<BinaryRow> pub; BinaryTuple keyTuple = handler.toBinaryTuple(key); @@ -184,33 +145,13 @@ public class ScannableTableImpl implements ScannableTable { int partId = partWithConsistencyToken.partId(); - if (txAttributes.readOnly()) { - HybridTimestamp readTime = txAttributes.time(); - - assert readTime != null; - - pub = internalTable.lookup( - partId, - txAttributes.id(), - readTime, - 
ctx.localNode(), - indexId, - keyTuple, - null, - txAttributes.coordinatorId() - ); - } else { - pub = internalTable.lookup( - partId, - txAttributes.id(), - txAttributes.commitPartition(), - txAttributes.coordinatorId(), - new PrimaryReplica(ctx.localNode(), partWithConsistencyToken.enlistmentConsistencyToken()), - indexId, - keyTuple, - null - ); - } + Publisher<BinaryRow> pub = internalTable.scan( + partId, + ctx.localNode(), + indexId, + IndexScanCriteria.lookup(keyTuple), + OperationContext.create(txContext) + ); TableRowConverter rowConverter = converterFactory.create(requiredColumns, partId); @@ -259,4 +200,20 @@ public class ScannableTableImpl implements ScannableTable { return BinaryTuplePrefix.fromBinaryTuple(searchBoundSize, handler.toBinaryTuple(prefix)); } + + private static TxContext transactionalContextFrom(TxAttributes txAttributes, long enlistmentConsistencyToken) { + if (txAttributes.readOnly()) { + HybridTimestamp timestamp = txAttributes.time(); + + assert timestamp != null; + + return TxContext.readOnly(txAttributes.id(), txAttributes.coordinatorId(), timestamp); + } + + ReplicationGroupId commitPartition = txAttributes.commitPartition(); + + assert commitPartition != null; + + return TxContext.readWrite(txAttributes.id(), txAttributes.coordinatorId(), commitPartition, enlistmentConsistencyToken); + } } diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java index f43ab415b21..66ca760ffdb 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java @@ -20,7 +20,12 @@ package org.apache.ignite.internal.sql.engine.exec.rel; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; 
import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL; import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL; +import static org.apache.ignite.internal.testframework.matchers.DelegatingMatcher.has; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -28,8 +33,6 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -40,7 +43,6 @@ import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; @@ -54,9 +56,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory.Builder; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.ImmutableIntList; -import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.network.InternalClusterNode; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; import 
org.apache.ignite.internal.schema.BinaryTuple; @@ -75,10 +75,13 @@ import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; +import org.apache.ignite.internal.table.IndexScanCriteria; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.type.NativeTypes; -import org.apache.ignite.internal.utils.PrimaryReplica; +import org.hamcrest.Matchers; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.extension.ExtendWith; @@ -123,27 +126,14 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { ResultCollector collector = tester.tableScan(partitionId, consistencyToken, tx); - if (tx.isReadOnly()) { - HybridTimestamp timestamp = tx.readTimestamp(); - InternalClusterNode clusterNode = tx.clusterNode(); - - verify(internalTable).scan(partitionId, tx.id(), timestamp, clusterNode, tx.coordinatorId()); - } else { - InternalClusterNode clusterNode = tx.clusterNode(); - - verify(internalTable).scan( - partitionId, - tx.id(), - tx.commitPartition(), - tx.coordinatorId(), - new PrimaryReplica(clusterNode, consistencyToken), - null, - null, - null, - 0, - null - ); - } + TxContext txContext = tx.isReadOnly() ? 
TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); + InternalClusterNode clusterNode = tx.clusterNode(); + + verify(internalTable).scan( + partitionId, + clusterNode, + OperationContext.create(txContext) + ); data.sendRows(); data.done(); @@ -197,48 +187,32 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { condition.setLower(lower, lowerValue); condition.setUpper(upper, upperValue); - int flags = condition.toFlags(); - ResultCollector collector = tester.indexScan(partitionId, consistencyToken, tx, indexId, condition); - if (tx.isReadOnly()) { - HybridTimestamp timestamp = tx.readTimestamp(); - InternalClusterNode clusterNode = tx.clusterNode(); + InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : ctx.localNode(); + TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); - verify(internalTable).scan( - eq(partitionId), - eq(tx.id()), - eq(timestamp), - eq(clusterNode), - eq(indexId), - condition.lowerValue != null ? any(BinaryTuplePrefix.class) : isNull(), - condition.upperValue != null ? any(BinaryTuplePrefix.class) : isNull(), - eq(flags), - isNull(), - eq(tx.coordinatorId()) - ); - } else { - PrimaryReplica primaryReplica = new PrimaryReplica(ctx.localNode(), consistencyToken); - - verify(internalTable).scan( - eq(partitionId), - eq(tx.id()), - eq(tx.commitPartition()), - any(UUID.class), - eq(primaryReplica), - eq(indexId), - condition.lowerValue != null ? any(BinaryTuplePrefix.class) : isNull(), - condition.upperValue != null ? 
any(BinaryTuplePrefix.class) : isNull(), - eq(flags), - isNull() - ); - } + ArgumentCaptor<IndexScanCriteria.Range> criteriaCaptor = ArgumentCaptor.forClass(IndexScanCriteria.Range.class); + + verify(internalTable).scan( + eq(partitionId), + eq(clusterNode), + eq(indexId), + criteriaCaptor.capture(), + eq(OperationContext.create(txContext)) + ); input.sendRows(); input.done(); collector.expectRow(binaryRow); collector.expectCompleted(); + + assertThat(criteriaCaptor.getValue(), Matchers.allOf( + has(IndexScanCriteria.Range::lowerBound, (lowerValue == null) ? nullValue() : instanceOf(BinaryTuplePrefix.class)), + has(IndexScanCriteria.Range::upperBound, (upperValue == null) ? nullValue() : instanceOf(BinaryTuplePrefix.class)), + has(IndexScanCriteria.Range::flags, Matchers.is(condition.toFlags())) + )); } private static Stream<Arguments> indexScanParameters() { @@ -276,38 +250,18 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { ResultCollector collector = tester.indexScan(partitionId, consistencyToken, tx, indexId, condition); - if (tx.isReadOnly()) { - HybridTimestamp timestamp = tx.readTimestamp(); - InternalClusterNode clusterNode = tx.clusterNode(); + InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : ctx.localNode(); + TxContext txContext = tx.isReadOnly() ? 
TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); - verify(internalTable).scan( - eq(partitionId), - eq(tx.id()), - eq(timestamp), - eq(clusterNode), - eq(indexId), - nullable(BinaryTuplePrefix.class), - nullable(BinaryTuplePrefix.class), - anyInt(), - isNull(), - eq(tx.coordinatorId()) - ); - } else { - PrimaryReplica primaryReplica = new PrimaryReplica(ctx.localNode(), consistencyToken); - - verify(internalTable).scan( - eq(partitionId), - eq(tx.id()), - eq(tx.commitPartition()), - any(UUID.class), - eq(primaryReplica), - eq(indexId), - nullable(BinaryTuplePrefix.class), - nullable(BinaryTuplePrefix.class), - anyInt(), - isNull() - ); - } + OperationContext operationContext = OperationContext.create(txContext); + + verify(internalTable).scan( + eq(partitionId), + eq(clusterNode), + eq(indexId), + any(IndexScanCriteria.Range.class), + eq(operationContext) + ); input.sendRows(); input.done(); @@ -393,50 +347,32 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { TestRangeCondition<Object[]> condition = new TestRangeCondition<>(); condition.setLower(Bound.INCLUSIVE, new Object[]{1, 2}); - ArgumentCaptor<BinaryTuplePrefix> prefix = ArgumentCaptor.forClass(BinaryTuplePrefix.class); + ArgumentCaptor<IndexScanCriteria.Range> criteriaCaptor = ArgumentCaptor.forClass(IndexScanCriteria.Range.class); ResultCollector collector = tester.indexScan(partitionId, consistencyToken, tx, indexId, condition); - if (tx.isReadOnly()) { - HybridTimestamp timestamp = tx.readTimestamp(); - InternalClusterNode clusterNode = tx.clusterNode(); + InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : ctx.localNode(); + TxContext txContext = tx.isReadOnly() ? 
TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); - verify(internalTable).scan( - eq(partitionId), - eq(tx.id()), - eq(timestamp), - eq(clusterNode), - eq(indexId), - prefix.capture(), - nullable(BinaryTuplePrefix.class), - anyInt(), - isNull(), - eq(tx.coordinatorId()) - ); - } else { - PrimaryReplica primaryReplica = new PrimaryReplica(ctx.localNode(), consistencyToken); - - verify(internalTable).scan( - eq(partitionId), - eq(tx.id()), - eq(tx.commitPartition()), - any(UUID.class), - eq(primaryReplica), - eq(indexId), - prefix.capture(), - nullable(BinaryTuplePrefix.class), - anyInt(), - isNull() - ); - } + verify(internalTable).scan( + eq(partitionId), + eq(clusterNode), + eq(indexId), + criteriaCaptor.capture(), + eq(OperationContext.create(txContext)) + ); input.sendRows(); input.done(); collector.expectCompleted(); - BinaryTuplePrefix lowerBound = prefix.getValue(); + BinaryTuplePrefix lowerBound = criteriaCaptor.getValue().lowerBound(); + assertNotNull(lowerBound); assertEquals(2, lowerBound.elementCount()); + + assertNull(criteriaCaptor.getValue().upperBound()); + assertEquals(GREATER_OR_EQUAL, criteriaCaptor.getValue().flags()); } /** @@ -455,39 +391,30 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { int indexId = 3; Object[] key = {1}; + ArgumentCaptor<IndexScanCriteria.Lookup> criteriaCaptor = ArgumentCaptor.forClass(IndexScanCriteria.Lookup.class); + ResultCollector collector = tester.indexLookUp(partitionId, consistencyToken, tx, indexId, key); - if (tx.isReadOnly()) { - verify(internalTable).lookup( - eq(partitionId), - eq(tx.id()), - eq(tx.readTimestamp()), - eq(tx.clusterNode()), - eq(indexId), - any(BinaryTuple.class), - isNull(), - eq(tx.coordinatorId()) - ); - } else { - PrimaryReplica primaryReplica = new PrimaryReplica(ctx.localNode(), consistencyToken); - - verify(internalTable).lookup( - eq(partitionId), - eq(tx.id()), - any(), - any(UUID.class), - eq(primaryReplica), - eq(indexId), - 
any(BinaryTuple.class), - isNull() - ); - } + InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : ctx.localNode(); + TxContext txContext = tx.isReadOnly() ? TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); + + verify(internalTable).scan( + eq(partitionId), + eq(clusterNode), + eq(indexId), + criteriaCaptor.capture(), + eq(OperationContext.create(txContext)) + ); input.sendRows(); input.done(); collector.expectRow(binaryRow); collector.expectCompleted(); + + BinaryTuple exactKey = criteriaCaptor.getValue().key(); + assertNotNull(exactKey); + assertEquals(1, exactKey.elementCount()); } /** @@ -509,31 +436,16 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { ResultCollector collector = tester.indexLookUp(partitionId, consistencyToken, tx, indexId, key); - if (tx.isReadOnly()) { - verify(internalTable).lookup( - eq(partitionId), - eq(tx.id()), - eq(tx.readTimestamp()), - eq(tx.clusterNode()), - eq(indexId), - any(BinaryTuple.class), - eq(null), - eq(tx.coordinatorId()) - ); - } else { - PrimaryReplica primaryReplica = new PrimaryReplica(ctx.localNode(), consistencyToken); - - verify(internalTable).lookup( - eq(partitionId), - eq(tx.id()), - any(), - any(UUID.class), - eq(primaryReplica), - eq(indexId), - any(BinaryTuple.class), - eq(null) - ); - } + InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : ctx.localNode(); + TxContext txContext = tx.isReadOnly() ? 
TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); + + verify(internalTable).scan( + eq(partitionId), + eq(clusterNode), + eq(indexId), + any(IndexScanCriteria.Lookup.class), + eq(OperationContext.create(txContext)) + ); input.sendRows(); input.done(); @@ -596,30 +508,23 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx)); when(ctx.localNode()).thenReturn(tx.clusterNode()); - if (tx.isReadOnly()) { - doAnswer(invocation -> input.publisher).when(internalTable) - .scan(anyInt(), any(UUID.class), any(HybridTimestamp.class), any(InternalClusterNode.class), any(UUID.class)); - } else { - doAnswer(invocation -> input.publisher).when(internalTable).scan( - anyInt(), - any(UUID.class), - any(TablePartitionId.class), - any(UUID.class), - any(PrimaryReplica.class), - isNull(), - isNull(), - isNull(), - eq(0), - isNull() - ); - } + InternalClusterNode clusterNode = tx.isReadOnly() ? tx.clusterNode() : ctx.localNode(); + TxContext txContext = tx.isReadOnly() ? 
TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); + + doAnswer(invocation -> input.publisher).when(internalTable).scan( + anyInt(), + eq(clusterNode), + eq(OperationContext.create(txContext)) + ); RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE; RowFactory<Object[]> rowFactory = rowHandler.factory(input.rowSchema); Publisher<Object[]> publisher = scannableTable.scan( ctx, - new PartitionWithConsistencyToken(partitionId, consistencyToken), rowFactory, null + new PartitionWithConsistencyToken(partitionId, consistencyToken), + rowFactory, + null ); return new ResultCollector(publisher, rowConverter); @@ -636,31 +541,15 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx)); when(ctx.localNode()).thenReturn(tx.clusterNode()); - if (tx.isReadOnly()) { - doAnswer(i -> input.publisher).when(internalTable).scan( - anyInt(), - any(UUID.class), - any(HybridTimestamp.class), - any(InternalClusterNode.class), - any(Integer.class), - nullable(BinaryTuplePrefix.class), - nullable(BinaryTuplePrefix.class), - anyInt(), - nullable(BitSet.class), - any(UUID.class)); - } else { - doAnswer(i -> input.publisher).when(internalTable).scan( - anyInt(), - any(UUID.class), - any(TablePartitionId.class), - any(UUID.class), - any(PrimaryReplica.class), - any(Integer.class), - nullable(BinaryTuplePrefix.class), - nullable(BinaryTuplePrefix.class), - anyInt(), - nullable(BitSet.class)); - } + TxContext txContext = tx.isReadOnly() ? 
TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); + + doAnswer(i -> input.publisher).when(internalTable).scan( + anyInt(), + any(InternalClusterNode.class), + anyInt(), + any(IndexScanCriteria.Range.class), + eq(OperationContext.create(txContext)) + ); RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE; RowFactory<Object[]> rowFactory = rowHandler.factory(input.rowSchema); @@ -686,27 +575,14 @@ public class ScannableTableSelfTest extends BaseIgniteAbstractTest { when(ctx.txAttributes()).thenReturn(TxAttributes.fromTx(tx)); when(ctx.localNode()).thenReturn(tx.clusterNode()); - if (tx.isReadOnly()) { - doAnswer(i -> input.publisher).when(internalTable).lookup( - anyInt(), - any(UUID.class), - any(HybridTimestamp.class), - any(InternalClusterNode.class), - any(Integer.class), - nullable(BinaryTuple.class), - isNull(), - any(UUID.class)); - } else { - doAnswer(i -> input.publisher).when(internalTable).lookup( - anyInt(), - any(UUID.class), - any(TablePartitionId.class), - any(UUID.class), - any(PrimaryReplica.class), - any(Integer.class), - nullable(BinaryTuple.class), - isNull()); - } + TxContext txContext = tx.isReadOnly() ? 
TxContext.readOnly(tx) : TxContext.readWrite(tx, consistencyToken); + + doAnswer(i -> input.publisher).when(internalTable).scan( + anyInt(), + any(InternalClusterNode.class), + any(Integer.class), + any(IndexScanCriteria.Lookup.class), + eq(OperationContext.create(txContext))); RowHandler<Object[]> rowHandler = ArrayRowHandler.INSTANCE; RowFactory<Object[]> rowFactory = rowHandler.factory(input.rowSchema); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index 865be91c770..6bfb2cc29c1 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -30,7 +30,6 @@ import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.BitSet; import java.util.LinkedList; import java.util.List; import java.util.Spliterator; @@ -52,7 +51,6 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; -import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.hlc.TestClockService; import org.apache.ignite.internal.lowwatermark.TestLowWatermark; @@ -69,7 +67,6 @@ import org.apache.ignite.internal.placementdriver.TestPlacementDriver; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; -import org.apache.ignite.internal.schema.BinaryTuplePrefix; 
import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.PartitionProvider; @@ -86,6 +83,7 @@ import org.apache.ignite.internal.sql.engine.framework.TestBuilders; import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.StreamerReceiverRunner; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.metrics.TableMetricSource; @@ -104,7 +102,6 @@ import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -354,15 +351,8 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]> @Override public Publisher<BinaryRow> scan( int partId, - UUID txId, - HybridTimestamp readTime, InternalClusterNode recipient, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude, - UUID txCoordinatorId + OperationContext opCtx ) { return s -> { s.onSubscribe(new Subscription() { diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java index 288a9dc373c..6a5f8518199 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java +++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; +import org.apache.ignite.internal.tx.TransactionIds; import org.apache.ignite.internal.tx.TxState; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.tx.TransactionException; @@ -41,7 +42,7 @@ import org.apache.ignite.tx.TransactionException; */ public final class NoOpTransaction implements InternalTransaction { - private final UUID id = randomUUID(); + private final UUID id; private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1) .addPhysicalTime(System.currentTimeMillis()); @@ -94,6 +95,8 @@ public final class NoOpTransaction implements InternalTransaction { this.enlistment = new PendingTxPartitionEnlistment(enlistmentNode.name(), 1L, groupId.tableId()); this.implicit = implicit; this.readOnly = readOnly; + + this.id = readOnly ? randomUUID() : TransactionIds.transactionId(hybridTimestamp, enlistmentNode.name().hashCode()); } /** Node at which this transaction was start. 
*/ diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java index 796a5166c06..9fda77c3f9c 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java @@ -27,6 +27,8 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.InternalTxOptions; import org.jetbrains.annotations.Nullable; @@ -48,7 +50,7 @@ public class ItInternalTableReadOnlyScanTest extends ItAbstractInternalTableScan InternalClusterNode node = mock(InternalClusterNode.class); lenient().when(node.name()).thenReturn("node"); - return internalTbl.scan(part, tx.id(), internalTbl.CLOCK.now(), node, tx.coordinatorId()); + return internalTbl.scan(part, node, OperationContext.create(TxContext.readOnly(tx))); } @Override diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java index 09280a2b74d..b89455202b9 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java @@ -28,12 +28,13 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId; import 
org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.RollbackTxOnErrorPublisher; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.InternalTxOptions; import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment; -import org.apache.ignite.internal.utils.PrimaryReplica; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; @@ -54,14 +55,13 @@ public class ItInternalTableReadWriteScanTest extends ItAbstractInternalTableSca PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(targetReplicationGroupId(zoneId, part)); - PrimaryReplica recipient = new PrimaryReplica( - clusterNodeResolver.getByConsistentId(enlistment.primaryNodeConsistentId()), - enlistment.consistencyToken() - ); + InternalClusterNode primaryNode = clusterNodeResolver.getByConsistentId(enlistment.primaryNodeConsistentId()); + + TxContext txContext = TxContext.readWrite(tx, enlistment.consistencyToken()); return new RollbackTxOnErrorPublisher<>( tx, - internalTbl.scan(part, tx.id(), tx.commitPartition(), tx.coordinatorId(), recipient, null, null, null, 0, null) + internalTbl.scan(part, primaryNode, OperationContext.create(txContext)) ); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java new file mode 100644 index 00000000000..08a658e07b0 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/IndexScanCriteria.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import org.apache.ignite.internal.schema.BinaryTuple; +import org.apache.ignite.internal.schema.BinaryTuplePrefix; +import org.jetbrains.annotations.Nullable; + +/** + * Index scan criteria for range scan and lookup index operations. + */ +public interface IndexScanCriteria { + /** Creates range scan criteria. */ + static Range range(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) { + return new Range(lowerBound, upperBound, flags); + } + + /** Creates lookup criteria. */ + static Lookup lookup(BinaryTuple key) { + return new Lookup(key); + } + + /** Shortcut to create unbounded range scan criteria. */ + static Range unbounded() { + return Range.UNBOUNDED; + } + + /** + * Range scan criteria. + */ + class Range implements IndexScanCriteria { + private static final Range UNBOUNDED = new Range(null, null, 0); + + private final @Nullable BinaryTuplePrefix lowerBound; + private final @Nullable BinaryTuplePrefix upperBound; + private final int flags; + + private Range(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) { + this.lowerBound = lowerBound; + this.upperBound = upperBound; + this.flags = flags; + } + + /** Get lower bound. 
*/ + public @Nullable BinaryTuplePrefix lowerBound() { + return lowerBound; + } + + /** Get upper bound. */ + public @Nullable BinaryTuplePrefix upperBound() { + return upperBound; + } + + /** Get flags. */ + public int flags() { + return flags; + } + } + + /** + * Lookup criteria. + */ + class Lookup implements IndexScanCriteria { + private final BinaryTuple key; + + private Lookup(BinaryTuple key) { + this.key = key; + } + + /** Get lookup key. */ + public BinaryTuple key() { + return key; + } + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index 1f9ab422675..bef61054490 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -30,18 +30,16 @@ import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; -import org.apache.ignite.internal.schema.BinaryTuple; -import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.engine.MvTableStorage; import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.PendingComparableValuesTracker; -import org.apache.ignite.internal.utils.PrimaryReplica; import org.apache.ignite.table.QualifiedName; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Internal table facade provides low-level methods for table operations. 
The facade hides TX/replication protocol over table storage @@ -327,84 +325,44 @@ public interface InternalTable extends ManuallyCloseable { * @return {@link Publisher} that reactively notifies about partition rows. * @throws IllegalArgumentException If proposed partition index {@code p} is out of bounds. */ - default Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx) { - return scan(partId, tx, null, null, null, 0, null); - } + // TODO: https://issues.apache.org/jira/browse/IGNITE-26846 Drop the method. + @TestOnly + @Deprecated(forRemoval = true) + Publisher<BinaryRow> scan(int partId, @Nullable InternalTransaction tx); /** - * Scans given partition with the proposed read timestamp, providing {@link Publisher} that reactively notifies about partition rows. + * Scans given partition, providing {@link Publisher} that reactively notifies about partition rows. * * @param partId The partition. - * @param readTimestamp Read timestamp. * @param recipientNode Cluster node that will handle given get request. - * @param txCoordinatorId Transaction coordinator inconsistent id. + * @param operationContext Operation context. * @return {@link Publisher} that reactively notifies about partition rows. * @throws IllegalArgumentException If proposed partition index {@code p} is out of bounds. * @throws TransactionException If proposed {@code tx} is read-write. Transaction itself won't be automatically rolled back. */ - default Publisher<BinaryRow> scan( + Publisher<BinaryRow> scan( int partId, - UUID txId, - HybridTimestamp readTimestamp, InternalClusterNode recipientNode, - UUID txCoordinatorId - ) { - return scan(partId, txId, readTimestamp, recipientNode, null, null, null, 0, null, txCoordinatorId); - } + OperationContext operationContext + ); /** - * Lookup rows corresponding to the given key given partition index, providing {@link Publisher} + * Scans given partition index, providing {@link Publisher} * that reactively notifies about partition rows. 
* * @param partId The partition. - * @param readTimestamp Read timestamp. * @param recipientNode Cluster node that will handle given get request. * @param indexId Index id. - * @param lowerBound Lower search bound. - * @param upperBound Upper search bound. - * @param flags Control flags. See {@link org.apache.ignite.internal.storage.index.SortedIndexStorage} constants. - * @param columnsToInclude Row projection. - * @param txCoordinatorId Transaction coordinator inconsistent id. + * @param criteria Index scan criteria. + * @param operationContext Operation context. * @return {@link Publisher} that reactively notifies about partition rows. */ Publisher<BinaryRow> scan( int partId, - UUID txId, - HybridTimestamp readTimestamp, InternalClusterNode recipientNode, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude, - UUID txCoordinatorId - ); - - /** - * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows. - * - * @param partId The partition. - * @param txId Transaction id. - * @param commitPartition Commit partition id. - * @param txCoordinatorId Transaction coordinator id. - * @param recipient Primary replica that will handle given get request. - * @param lowerBound Lower search bound. - * @param upperBound Upper search bound. - * @param flags Control flags. See {@link org.apache.ignite.internal.storage.index.SortedIndexStorage} constants. - * @param columnsToInclude Row projection. - * @return {@link Publisher} that reactively notifies about partition rows. 
- */ - Publisher<BinaryRow> scan( - int partId, - UUID txId, - ReplicationGroupId commitPartition, - UUID txCoordinatorId, - PrimaryReplica recipient, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude + int indexId, + IndexScanCriteria criteria, + OperationContext operationContext ); /** @@ -413,68 +371,17 @@ public interface InternalTable extends ManuallyCloseable { * @param partId The partition. * @param tx The transaction. * @param indexId Index id. - * @param lowerBound Lower search bound. - * @param upperBound Upper search bound. - * @param flags Control flags. See {@link org.apache.ignite.internal.storage.index.SortedIndexStorage} constants. - * @param columnsToInclude Row projection. + * @param criteria Index scan criteria. * @return {@link Publisher} that reactively notifies about partition rows. */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-26846 Drop the method. + @TestOnly + @Deprecated(forRemoval = true) Publisher<BinaryRow> scan( int partId, @Nullable InternalTransaction tx, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude - ); - - /** - * Scans given partition index, providing {@link Publisher} that reactively notifies about partition rows. - * - * @param partId The partition. - * @param readTimestamp Read timestamp. - * @param recipientNode Cluster node that will handle given get request. - * @param indexId Index id. - * @param key Key to search. - * @param columnsToInclude Row projection. - * @param txCoordinatorId Transaction coordinator id. - * @return {@link Publisher} that reactively notifies about partition rows. 
- */ - Publisher<BinaryRow> lookup( - int partId, - UUID txId, - HybridTimestamp readTimestamp, - InternalClusterNode recipientNode, - int indexId, - BinaryTuple key, - @Nullable BitSet columnsToInclude, - UUID txCoordinatorId - ); - - /** - * Lookup rows corresponding to the given key given partition index, providing {@link Publisher} - * that reactively notifies about partition rows. - * - * @param partId The partition. - * @param txId Transaction id. - * @param commitPartition Commit partition id. - * @param txCoordinatorId Transaction coordinator id. - * @param recipient Primary replica that will handle given get request. - * @param indexId Index id. - * @param key Key to search. - * @param columnsToInclude Row projection. - * @return {@link Publisher} that reactively notifies about partition rows. - */ - Publisher<BinaryRow> lookup( - int partId, - UUID txId, - ReplicationGroupId commitPartition, - UUID txCoordinatorId, - PrimaryReplica recipient, int indexId, - BinaryTuple key, - @Nullable BitSet columnsToInclude + IndexScanCriteria.Range criteria ); /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java b/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java new file mode 100644 index 00000000000..d375a393e89 --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/OperationContext.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import java.util.Objects; + +/** + * Context for table/index partition scan/lookup operation mostly for SQL need. + */ +public class OperationContext { + /** Creates a new instance of {@link OperationContext}. */ + public static OperationContext create(TxContext txContext) { + return new OperationContext(txContext); + } + + /** Transactional context. */ + private final TxContext txContext; + + private OperationContext(TxContext txContext) { + this.txContext = Objects.requireNonNull(txContext); + } + + /** + * Returns the transactional context. + * + * @return The transactional context. + */ + public TxContext txContext() { + return txContext; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + OperationContext that = (OperationContext) o; + return Objects.equals(txContext, that.txContext); + } + + @Override + public int hashCode() { + return Objects.hash(txContext.hashCode()); + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java new file mode 100644 index 00000000000..117b058b88e --- /dev/null +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TxContext.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table; + +import java.util.Objects; +import java.util.UUID; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.ReplicationGroupId; +import org.apache.ignite.internal.tostring.IgniteToStringBuilder; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.TransactionIds; +import org.jetbrains.annotations.TestOnly; + +/** + * Transactional operation context used in scan/lookup table operations. + * + * @see InternalTable + */ +public abstract class TxContext { + /** Creates operation context from RO transaction. */ + public static TxContext readOnly(UUID txId, UUID txCoordinatorId, HybridTimestamp readTimestamp) { + return new ReadOnly(txId, txCoordinatorId, readTimestamp); + } + + /** Creates operation context from RO transaction. For test purposes only. */ + @TestOnly + public static TxContext readOnly(InternalTransaction tx) { + assert tx.isReadOnly(); + + HybridTimestamp readTimestamp = tx.readTimestamp(); + + assert readTimestamp != null; + + return new ReadOnly(tx.id(), tx.coordinatorId(), readTimestamp); + } + + /** Creates operation context from RW transaction. 
*/ + public static TxContext readWrite( + UUID txId, + UUID txCoordinatorId, + ReplicationGroupId commitPartition, + long enlistmentConsistencyToken + ) { + return new ReadWrite(txId, txCoordinatorId, commitPartition, enlistmentConsistencyToken); + } + + /** Creates operation context from RW transaction. For test purposes only. */ + @TestOnly + public static TxContext readWrite(InternalTransaction tx, long enlistmentConsistencyToken) { + assert !tx.isReadOnly(); + + return new ReadWrite(tx.id(), tx.coordinatorId(), tx.commitPartition(), enlistmentConsistencyToken); + } + + protected final UUID txId; + protected final UUID coordinatorId; + + protected TxContext(UUID txId, UUID coordinatorId) { + Objects.requireNonNull(txId, "Transaction id is mandatory"); + Objects.requireNonNull(coordinatorId, "Transaction coordinator id is mandatory"); + + this.txId = txId; + this.coordinatorId = coordinatorId; + } + + /** Returns {@code true} for read only transaction, {@code false} otherwise. */ + public abstract boolean isReadOnly(); + + /** Returns transaction id. */ + public UUID txId() { + return txId; + } + + /** Returns transaction coordinator id. */ + public UUID coordinatorId() { + return coordinatorId; + } + + /** Read-only transaction context. */ + public static class ReadOnly extends TxContext { + private final HybridTimestamp readTimestamp; + + private ReadOnly(UUID txId, UUID txCoordinatorId, HybridTimestamp readTimestamp) { + super(txId, txCoordinatorId); + + this.readTimestamp = readTimestamp; + } + + @Override + public boolean isReadOnly() { + return true; + } + + /** Returns transaction read timestamp. 
*/ + public HybridTimestamp readTimestamp() { + return readTimestamp; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + ReadOnly txCtx = (ReadOnly) o; + return Objects.equals(txId, txCtx.txId) + && Objects.equals(coordinatorId, txCtx.coordinatorId) + && Objects.equals(readTimestamp, txCtx.readTimestamp); + } + + @Override + public int hashCode() { + return Objects.hash(txId); + } + + @Override + public String toString() { + return IgniteToStringBuilder.toString(this); + } + } + + /** Read-write transaction context. */ + public static class ReadWrite extends TxContext { + private final ReplicationGroupId commitPartition; + private final long enlistmentConsistencyToken; + + private ReadWrite(UUID txId, UUID txCoordinatorId, ReplicationGroupId commitPartition, long enlistmentConsistencyToken) { + super(txId, txCoordinatorId); + + Objects.requireNonNull(commitPartition, "Commit partition is mandatory for RW transaction"); + + if (enlistmentConsistencyToken < 0) { + throw new IllegalArgumentException("Enlistment consistency token is mandatory for RW transaction"); + } + + this.commitPartition = commitPartition; + this.enlistmentConsistencyToken = enlistmentConsistencyToken; + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + ReadWrite opCtx = (ReadWrite) o; + return enlistmentConsistencyToken == opCtx.enlistmentConsistencyToken + && Objects.equals(txId, opCtx.txId) + && Objects.equals(coordinatorId, opCtx.coordinatorId) + && Objects.equals(commitPartition, opCtx.commitPartition); + } + + /** Returns transaction commit partition. */ + public ReplicationGroupId commitPartition() { + return commitPartition; + } + + /** Returns transaction begin timestamp. 
*/ + public HybridTimestamp beginTimestamp() { + return TransactionIds.beginTimestamp(txId); + } + + /** Returns enlistment consistency token. */ + public long enlistmentConsistencyToken() { + return enlistmentConsistencyToken; + } + + @Override + public int hashCode() { + return Objects.hash(txId); + } + + @Override + public String toString() { + return IgniteToStringBuilder.toString(this); + } + } +} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 530f397a6b7..ea666067e76 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -116,8 +116,11 @@ import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTuplePrefix; import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.table.IndexScanCriteria; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.StreamerReceiverRunner; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.table.distributed.storage.PartitionScanPublisher.InflightBatchRequestTracker; import org.apache.ignite.internal.table.metrics.TableMetricSource; import org.apache.ignite.internal.tx.InternalTransaction; @@ -129,12 +132,12 @@ import org.apache.ignite.internal.tx.storage.state.TxStateStorage; import org.apache.ignite.internal.util.CollectionUtils; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; -import org.apache.ignite.internal.utils.PrimaryReplica; import 
org.apache.ignite.lang.IgniteException; import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; /** * Storage of table rows. @@ -1562,139 +1565,125 @@ public class InternalTableImpl implements InternalTable { } @Override - public Publisher<BinaryRow> lookup( + public Publisher<BinaryRow> scan( int partId, - UUID txId, - HybridTimestamp readTimestamp, InternalClusterNode recipientNode, - int indexId, - BinaryTuple key, - @Nullable BitSet columnsToInclude, - UUID txCoordinatorId + OperationContext operationContext ) { - return readOnlyScan(partId, txId, readTimestamp, recipientNode, indexId, key, null, null, 0, columnsToInclude, txCoordinatorId); - } + validatePartitionIndex(partId); - @Override - public Publisher<BinaryRow> lookup( - int partId, - UUID txId, - ReplicationGroupId commitPartition, - UUID coordinatorId, - PrimaryReplica recipient, - int indexId, - BinaryTuple key, - @Nullable BitSet columnsToInclude - ) { - return readWriteScan( - partId, - txId, - commitPartition, - coordinatorId, - recipient, - indexId, - key, - null, - null, - 0, - columnsToInclude - ); + if (operationContext.txContext().isReadOnly()) { + return readOnlyScan( + partId, + recipientNode, + null, + null, + null, + operationContext + ); + } else { + return readWriteScan( + partId, + recipientNode, + null, + null, + operationContext + ); + } } @Override public Publisher<BinaryRow> scan( int partId, - UUID txId, - HybridTimestamp readTimestamp, InternalClusterNode recipientNode, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude, - UUID txCoordinatorId + int indexId, + IndexScanCriteria criteria, + OperationContext operationContext ) { - return readOnlyScan( - partId, - txId, - readTimestamp, - 
recipientNode, - indexId, - null, - lowerBound, - upperBound, - flags, - columnsToInclude, - txCoordinatorId - ); + validatePartitionIndex(partId); + + if (operationContext.txContext().isReadOnly()) { + return readOnlyScan( + partId, + recipientNode, + indexId, + criteria, + null, + operationContext + ); + } else { + return readWriteScan( + partId, + recipientNode, + indexId, + criteria, + operationContext + ); + } } + @TestOnly @Override public Publisher<BinaryRow> scan( int partId, - @Nullable InternalTransaction tx, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude + @Nullable InternalTransaction tx ) { - return readWriteScan(partId, tx, indexId, null, lowerBound, upperBound, flags, columnsToInclude); + validatePartitionIndex(partId); + + InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx); + + return readWriteScan(partId, actualTx, null, null); } + @TestOnly @Override public Publisher<BinaryRow> scan( int partId, - UUID txId, - ReplicationGroupId commitPartition, - UUID coordinatorId, - PrimaryReplica recipient, - @Nullable Integer indexId, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude + @Nullable InternalTransaction tx, + int indexId, + IndexScanCriteria.Range criteria ) { - return readWriteScan( - partId, - txId, - commitPartition, - coordinatorId, - recipient, - indexId, - null, - lowerBound, - upperBound, - flags, - columnsToInclude - ); + + validatePartitionIndex(partId); + + InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx); + + assert !actualTx.isReadOnly(); + + return readWriteScan(partId, actualTx, indexId, criteria); } private Publisher<BinaryRow> readOnlyScan( int partId, - UUID txId, - HybridTimestamp readTimestamp, InternalClusterNode recipientNode, @Nullable Integer indexId, - @Nullable BinaryTuple exactKey, - @Nullable BinaryTuplePrefix 
lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, + @Nullable IndexScanCriteria criteria, @Nullable BitSet columnsToInclude, - UUID txCoordinatorId + OperationContext opCtx ) { - validatePartitionIndex(partId); + assert opCtx.txContext().isReadOnly(); + + boolean rangeScan = criteria instanceof IndexScanCriteria.Range; + boolean lookup = criteria instanceof IndexScanCriteria.Lookup; + + BinaryTuple exactKey = lookup ? ((IndexScanCriteria.Lookup) criteria).key() : null; + BinaryTuplePrefix lowerBound = rangeScan ? ((IndexScanCriteria.Range) criteria).lowerBound() : null; + BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range) criteria).upperBound() : null; + int flags = rangeScan ? ((IndexScanCriteria.Range) criteria).flags() : 0; + + TxContext.ReadOnly txContext = (TxContext.ReadOnly) opCtx.txContext(); ReplicationGroupId replicationGroupId = targetReplicationGroupId(partId); - return new PartitionScanPublisher<>(new ReadOnlyInflightBatchRequestTracker(transactionInflights, txId)) { + return new PartitionScanPublisher<>(new ReadOnlyInflightBatchRequestTracker(transactionInflights, txContext.txId())) { @Override protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) { ReadOnlyScanRetrieveBatchReplicaRequest request = TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() .groupId(serializeReplicationGroupId(replicationGroupId)) .tableId(tableId) - .readTimestamp(readTimestamp) - .transactionId(txId) + .transactionId(txContext.txId()) + .coordinatorId(txContext.coordinatorId()) + .readTimestamp(txContext.readTimestamp()) .scanId(scanId) .batchSize(batchSize) .indexToUse(indexId) @@ -1703,7 +1692,6 @@ public class InternalTableImpl implements InternalTable { .upperBoundPrefix(binaryTupleMessage(upperBound)) .flags(flags) .columnsToInclude(columnsToInclude) - .coordinatorId(txCoordinatorId) .build(); return replicaSvc.invoke(recipientNode, request); @@ -1712,7 +1700,7 @@ public class 
InternalTableImpl implements InternalTable { @Override protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) { return completeScan( - txId, + txContext.txId(), replicationGroupId, scanId, th, @@ -1725,13 +1713,9 @@ public class InternalTableImpl implements InternalTable { private Publisher<BinaryRow> readWriteScan( int partId, - @Nullable InternalTransaction tx, + InternalTransaction tx, @Nullable Integer indexId, - @Nullable BinaryTuple exactKey, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude + @Nullable IndexScanCriteria.Range criteria ) { // Check whether proposed tx is read-only. Complete future exceptionally if true. // Attempting to enlist a read-only in a read-write transaction does not corrupt the transaction itself, thus read-write transaction @@ -1745,22 +1729,26 @@ public class InternalTableImpl implements InternalTable { validatePartitionIndex(partId); - InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx); + assert indexId == null || criteria != null; + + BinaryTuplePrefix lowerBound = indexId == null ? null : criteria.lowerBound(); + BinaryTuplePrefix upperBound = indexId == null ? null : criteria.upperBound(); + int flags = indexId == null ? 
0 : criteria.flags(); return new PartitionScanPublisher<>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) { @Override protected CompletableFuture<Collection<BinaryRow>> retrieveBatch(long scanId, int batchSize) { return enlistCursorInTx( - actualTx, + tx, partId, scanId, batchSize, indexId, - exactKey, + null, lowerBound, upperBound, flags, - columnsToInclude + null ); } @@ -1768,7 +1756,7 @@ public class InternalTableImpl implements InternalTable { protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) { CompletableFuture<Void> opFut; - if (actualTx.implicit()) { + if (tx.implicit()) { opFut = completedOrFailedFuture(null, th); } else { var replicationGrpId = targetReplicationGroupId(partId); @@ -1787,26 +1775,33 @@ public class InternalTableImpl implements InternalTable { return postEnlist( opFut, intentionallyClose, - actualTx, - actualTx.implicit() && !intentionallyClose + tx, + tx.implicit() && !intentionallyClose ); } }; } + @TestOnly private Publisher<BinaryRow> readWriteScan( int partId, - UUID txId, - ReplicationGroupId commitPartition, - UUID coordinatorId, - PrimaryReplica recipient, + InternalClusterNode recipient, @Nullable Integer indexId, - @Nullable BinaryTuple exactKey, - @Nullable BinaryTuplePrefix lowerBound, - @Nullable BinaryTuplePrefix upperBound, - int flags, - @Nullable BitSet columnsToInclude + @Nullable IndexScanCriteria criteria, + OperationContext opCtx ) { + assert !opCtx.txContext().isReadOnly(); + + boolean rangeScan = criteria instanceof IndexScanCriteria.Range; + boolean lookup = criteria instanceof IndexScanCriteria.Lookup; + + BinaryTuple exactKey = lookup ? ((IndexScanCriteria.Lookup) criteria).key() : null; + BinaryTuplePrefix lowerBound = rangeScan ? ((IndexScanCriteria.Range) criteria).lowerBound() : null; + BinaryTuplePrefix upperBound = rangeScan ? ((IndexScanCriteria.Range) criteria).upperBound() : null; + int flags = rangeScan ? 
((IndexScanCriteria.Range) criteria).flags() : 0; + + TxContext.ReadWrite txContext = (TxContext.ReadWrite) opCtx.txContext(); + ReplicationGroupId replicationGroupId = targetReplicationGroupId(partId); return new PartitionScanPublisher<>(READ_WRITE_INFLIGHT_BATCH_REQUEST_TRACKER) { @@ -1815,28 +1810,27 @@ public class InternalTableImpl implements InternalTable { ReadWriteScanRetrieveBatchReplicaRequest request = TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() .groupId(serializeReplicationGroupId(replicationGroupId)) .tableId(tableId) - .timestamp(TransactionIds.beginTimestamp(txId)) - .transactionId(txId) + .transactionId(txContext.txId()) + .coordinatorId(txContext.coordinatorId()) + .timestamp(txContext.beginTimestamp()) + .commitPartitionId(serializeReplicationGroupId(txContext.commitPartition())) + .enlistmentConsistencyToken(txContext.enlistmentConsistencyToken()) .scanId(scanId) .indexToUse(indexId) .exactKey(binaryTupleMessage(exactKey)) .lowerBoundPrefix(binaryTupleMessage(lowerBound)) .upperBoundPrefix(binaryTupleMessage(upperBound)) .flags(flags) - .columnsToInclude(columnsToInclude) .batchSize(batchSize) - .enlistmentConsistencyToken(recipient.enlistmentConsistencyToken()) .full(false) // Set explicitly. 
- .commitPartitionId(serializeReplicationGroupId(commitPartition)) - .coordinatorId(coordinatorId) .build(); - return replicaSvc.invoke(recipient.node(), request); + return replicaSvc.invoke(recipient, request); } @Override protected CompletableFuture<Void> onClose(boolean intentionallyClose, long scanId, @Nullable Throwable th) { - return completeScan(txId, replicationGroupId, scanId, th, recipient.node().name(), intentionallyClose); + return completeScan(txContext.txId(), replicationGroupId, scanId, th, recipient.name(), intentionallyClose); } }; } diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java index 180be47cce7..f065f652747 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java @@ -85,6 +85,7 @@ import org.apache.ignite.internal.schema.BinaryRowEx; import org.apache.ignite.internal.schema.NullBinaryRow; import org.apache.ignite.internal.sql.SqlCommon; import org.apache.ignite.internal.storage.engine.MvTableStorage; +import org.apache.ignite.internal.table.IndexScanCriteria; import org.apache.ignite.internal.table.InternalTable; import org.apache.ignite.internal.table.StreamerReceiverRunner; import org.apache.ignite.internal.table.metrics.TableMetricSource; @@ -427,7 +428,7 @@ public class InternalTableImplTest extends BaseIgniteAbstractTest { DELETE_ALL((table, tx) -> table.deleteAll(List.of(createBinaryRow()), tx)), DELETE_ALL_EXACT((table, tx) -> table.deleteAllExact(List.of(createBinaryRow()), tx)), SCAN_MV_STORAGE(adaptScan((table, tx) -> table.scan(0, tx))), - SCAN_INDEX(adaptScan((table, tx) -> table.scan(0, tx, 1, null, null, 0, null))); + SCAN_INDEX(adaptScan((table, tx) -> table.scan(0, tx, 1, 
IndexScanCriteria.unbounded()))); private final BiFunction<InternalTable, InternalTransaction, CompletableFuture<?>> action; diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java index 55c6281a3be..08b345b4939 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.replicator.ReplicaManager; import org.apache.ignite.internal.replicator.ReplicaTestUtils; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl; import org.apache.ignite.internal.schema.row.Row; @@ -118,6 +119,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.function.Executable; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -1465,15 +1467,13 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { ? internalTable.scan( 0, - internalTx.id(), - internalTx.readTimestamp(), ReplicaTestUtils.leaderAssignment( txTestCluster.replicaManagers().get(txTestCluster.localNodeName()), txTestCluster.clusterServices().get(txTestCluster.localNodeName()).topologyService(), colocationEnabled() ? 
internalTable.zoneId() : internalTable.tableId(), 0 ), - internalTx.coordinatorId() + OperationContext.create(TxContext.readOnly(internalTx)) ) : internalTable.scan(0, internalTx); @@ -2106,10 +2106,8 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { assertThrowsTxFinishedException(() -> { Flow.Publisher<BinaryRow> pub = accounts.internalTable().scan( 0, - internalTx.id(), - internalTx.readTimestamp(), new ClusterNodeImpl(UUID.randomUUID(), "node", new NetworkAddress("localhost", 123)), - internalTx.coordinatorId() + OperationContext.create(TxContext.readOnly(internalTx)) ); AtomicReference<Throwable> errorRef = new AtomicReference<>(); @@ -2122,16 +2120,13 @@ public abstract class TxAbstractTest extends TxInfrastructureTest { }); assertThrowsTxFinishedException(() -> { - Flow.Publisher<BinaryRow> pub = accounts.internalTable().lookup( + Flow.Publisher<BinaryRow> pub = accounts.internalTable().scan( 0, - internalTx.id(), - internalTx.readTimestamp(), new ClusterNodeImpl(UUID.randomUUID(), "node", new NetworkAddress("localhost", 123)), 0, - // Binary tuple is null for testing purposes, assuming that it wouldn't be processed anyway. - null, - null, - internalTx.coordinatorId() + // We assume that BinaryTuple will never be accessed. 
+ IndexScanCriteria.lookup(Mockito.mock(BinaryTuple.class)), + OperationContext.create(TxContext.readOnly(internalTx)) ); AtomicReference<Throwable> errorRef = new AtomicReference<>(); diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java index 7bfeb9e7848..56bdf31dd4e 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java @@ -79,8 +79,10 @@ import org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaR import org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.OperationContext; import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.table.TxContext; import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.internal.testframework.SystemPropertiesExtension; import org.apache.ignite.internal.testframework.WithSystemProperty; @@ -1085,7 +1087,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { node(0).cluster().nodes().stream().filter(node -> node.name().equals(primary)).findAny().get() ); - publisher = tbl.internalTable().scan(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, tx.coordinatorId()); + OperationContext operationContext = OperationContext.create(TxContext.readOnly(tx)); + + publisher = tbl.internalTable().scan(PART_ID, primaryNode, operationContext); } else { publisher = tbl.internalTable().scan(PART_ID, tx); }
