This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 018148ffab IGNITE-18518 Proper filtering of index scans, according to MV storage data. (#2037)
018148ffab is described below

commit 018148ffab6442fa8a7e93287a047c3e3d88acf7
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon May 22 11:22:03 2023 +0300

    IGNITE-18518 Proper filtering of index scans, according to MV storage data. (#2037)
    
    Signed-off-by: ibessonov <[email protected]>
    Co-authored-by: Semyon Danilov <[email protected]>
---
 .../ignite/internal/table/ItTableScanTest.java     | 114 ++++++++++++++-------
 .../ignite/internal/schema/BinaryRowConverter.java |   2 +-
 .../distributed/TableSchemaAwareIndexStorage.java  |   2 +-
 .../replicator/PartitionReplicaListener.java       |  67 ++++++++----
 .../replication/PartitionReplicaListenerTest.java  |   7 +-
 5 files changed, 133 insertions(+), 59 deletions(-)

diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 9ee8498b47..a81152dfa1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -71,6 +72,7 @@ import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.TransactionException;
+import org.apache.ignite.tx.TransactionOptions;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -78,6 +80,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Tests to check a scan internal command.
@@ -92,6 +95,9 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
     /** Ids to insert. */
     private static final List<Integer> ROW_IDS = List.of(1, 2, 5, 6, 7, 10, 
53);
 
+    /** The only partition in the table. */
+    private static final int PART_ID = 0;
+
     private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
             1,
             new Column[]{new Column("key", NativeTypes.INT32, false)},
@@ -124,21 +130,18 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
     @Test
     public void testInsertWaitScanComplete() throws Exception {
-        int partId = 0;
         IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
 
         InternalTransaction tx0 = (InternalTransaction) transactions.begin();
-        InternalTransaction tx1 = startTxWithEnlistedPartition(partId);
+        InternalTransaction tx1 = startTxWithEnlistedPartition(PART_ID, false);
 
         UUID sortedIndexId = getSortedIndexId();
 
         List<BinaryRow> scannedRows = new ArrayList<>();
 
-        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = 
tx1.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
-
-        PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), 
leaderWithTerm.get2());
+        PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx1);
 
-        Publisher<BinaryRow> publisher = internalTable.scan(partId, tx1.id(), 
recipient, sortedIndexId, null, null, 0, null);
+        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, tx1.id(), 
recipient, sortedIndexId, null, null, 0, null);
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -159,7 +162,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         IgniteTestUtils.await(scanned);
 
-        log.info("Result: " + scannedRows.stream().map(binaryRow -> 
rowToString(binaryRow)).collect(Collectors.joining(", ")));
+        log.info("Result: " + 
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
 ")));
 
         assertEquals(ROW_IDS.size(), scannedRows.size());
 
@@ -195,7 +198,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         IgniteTestUtils.await(scanned);
 
-        log.info("Result: " + scannedRows.stream().map(binaryRow -> 
rowToString(binaryRow)).collect(Collectors.joining(", ")));
+        log.info("Result: " + 
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
 ")));
 
         assertEquals(ROW_IDS.size() + 1, scannedRows.size());
     }
@@ -379,7 +382,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         IgniteTestUtils.await(scanned);
 
-        log.info("Result: " + scannedRows.stream().map(binaryRow -> 
rowToString(binaryRow)).collect(Collectors.joining(", ")));
+        log.info("Result: " + 
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
 ")));
 
         assertThat(txOpFut, willCompleteSuccessfully());
 
@@ -400,15 +403,11 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         List<BinaryRow> scannedRows = new ArrayList<>();
 
-        int partId = 0;
+        InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
 
-        InternalTransaction tx = startTxWithEnlistedPartition(partId);
+        PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
 
-        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = 
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
-
-        PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), 
leaderWithTerm.get2());
-
-        Publisher<BinaryRow> publisher = internalTable.scan(partId, tx.id(), 
recipient, sortedIndexId, null, null, 0, null);
+        Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, tx.id(), 
recipient, sortedIndexId, null, null, 0, null);
 
         CompletableFuture<Void> scanned = new CompletableFuture<>();
 
@@ -433,7 +432,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         assertEquals(ROW_IDS.size() + 1, scannedRows.size());
 
-        Publisher<BinaryRow> publisher1 = internalTable.scan(0, tx.id(), 
recipient, sortedIndexId, null, null, 0, null);
+        Publisher<BinaryRow> publisher1 = internalTable.scan(PART_ID, tx.id(), 
recipient, sortedIndexId, null, null, 0, null);
 
         assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
 
@@ -457,14 +456,11 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         UUID soredIndexId = getSortedIndexId();
 
-        int partId = 0;
-
-        InternalTransaction tx = startTxWithEnlistedPartition(partId);
-        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = 
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
-        PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(), 
leaderWithTerm.get2());
+        InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
+        PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
 
         Publisher<BinaryRow> publisher = internalTable.scan(
-                partId,
+                PART_ID,
                 tx.id(),
                 recipient,
                 soredIndexId,
@@ -476,7 +472,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         List<BinaryRow> scannedRows = scanAllRows(publisher);
 
-        log.info("Result of scanning in old transaction: " + 
scannedRows.stream().map(binaryRow -> rowToString(binaryRow))
+        log.info("Result of scanning in old transaction: " + 
scannedRows.stream().map(ItTableScanTest::rowToString)
                 .collect(Collectors.joining(", ")));
 
         assertEquals(3, scannedRows.size());
@@ -487,7 +483,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
                 kvView.put(null, Tuple.create().set("key", 9), 
Tuple.create().set("valInt", 9).set("valStr", "New_9")));
 
         Publisher<BinaryRow> publisher1 = internalTable.scan(
-                partId,
+                PART_ID,
                 tx.id(),
                 recipient,
                 soredIndexId,
@@ -508,7 +504,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
         kvView.put(null, Tuple.create().set("key", 9), 
Tuple.create().set("valInt", 9).set("valStr", "New_9"));
 
         Publisher<BinaryRow> publisher2 = internalTable.scan(
-                0,
+                PART_ID,
                 null,
                 soredIndexId,
                 lowBound,
@@ -521,7 +517,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         assertEquals(5, scannedRows2.size());
 
-        log.info("Result of scanning after insert rows: " + 
scannedRows2.stream().map(binaryRow -> rowToString(binaryRow))
+        log.info("Result of scanning after insert rows: " + 
scannedRows2.stream().map(ItTableScanTest::rowToString)
                 .collect(Collectors.joining(", ")));
     }
 
@@ -535,16 +531,12 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
             UUID sortedIndexId = getSortedIndexId();
 
-            int partId = 0;
-
-            InternalTransaction tx = startTxWithEnlistedPartition(partId);
+            InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, 
false);
 
             try {
-                IgniteBiTuple<ClusterNode, Long> leaderWithTerm = 
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+                PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
 
-                PrimaryReplica recipient = new 
PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2());
-
-                Publisher<BinaryRow> publisher = internalTable.scan(partId, 
tx.id(), recipient, sortedIndexId, null, null, 0, null);
+                Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, 
tx.id(), recipient, sortedIndexId, null, null, 0, null);
 
                 // Non-thread-safe collection is fine, HB is guaranteed by 
"Thread#join" inside of "runRace".
                 List<BinaryRow> scannedRows = new ArrayList<>();
@@ -567,7 +559,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
                         () -> scannedRows.addAll(scanAllRows(publisher))
                 );
 
-                Publisher<BinaryRow> publisher1 = internalTable.scan(0, 
tx.id(), recipient, sortedIndexId, null, null, 0, null);
+                Publisher<BinaryRow> publisher1 = internalTable.scan(PART_ID, 
tx.id(), recipient, sortedIndexId, null, null, 0, null);
 
                 assertEquals(scanAllRows(publisher1).size(), 
scannedRows.size());
             } finally {
@@ -610,6 +602,53 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
         subscription.cancel();
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testMvScan(boolean readOnly) throws Exception {
+        KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
+
+        kvView.remove(null, Tuple.create().set("key", ROW_IDS.get(0)));
+        kvView.remove(null, Tuple.create().set("key", ROW_IDS.get(1)));
+        kvView.put(null, Tuple.create().set("key", ROW_IDS.get(2)), 
Tuple.create().set("valInt", 999).set("valStr", "Str_999"));
+
+        UUID sortedIndexId = getSortedIndexId();
+
+        InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, 
readOnly);
+
+        try {
+            Publisher<BinaryRow> publisher;
+
+            if (readOnly) {
+                List<String> assignments = internalTable.assignments();
+
+                // Any node from assignments will do it.
+                ClusterNode node0 = 
CLUSTER_NODES.get(0).clusterNodes().stream().filter(clusterNode -> {
+                    return assignments.contains(clusterNode.name());
+                }).findFirst().orElseThrow();
+
+                //noinspection DataFlowIssue
+                publisher = internalTable.scan(PART_ID, tx.readTimestamp(), 
node0, sortedIndexId, null, null, 0, null);
+            } else {
+                PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
+
+                publisher = internalTable.scan(PART_ID, tx.id(), recipient, 
sortedIndexId, null, null, 0, null);
+            }
+
+            List<BinaryRow> scannedRows = scanAllRows(publisher);
+
+            // Two rows are removed, one changed.
+            assertThat(scannedRows, hasSize(ROW_IDS.size() - 2));
+        } finally {
+            tx.commit();
+        }
+    }
+
+    private PrimaryReplica getLeaderRecipient(int partId, InternalTransaction 
tx) {
+        IgniteBiTuple<ClusterNode, Long> leaderWithTerm = 
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+
+        return new PrimaryReplica(leaderWithTerm.get1(), 
leaderWithTerm.get2());
+    }
+
     /**
      * Represents a binary row as a string.
      *
@@ -803,12 +842,13 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
      * Starts an RW transaction and enlists the specified partition in it.
      *
      * @param partId Partition ID.
+     * @param readOnly Read-only flag for transaction.
      * @return Transaction.
      */
-    private InternalTransaction startTxWithEnlistedPartition(int partId) {
+    private InternalTransaction startTxWithEnlistedPartition(int partId, 
boolean readOnly) {
         Ignite ignite = CLUSTER_NODES.get(0);
 
-        InternalTransaction tx = (InternalTransaction) 
ignite.transactions().begin();
+        InternalTransaction tx = (InternalTransaction) 
ignite.transactions().begin(new TransactionOptions().readOnly(readOnly));
 
         InternalTable table = ((TableImpl) 
ignite.tables().table(TABLE_NAME)).internalTable();
         TablePartitionId tblPartId = new TablePartitionId(table.tableId(), 
partId);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
index 48a5a6e96d..e369fa4b51 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
@@ -182,7 +182,7 @@ public class BinaryRowConverter {
     }
 
     /** Helper method to convert from a full row or key-only row to the tuple 
with specified columns. */
-    public static Function<BinaryRow, BinaryTuple> 
columnsExtractor(SchemaDescriptor schema, int[] columns) {
+    public static Function<BinaryRow, BinaryTuple> 
columnsExtractor(SchemaDescriptor schema, int... columns) {
         BinaryTupleSchema rowSchema = 
BinaryTupleSchema.createRowSchema(schema);
         BinaryTupleSchema keySchema = 
BinaryTupleSchema.createKeySchema(schema);
         BinaryTupleSchema trimmedSchema = 
BinaryTupleSchema.createSchema(schema, columns);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index fd1cd8ae1f..b56f7c222d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -89,7 +89,7 @@ public class TableSchemaAwareIndexStorage {
      * @param row Full row.
      * @return A tuple that represents indexed columns of a row.
      */
-    BinaryTuple resolveIndexRow(BinaryRow row) {
+    public BinaryTuple resolveIndexRow(BinaryRow row) {
         return indexRowResolver.apply(row);
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 020a9921d8..725f19008a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -49,6 +49,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
 import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
 import org.apache.ignite.internal.storage.index.IndexStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -117,6 +119,7 @@ import 
org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.CursorUtils;
 import org.apache.ignite.internal.util.Lazy;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
@@ -402,12 +405,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             if (request.exactKey() != null) {
                 assert request.lowerBound() == null && request.upperBound() == 
null : "Index lookup doesn't allow bounds.";
 
-                return safeReadFuture.thenCompose(unused -> 
lookupIndex(request, indexStorage.storage()));
+                return safeReadFuture.thenCompose(unused -> 
lookupIndex(request, indexStorage));
             }
 
             assert indexStorage.storage() instanceof SortedIndexStorage;
 
-            return safeReadFuture.thenCompose(unused -> 
scanSortedIndex(request, (SortedIndexStorage) indexStorage.storage()));
+            return safeReadFuture.thenCompose(unused -> 
scanSortedIndex(request, indexStorage));
         }
 
         return safeReadFuture.thenCompose(unused -> 
retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, batchCount));
@@ -641,7 +644,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
             assert indexStorage.storage() instanceof SortedIndexStorage;
 
-            return scanSortedIndex(request, (SortedIndexStorage) 
indexStorage.storage());
+            return scanSortedIndex(request, indexStorage);
         }
 
         UUID txId = request.transactionId();
@@ -683,13 +686,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Lookup sorted index in RO tx.
      *
      * @param request Index scan request.
-     * @param indexStorage Index storage.
+     * @param schemaAwareIndexStorage Index storage.
      * @return Operation future.
      */
     private CompletableFuture<List<BinaryRow>> lookupIndex(
             ReadOnlyScanRetrieveBatchReplicaRequest request,
-            IndexStorage indexStorage
+            TableSchemaAwareIndexStorage schemaAwareIndexStorage
     ) {
+        IndexStorage indexStorage = schemaAwareIndexStorage.storage();
+
         int batchCount = request.batchSize();
         HybridTimestamp timestamp = request.readTimestamp();
 
@@ -702,7 +707,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         var result = new ArrayList<BinaryRow>(batchCount);
 
-        return continueReadOnlyIndexLookup(cursor, timestamp, batchCount, 
result)
+        Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new 
IndexRowImpl(key, rowId));
+
+        return continueReadOnlyIndexScan(schemaAwareIndexStorage, 
indexRowCursor, timestamp, batchCount, result)
                 .thenCompose(ignore -> completedFuture(result));
     }
 
@@ -738,13 +745,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Scans sorted index in RW tx.
      *
      * @param request Index scan request.
-     * @param indexStorage Index storage.
+     * @param schemaAwareIndexStorage Sorted index storage.
      * @return Operation future.
      */
     private CompletableFuture<List<BinaryRow>> scanSortedIndex(
             ReadWriteScanRetrieveBatchReplicaRequest request,
-            SortedIndexStorage indexStorage
+            TableSchemaAwareIndexStorage schemaAwareIndexStorage
     ) {
+        var indexStorage = (SortedIndexStorage) 
schemaAwareIndexStorage.storage();
+
         UUID txId = request.transactionId();
         int batchCount = request.batchSize();
 
@@ -761,7 +770,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return lockManager.acquire(txId, new LockKey(tableId), 
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
                 var comparator = new 
BinaryTupleComparator(indexStorage.indexDescriptor());
 
-                Function<IndexRow, Boolean> isUpperBoundAchieved = indexRow -> 
{
+                Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
                     if (indexRow == null) {
                         return true;
                     }
@@ -794,7 +803,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
                 var result = new ArrayList<BinaryRow>(batchCount);
 
-                return continueIndexScan(txId, indexLocker, cursor, 
batchCount, result, isUpperBoundAchieved)
+                return continueIndexScan(txId, schemaAwareIndexStorage, 
indexLocker, cursor, batchCount, result, isUpperBoundAchieved)
                         .thenApply(ignore -> result);
             });
         });
@@ -804,13 +813,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Scans sorted index in RO tx.
      *
      * @param request Index scan request.
-     * @param indexStorage Index storage.
+     * @param schemaAwareIndexStorage Sorted index storage.
      * @return Operation future.
      */
     private CompletableFuture<List<BinaryRow>> scanSortedIndex(
             ReadOnlyScanRetrieveBatchReplicaRequest request,
-            SortedIndexStorage indexStorage
+            TableSchemaAwareIndexStorage schemaAwareIndexStorage
     ) {
+        var indexStorage = (SortedIndexStorage) 
schemaAwareIndexStorage.storage();
+
         UUID txId = request.transactionId();
         int batchCount = request.batchSize();
         HybridTimestamp timestamp = request.readTimestamp();
@@ -831,11 +842,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         var result = new ArrayList<BinaryRow>(batchCount);
 
-        return continueReadOnlyIndexScan(cursor, timestamp, batchCount, result)
+        return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, 
timestamp, batchCount, result)
                 .thenApply(ignore -> result);
     }
 
     private CompletableFuture<Void> continueReadOnlyIndexScan(
+            TableSchemaAwareIndexStorage schemaAwareIndexStorage,
             Cursor<IndexRow> cursor,
             HybridTimestamp timestamp,
             int batchSize,
@@ -865,11 +877,11 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             return committedReadResult.binaryRow();
         })
         .thenComposeAsync(resolvedReadResult -> {
-            if (resolvedReadResult != null) {
+            if (resolvedReadResult != null && indexRowMatches(indexRow, 
resolvedReadResult, schemaAwareIndexStorage)) {
                 result.add(resolvedReadResult);
             }
 
-            return continueReadOnlyIndexScan(cursor, timestamp, batchSize, 
result);
+            return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, 
timestamp, batchSize, result);
         }, scanRequestExecutor);
     }
 
@@ -877,6 +889,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Index scan loop. Retrieves next row from index, takes locks, fetches 
associated data row and collects to the result.
      *
      * @param txId Transaction id.
+     * @param schemaAwareIndexStorage Index storage.
      * @param indexLocker Index locker.
      * @param indexCursor Index cursor.
      * @param batchSize Batch size.
@@ -886,11 +899,12 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      */
     private CompletableFuture<Void> continueIndexScan(
             UUID txId,
+            TableSchemaAwareIndexStorage schemaAwareIndexStorage,
             SortedIndexLocker indexLocker,
             Cursor<IndexRow> indexCursor,
             int batchSize,
             List<BinaryRow> result,
-            Function<IndexRow, Boolean> isUpperBoundAchieved
+            Predicate<IndexRow> isUpperBoundAchieved
     ) {
         if (result.size() == batchSize) { // Batch is full, exit loop.
             return completedFuture(null);
@@ -898,7 +912,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
         return indexLocker.locksForScan(txId, indexCursor)
                 .thenCompose(currentRow -> { // Index row S lock
-                    if (isUpperBoundAchieved.apply(currentRow)) {
+                    if (isUpperBoundAchieved.test(currentRow)) {
                         return completedFuture(null); // End of range reached. 
Exit loop.
                     }
 
@@ -908,12 +922,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                                 return 
resolveAndCheckReadCompatibility(readResult, txId)
                                         .thenCompose(resolvedReadResult -> {
                                             if (resolvedReadResult != null) {
-                                                result.add(resolvedReadResult);
+                                                if 
(indexRowMatches(currentRow, resolvedReadResult, schemaAwareIndexStorage)) {
+                                                    
result.add(resolvedReadResult);
+                                                }
                                             }
 
                                             // Proceed scan.
                                             return continueIndexScan(
                                                     txId,
+                                                    schemaAwareIndexStorage,
                                                     indexLocker,
                                                     indexCursor,
                                                     batchSize,
@@ -925,6 +942,20 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 });
     }
 
+    /**
+     * Checks whether passed index row corresponds to the binary row.
+     *
+     * @param indexRow Index row, read from index storage.
+     * @param binaryRow Binary row, read from MV storage.
+     * @param schemaAwareIndexStorage Schema aware index storage, to resolve 
values of indexed columns in a binary row.
+     * @return {@code true} if index row matches the binary row, {@code false} 
otherwise.
+     */
+    private static boolean indexRowMatches(IndexRow indexRow, BinaryRow 
binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
+        BinaryTuple actualIndexRow = 
schemaAwareIndexStorage.resolveIndexRow(binaryRow);
+
+        return 
indexRow.indexColumns().byteBuffer().equals(actualIndexRow.byteBuffer());
+    }
+
     private CompletableFuture<Void> continueIndexLookup(
             UUID txId,
             Cursor<RowId> indexCursor,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index e8a3b4b896..d89cefd370 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -355,14 +355,17 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 new SortedIndexColumnDescriptor("intVal", NativeTypes.INT32, 
false, true)
         )));
 
-        sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId, 
indexStorage, row -> null);
+        // 2 is the index of "intVal" in the list of all columns.
+        Function<BinaryRow, BinaryTuple> columnsExtractor = 
BinaryRowConverter.columnsExtractor(schemaDescriptor, 2);
+
+        sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId, 
indexStorage, columnsExtractor);
 
         hashIndexStorage = new TableSchemaAwareIndexStorage(
                 hashIndexId,
                 new TestHashIndexStorage(partId, new 
HashIndexDescriptor(hashIndexId, List.of(
                         new HashIndexColumnDescriptor("intVal", 
NativeTypes.INT32, false)
                 ))),
-                row -> null
+                columnsExtractor
         );
 
         IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true, 
lockManager, row2Tuple);

Reply via email to