This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 018148ffab IGNITE-18518 Proper filtering of index scans, according to
MV storage data. (#2037)
018148ffab is described below
commit 018148ffab6442fa8a7e93287a047c3e3d88acf7
Author: Ivan Bessonov <[email protected]>
AuthorDate: Mon May 22 11:22:03 2023 +0300
IGNITE-18518 Proper filtering of index scans, according to MV storage data.
(#2037)
Signed-off-by: ibessonov <[email protected]>
Co-authored-by: Semyon Danilov <[email protected]>
---
.../ignite/internal/table/ItTableScanTest.java | 114 ++++++++++++++-------
.../ignite/internal/schema/BinaryRowConverter.java | 2 +-
.../distributed/TableSchemaAwareIndexStorage.java | 2 +-
.../replicator/PartitionReplicaListener.java | 67 ++++++++----
.../replication/PartitionReplicaListenerTest.java | 7 +-
5 files changed, 133 insertions(+), 59 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 9ee8498b47..a81152dfa1 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -71,6 +72,7 @@ import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.TransactionException;
+import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -78,6 +80,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests to check a scan internal command.
@@ -92,6 +95,9 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
/** Ids to insert. */
private static final List<Integer> ROW_IDS = List.of(1, 2, 5, 6, 7, 10,
53);
+ /** The only partition in the table. */
+ private static final int PART_ID = 0;
+
private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("key", NativeTypes.INT32, false)},
@@ -124,21 +130,18 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
@Test
public void testInsertWaitScanComplete() throws Exception {
- int partId = 0;
IgniteTransactions transactions = CLUSTER_NODES.get(0).transactions();
InternalTransaction tx0 = (InternalTransaction) transactions.begin();
- InternalTransaction tx1 = startTxWithEnlistedPartition(partId);
+ InternalTransaction tx1 = startTxWithEnlistedPartition(PART_ID, false);
UUID sortedIndexId = getSortedIndexId();
List<BinaryRow> scannedRows = new ArrayList<>();
- IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx1.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
-
- PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+ PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx1);
- Publisher<BinaryRow> publisher = internalTable.scan(partId, tx1.id(),
recipient, sortedIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, tx1.id(),
recipient, sortedIndexId, null, null, 0, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -159,7 +162,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
IgniteTestUtils.await(scanned);
- log.info("Result: " + scannedRows.stream().map(binaryRow ->
rowToString(binaryRow)).collect(Collectors.joining(", ")));
+ log.info("Result: " +
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
")));
assertEquals(ROW_IDS.size(), scannedRows.size());
@@ -195,7 +198,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
IgniteTestUtils.await(scanned);
- log.info("Result: " + scannedRows.stream().map(binaryRow ->
rowToString(binaryRow)).collect(Collectors.joining(", ")));
+ log.info("Result: " +
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
")));
assertEquals(ROW_IDS.size() + 1, scannedRows.size());
}
@@ -379,7 +382,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
IgniteTestUtils.await(scanned);
- log.info("Result: " + scannedRows.stream().map(binaryRow ->
rowToString(binaryRow)).collect(Collectors.joining(", ")));
+ log.info("Result: " +
scannedRows.stream().map(ItTableScanTest::rowToString).collect(Collectors.joining(",
")));
assertThat(txOpFut, willCompleteSuccessfully());
@@ -400,15 +403,11 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
List<BinaryRow> scannedRows = new ArrayList<>();
- int partId = 0;
+ InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
- InternalTransaction tx = startTxWithEnlistedPartition(partId);
+ PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
- IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
-
- PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
-
- Publisher<BinaryRow> publisher = internalTable.scan(partId, tx.id(),
recipient, sortedIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, tx.id(),
recipient, sortedIndexId, null, null, 0, null);
CompletableFuture<Void> scanned = new CompletableFuture<>();
@@ -433,7 +432,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
assertEquals(ROW_IDS.size() + 1, scannedRows.size());
- Publisher<BinaryRow> publisher1 = internalTable.scan(0, tx.id(),
recipient, sortedIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher1 = internalTable.scan(PART_ID, tx.id(),
recipient, sortedIndexId, null, null, 0, null);
assertEquals(scanAllRows(publisher1).size(), scannedRows.size());
@@ -457,14 +456,11 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
UUID soredIndexId = getSortedIndexId();
- int partId = 0;
-
- InternalTransaction tx = startTxWithEnlistedPartition(partId);
- IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
- PrimaryReplica recipient = new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+ InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
+ PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
Publisher<BinaryRow> publisher = internalTable.scan(
- partId,
+ PART_ID,
tx.id(),
recipient,
soredIndexId,
@@ -476,7 +472,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
List<BinaryRow> scannedRows = scanAllRows(publisher);
- log.info("Result of scanning in old transaction: " +
scannedRows.stream().map(binaryRow -> rowToString(binaryRow))
+ log.info("Result of scanning in old transaction: " +
scannedRows.stream().map(ItTableScanTest::rowToString)
.collect(Collectors.joining(", ")));
assertEquals(3, scannedRows.size());
@@ -487,7 +483,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
kvView.put(null, Tuple.create().set("key", 9),
Tuple.create().set("valInt", 9).set("valStr", "New_9")));
Publisher<BinaryRow> publisher1 = internalTable.scan(
- partId,
+ PART_ID,
tx.id(),
recipient,
soredIndexId,
@@ -508,7 +504,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
kvView.put(null, Tuple.create().set("key", 9),
Tuple.create().set("valInt", 9).set("valStr", "New_9"));
Publisher<BinaryRow> publisher2 = internalTable.scan(
- 0,
+ PART_ID,
null,
soredIndexId,
lowBound,
@@ -521,7 +517,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
assertEquals(5, scannedRows2.size());
- log.info("Result of scanning after insert rows: " +
scannedRows2.stream().map(binaryRow -> rowToString(binaryRow))
+ log.info("Result of scanning after insert rows: " +
scannedRows2.stream().map(ItTableScanTest::rowToString)
.collect(Collectors.joining(", ")));
}
@@ -535,16 +531,12 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
UUID sortedIndexId = getSortedIndexId();
- int partId = 0;
-
- InternalTransaction tx = startTxWithEnlistedPartition(partId);
+ InternalTransaction tx = startTxWithEnlistedPartition(PART_ID,
false);
try {
- IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+ PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
- PrimaryReplica recipient = new
PrimaryReplica(leaderWithTerm.get1(), leaderWithTerm.get2());
-
- Publisher<BinaryRow> publisher = internalTable.scan(partId,
tx.id(), recipient, sortedIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID,
tx.id(), recipient, sortedIndexId, null, null, 0, null);
// Non-thread-safe collection is fine, HB is guaranteed by
"Thread#join" inside of "runRace".
List<BinaryRow> scannedRows = new ArrayList<>();
@@ -567,7 +559,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
() -> scannedRows.addAll(scanAllRows(publisher))
);
- Publisher<BinaryRow> publisher1 = internalTable.scan(0,
tx.id(), recipient, sortedIndexId, null, null, 0, null);
+ Publisher<BinaryRow> publisher1 = internalTable.scan(PART_ID,
tx.id(), recipient, sortedIndexId, null, null, 0, null);
assertEquals(scanAllRows(publisher1).size(),
scannedRows.size());
} finally {
@@ -610,6 +602,53 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
subscription.cancel();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testMvScan(boolean readOnly) throws Exception {
+ KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
+
+ kvView.remove(null, Tuple.create().set("key", ROW_IDS.get(0)));
+ kvView.remove(null, Tuple.create().set("key", ROW_IDS.get(1)));
+ kvView.put(null, Tuple.create().set("key", ROW_IDS.get(2)),
Tuple.create().set("valInt", 999).set("valStr", "Str_999"));
+
+ UUID sortedIndexId = getSortedIndexId();
+
+ InternalTransaction tx = startTxWithEnlistedPartition(PART_ID,
readOnly);
+
+ try {
+ Publisher<BinaryRow> publisher;
+
+ if (readOnly) {
+ List<String> assignments = internalTable.assignments();
+
+ // Any node from assignments will do it.
+ ClusterNode node0 =
CLUSTER_NODES.get(0).clusterNodes().stream().filter(clusterNode -> {
+ return assignments.contains(clusterNode.name());
+ }).findFirst().orElseThrow();
+
+ //noinspection DataFlowIssue
+ publisher = internalTable.scan(PART_ID, tx.readTimestamp(),
node0, sortedIndexId, null, null, 0, null);
+ } else {
+ PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
+
+ publisher = internalTable.scan(PART_ID, tx.id(), recipient,
sortedIndexId, null, null, 0, null);
+ }
+
+ List<BinaryRow> scannedRows = scanAllRows(publisher);
+
+ // Two rows are removed, one changed.
+ assertThat(scannedRows, hasSize(ROW_IDS.size() - 2));
+ } finally {
+ tx.commit();
+ }
+ }
+
+ private PrimaryReplica getLeaderRecipient(int partId, InternalTransaction
tx) {
+ IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+
+ return new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+ }
+
/**
* Represents a binary row as a string.
*
@@ -803,12 +842,13 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
* Starts an RW transaction and enlists the specified partition in it.
*
* @param partId Partition ID.
+ * @param readOnly Read-only flag for transaction.
* @return Transaction.
*/
- private InternalTransaction startTxWithEnlistedPartition(int partId) {
+ private InternalTransaction startTxWithEnlistedPartition(int partId,
boolean readOnly) {
Ignite ignite = CLUSTER_NODES.get(0);
- InternalTransaction tx = (InternalTransaction)
ignite.transactions().begin();
+ InternalTransaction tx = (InternalTransaction)
ignite.transactions().begin(new TransactionOptions().readOnly(readOnly));
InternalTable table = ((TableImpl)
ignite.tables().table(TABLE_NAME)).internalTable();
TablePartitionId tblPartId = new TablePartitionId(table.tableId(),
partId);
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
index 48a5a6e96d..e369fa4b51 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowConverter.java
@@ -182,7 +182,7 @@ public class BinaryRowConverter {
}
/** Helper method to convert from a full row or key-only row to the tuple
with specified columns. */
- public static Function<BinaryRow, BinaryTuple>
columnsExtractor(SchemaDescriptor schema, int[] columns) {
+ public static Function<BinaryRow, BinaryTuple>
columnsExtractor(SchemaDescriptor schema, int... columns) {
BinaryTupleSchema rowSchema =
BinaryTupleSchema.createRowSchema(schema);
BinaryTupleSchema keySchema =
BinaryTupleSchema.createKeySchema(schema);
BinaryTupleSchema trimmedSchema =
BinaryTupleSchema.createSchema(schema, columns);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index fd1cd8ae1f..b56f7c222d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -89,7 +89,7 @@ public class TableSchemaAwareIndexStorage {
* @param row Full row.
* @return A tuple that represents indexed columns of a row.
*/
- BinaryTuple resolveIndexRow(BinaryRow row) {
+ public BinaryTuple resolveIndexRow(BinaryRow row) {
return indexRowResolver.apply(row);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 020a9921d8..725f19008a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -49,6 +49,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.index.BinaryTupleComparator;
import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -117,6 +119,7 @@ import
org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
@@ -402,12 +405,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (request.exactKey() != null) {
assert request.lowerBound() == null && request.upperBound() ==
null : "Index lookup doesn't allow bounds.";
- return safeReadFuture.thenCompose(unused ->
lookupIndex(request, indexStorage.storage()));
+ return safeReadFuture.thenCompose(unused ->
lookupIndex(request, indexStorage));
}
assert indexStorage.storage() instanceof SortedIndexStorage;
- return safeReadFuture.thenCompose(unused ->
scanSortedIndex(request, (SortedIndexStorage) indexStorage.storage()));
+ return safeReadFuture.thenCompose(unused ->
scanSortedIndex(request, indexStorage));
}
return safeReadFuture.thenCompose(unused ->
retrieveExactEntriesUntilCursorEmpty(readTimestamp, cursorId, batchCount));
@@ -641,7 +644,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
assert indexStorage.storage() instanceof SortedIndexStorage;
- return scanSortedIndex(request, (SortedIndexStorage)
indexStorage.storage());
+ return scanSortedIndex(request, indexStorage);
}
UUID txId = request.transactionId();
@@ -683,13 +686,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Lookup sorted index in RO tx.
*
* @param request Index scan request.
- * @param indexStorage Index storage.
+ * @param schemaAwareIndexStorage Index storage.
* @return Operation future.
*/
private CompletableFuture<List<BinaryRow>> lookupIndex(
ReadOnlyScanRetrieveBatchReplicaRequest request,
- IndexStorage indexStorage
+ TableSchemaAwareIndexStorage schemaAwareIndexStorage
) {
+ IndexStorage indexStorage = schemaAwareIndexStorage.storage();
+
int batchCount = request.batchSize();
HybridTimestamp timestamp = request.readTimestamp();
@@ -702,7 +707,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
var result = new ArrayList<BinaryRow>(batchCount);
- return continueReadOnlyIndexLookup(cursor, timestamp, batchCount,
result)
+ Cursor<IndexRow> indexRowCursor = CursorUtils.map(cursor, rowId -> new
IndexRowImpl(key, rowId));
+
+ return continueReadOnlyIndexScan(schemaAwareIndexStorage,
indexRowCursor, timestamp, batchCount, result)
.thenCompose(ignore -> completedFuture(result));
}
@@ -738,13 +745,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Scans sorted index in RW tx.
*
* @param request Index scan request.
- * @param indexStorage Index storage.
+ * @param schemaAwareIndexStorage Sorted index storage.
* @return Operation future.
*/
private CompletableFuture<List<BinaryRow>> scanSortedIndex(
ReadWriteScanRetrieveBatchReplicaRequest request,
- SortedIndexStorage indexStorage
+ TableSchemaAwareIndexStorage schemaAwareIndexStorage
) {
+ var indexStorage = (SortedIndexStorage)
schemaAwareIndexStorage.storage();
+
UUID txId = request.transactionId();
int batchCount = request.batchSize();
@@ -761,7 +770,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return lockManager.acquire(txId, new LockKey(tableId),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
var comparator = new
BinaryTupleComparator(indexStorage.indexDescriptor());
- Function<IndexRow, Boolean> isUpperBoundAchieved = indexRow ->
{
+ Predicate<IndexRow> isUpperBoundAchieved = indexRow -> {
if (indexRow == null) {
return true;
}
@@ -794,7 +803,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
var result = new ArrayList<BinaryRow>(batchCount);
- return continueIndexScan(txId, indexLocker, cursor,
batchCount, result, isUpperBoundAchieved)
+ return continueIndexScan(txId, schemaAwareIndexStorage,
indexLocker, cursor, batchCount, result, isUpperBoundAchieved)
.thenApply(ignore -> result);
});
});
@@ -804,13 +813,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Scans sorted index in RO tx.
*
* @param request Index scan request.
- * @param indexStorage Index storage.
+ * @param schemaAwareIndexStorage Sorted index storage.
* @return Operation future.
*/
private CompletableFuture<List<BinaryRow>> scanSortedIndex(
ReadOnlyScanRetrieveBatchReplicaRequest request,
- SortedIndexStorage indexStorage
+ TableSchemaAwareIndexStorage schemaAwareIndexStorage
) {
+ var indexStorage = (SortedIndexStorage)
schemaAwareIndexStorage.storage();
+
UUID txId = request.transactionId();
int batchCount = request.batchSize();
HybridTimestamp timestamp = request.readTimestamp();
@@ -831,11 +842,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
var result = new ArrayList<BinaryRow>(batchCount);
- return continueReadOnlyIndexScan(cursor, timestamp, batchCount, result)
+ return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor,
timestamp, batchCount, result)
.thenApply(ignore -> result);
}
private CompletableFuture<Void> continueReadOnlyIndexScan(
+ TableSchemaAwareIndexStorage schemaAwareIndexStorage,
Cursor<IndexRow> cursor,
HybridTimestamp timestamp,
int batchSize,
@@ -865,11 +877,11 @@ public class PartitionReplicaListener implements
ReplicaListener {
return committedReadResult.binaryRow();
})
.thenComposeAsync(resolvedReadResult -> {
- if (resolvedReadResult != null) {
+ if (resolvedReadResult != null && indexRowMatches(indexRow,
resolvedReadResult, schemaAwareIndexStorage)) {
result.add(resolvedReadResult);
}
- return continueReadOnlyIndexScan(cursor, timestamp, batchSize,
result);
+ return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor,
timestamp, batchSize, result);
}, scanRequestExecutor);
}
@@ -877,6 +889,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Index scan loop. Retrieves next row from index, takes locks, fetches
associated data row and collects to the result.
*
* @param txId Transaction id.
+ * @param schemaAwareIndexStorage Index storage.
* @param indexLocker Index locker.
* @param indexCursor Index cursor.
* @param batchSize Batch size.
@@ -886,11 +899,12 @@ public class PartitionReplicaListener implements
ReplicaListener {
*/
private CompletableFuture<Void> continueIndexScan(
UUID txId,
+ TableSchemaAwareIndexStorage schemaAwareIndexStorage,
SortedIndexLocker indexLocker,
Cursor<IndexRow> indexCursor,
int batchSize,
List<BinaryRow> result,
- Function<IndexRow, Boolean> isUpperBoundAchieved
+ Predicate<IndexRow> isUpperBoundAchieved
) {
if (result.size() == batchSize) { // Batch is full, exit loop.
return completedFuture(null);
@@ -898,7 +912,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return indexLocker.locksForScan(txId, indexCursor)
.thenCompose(currentRow -> { // Index row S lock
- if (isUpperBoundAchieved.apply(currentRow)) {
+ if (isUpperBoundAchieved.test(currentRow)) {
return completedFuture(null); // End of range reached.
Exit loop.
}
@@ -908,12 +922,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
return
resolveAndCheckReadCompatibility(readResult, txId)
.thenCompose(resolvedReadResult -> {
if (resolvedReadResult != null) {
- result.add(resolvedReadResult);
+ if
(indexRowMatches(currentRow, resolvedReadResult, schemaAwareIndexStorage)) {
+
result.add(resolvedReadResult);
+ }
}
// Proceed scan.
return continueIndexScan(
txId,
+ schemaAwareIndexStorage,
indexLocker,
indexCursor,
batchSize,
@@ -925,6 +942,20 @@ public class PartitionReplicaListener implements
ReplicaListener {
});
}
+ /**
+ * Checks whether passed index row corresponds to the binary row.
+ *
+ * @param indexRow Index row, read from index storage.
+ * @param binaryRow Binary row, read from MV storage.
+ * @param schemaAwareIndexStorage Schema aware index storage, to resolve
values of indexed columns in a binary row.
+ * @return {@code true} if index row matches the binary row, {@code false}
otherwise.
+ */
+ private static boolean indexRowMatches(IndexRow indexRow, BinaryRow
binaryRow, TableSchemaAwareIndexStorage schemaAwareIndexStorage) {
+ BinaryTuple actualIndexRow =
schemaAwareIndexStorage.resolveIndexRow(binaryRow);
+
+ return
indexRow.indexColumns().byteBuffer().equals(actualIndexRow.byteBuffer());
+ }
+
private CompletableFuture<Void> continueIndexLookup(
UUID txId,
Cursor<RowId> indexCursor,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index e8a3b4b896..d89cefd370 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -355,14 +355,17 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new SortedIndexColumnDescriptor("intVal", NativeTypes.INT32,
false, true)
)));
- sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId,
indexStorage, row -> null);
+ // 2 is the index of "intVal" in the list of all columns.
+ Function<BinaryRow, BinaryTuple> columnsExtractor =
BinaryRowConverter.columnsExtractor(schemaDescriptor, 2);
+
+ sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId,
indexStorage, columnsExtractor);
hashIndexStorage = new TableSchemaAwareIndexStorage(
hashIndexId,
new TestHashIndexStorage(partId, new
HashIndexDescriptor(hashIndexId, List.of(
new HashIndexColumnDescriptor("intVal",
NativeTypes.INT32, false)
))),
- row -> null
+ columnsExtractor
);
IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true,
lockManager, row2Tuple);