This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26b90aef12 IGNITE-18738 Cleanup indexes on transaction abort (#1657)
26b90aef12 is described below
commit 26b90aef128936f2b479f9195242a5c203b72805
Author: Semyon Danilov <[email protected]>
AuthorDate: Fri Feb 10 12:35:38 2023 +0400
IGNITE-18738 Cleanup indexes on transaction abort (#1657)
---
.../storage/index/impl/TestHashIndexStorage.java | 8 +
.../storage/index/impl/TestSortedIndexStorage.java | 9 +
.../table/distributed/StorageUpdateHandler.java | 131 ++++++-
.../distributed/TableSchemaAwareIndexStorage.java | 10 +
.../distributed/raft/PartitionDataStorage.java | 13 +
.../table/distributed/raft/PartitionListener.java | 27 +-
.../SnapshotAwarePartitionDataStorage.java | 9 +
.../table/distributed/IndexCleanupTest.java | 431 +++++++++++++++++++++
.../replication/PartitionReplicaListenerTest.java | 24 +-
.../distributed/TestPartitionDataStorage.java | 7 +
10 files changed, 646 insertions(+), 23 deletions(-)
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index 9dbdf16928..49563660e2 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageClosedException;
@@ -197,4 +198,11 @@ public class TestHashIndexStorage implements
HashIndexStorage {
rebalance = false;
}
+
+ /**
+ * Returns all indexed row ids.
+ */
+ public Set<RowId> allRowsIds() {
+ return
index.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ }
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 9d15eabdc2..b23297eb2a 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -25,8 +25,10 @@ import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -371,4 +373,11 @@ public class TestSortedIndexStorage implements
SortedIndexStorage {
rebalance = false;
}
+
+ /**
+ * Returns all indexed row ids.
+ */
+ public Set<RowId> allRowsIds() {
+ return index.values().stream().flatMap(m ->
m.keySet().stream()).collect(Collectors.toSet());
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 377552392d..3bdc0aff09 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -24,14 +24,18 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
/**
@@ -72,7 +76,7 @@ public class StorageUpdateHandler {
UUID txId,
UUID rowUuid,
TablePartitionId commitPartitionId,
- ByteBuffer rowBuffer,
+ @Nullable ByteBuffer rowBuffer,
@Nullable Consumer<RowId> onReplication
) {
storage.runConsistently(() -> {
@@ -81,7 +85,12 @@ public class StorageUpdateHandler {
UUID commitTblId = commitPartitionId.tableId();
int commitPartId = commitPartitionId.partitionId();
- storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+ BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId,
commitPartId);
+
+ if (oldRow != null) {
+ // Previous uncommitted row should be removed from indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
if (onReplication != null) {
onReplication.accept(rowId);
@@ -118,7 +127,12 @@ public class StorageUpdateHandler {
RowId rowId = new RowId(partitionId, entry.getKey());
BinaryRow row = entry.getValue() != null ? new
ByteBufferRow(entry.getValue()) : null;
- storage.addWrite(rowId, row, txId, commitTblId,
commitPartId);
+ BinaryRow oldRow = storage.addWrite(rowId, row, txId,
commitTblId, commitPartId);
+
+ if (oldRow != null) {
+ // Previous uncommitted row should be removed from
indexes.
+ tryRemovePreviousWritesIndex(rowId, oldRow);
+ }
rowIds.add(rowId);
addToIndexes(row, rowId);
@@ -133,6 +147,117 @@ public class StorageUpdateHandler {
});
}
+ /**
+ * Tries to remove a previous write from the indexes.
+ *
+ * @param rowId Row id.
+ * @param previousRow Previous write value.
+ */
+ private void tryRemovePreviousWritesIndex(RowId rowId, BinaryRow
previousRow) {
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ if (!cursor.hasNext()) {
+ return;
+ }
+
+ tryRemoveFromIndexes(previousRow, rowId, cursor);
+ }
+ }
+
+ /**
+ * Handles the abortion of a transaction.
+ *
+ * @param pendingRowIds Row ids of write-intents to be rolled back.
+ * @param onReplication On replication callback.
+ */
+ public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable
onReplication) {
+ storage.runConsistently(() -> {
+ for (RowId rowId : pendingRowIds) {
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ if (!cursor.hasNext()) {
+ continue;
+ }
+
+ ReadResult item = cursor.next();
+
+ assert item.isWriteIntent();
+
+ BinaryRow rowToRemove = item.binaryRow();
+
+ if (rowToRemove == null) {
+ continue;
+ }
+
+ tryRemoveFromIndexes(rowToRemove, rowId, cursor);
+ }
+ }
+
+ pendingRowIds.forEach(storage::abortWrite);
+
+ onReplication.run();
+
+ return null;
+ });
+ }
+
+ /**
+ * Tries removing indexed row from every index.
+ * Removes the row from an index only if no previous version resolves to the
same index value as the row to remove, because if one does, then the index
+ * entry might still be in use.
+ *
+ * @param rowToRemove Row to remove from indexes.
+ * @param rowId Row id.
+ * @param previousValues Cursor with previous versions of the row.
+ */
+ private void tryRemoveFromIndexes(BinaryRow rowToRemove, RowId rowId,
Cursor<ReadResult> previousValues) {
+ TableSchemaAwareIndexStorage[] indexes =
this.indexes.get().values().toArray(new TableSchemaAwareIndexStorage[0]);
+
+ ByteBuffer[] indexValues = new ByteBuffer[indexes.length];
+
+ // Precalculate value for every index.
+ for (int i = 0; i < indexes.length; i++) {
+ TableSchemaAwareIndexStorage index = indexes[i];
+
+ indexValues[i] = index.resolveIndexRow(rowToRemove).byteBuffer();
+ }
+
+ while (previousValues.hasNext()) {
+ ReadResult previousVersion = previousValues.next();
+
+ BinaryRow previousRow = previousVersion.binaryRow();
+
+ // Tombstones are skipped: they carry no row data, so there is no index
value to compare.
+ if (previousRow != null) {
+ for (int i = 0; i < indexes.length; i++) {
+ TableSchemaAwareIndexStorage index = indexes[i];
+
+ if (index == null) {
+ continue;
+ }
+
+ // If any of the previous versions' index values matches
the index value of
+ // the row to remove, then we can't remove that index entry as
it can still be used.
+ BinaryTuple previousRowIndex =
index.resolveIndexRow(previousRow);
+
+ if (indexValues[i].equals(previousRowIndex.byteBuffer())) {
+ indexes[i] = null;
+ }
+ }
+ }
+ }
+
+ for (TableSchemaAwareIndexStorage index : indexes) {
+ if (index != null) {
+ index.remove(rowToRemove, rowId);
+ }
+ }
+ }
+
+ private void removeFromIndex(BinaryRow row, RowId rowId) {
+ for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
+ index.remove(row, rowId);
+ }
+ }
+
private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
if (binaryRow == null) { // skip removes
return;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index b42ff952c2..c0a530bec6 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -83,6 +83,16 @@ public class TableSchemaAwareIndexStorage {
storage.remove(new IndexRowImpl(tuple, rowId));
}
+ /**
+ * Resolves index row value.
+ *
+ * @param row Full row.
+ * @return Index value.
+ */
+ public BinaryTuple resolveIndexRow(BinaryRow row) {
+ return indexRowResolver.apply(row);
+ }
+
/** Returns underlying index storage. */
public IndexStorage storage() {
return storage;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index 74b87e0d76..5b9ac5b6e1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -25,9 +25,11 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -162,6 +164,17 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
*/
void commitWrite(RowId rowId, HybridTimestamp timestamp) throws
StorageException;
+ /**
+ * Scans all versions of a single row.
+ *
+ * <p>{@link ReadResult#newestCommitTimestamp()} is NOT filled by this
method for intents having preceding committed
+ * versions.
+ *
+ * @param rowId Row id.
+ * @return Cursor of results including both rows data and
transaction-related context. The versions are ordered from newest to oldest.
+ */
+ Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
+
/**
* Returns the underlying {@link MvPartitionStorage}. Only for tests!
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9514245eb7..8b8a33fd43 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -313,23 +313,28 @@ public class PartitionListener implements
RaftGroupListener {
return;
}
- storage.runConsistently(() -> {
- UUID txId = cmd.txId();
+ UUID txId = cmd.txId();
- Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId,
Collections.emptySet());
+ Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId,
Collections.emptySet());
- if (cmd.commit()) {
+ if (cmd.commit()) {
+ storage.runConsistently(() -> {
pendingRowIds.forEach(rowId -> storage.commitWrite(rowId,
cmd.commitTimestamp().asHybridTimestamp()));
- } else {
- pendingRowIds.forEach(storage::abortWrite);
- }
- txsPendingRowIds.remove(txId);
+ txsPendingRowIds.remove(txId);
- storage.lastApplied(commandIndex, commandTerm);
+ storage.lastApplied(commandIndex, commandTerm);
- return null;
- });
+ return null;
+ });
+ } else {
+ storageUpdateHandler.handleTransactionAbortion(pendingRowIds, ()
-> {
+ // on replication callback
+ txsPendingRowIds.remove(txId);
+
+ storage.lastApplied(commandIndex, commandTerm);
+ });
+ }
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 4b717cb119..a5bf3601d9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -24,11 +24,13 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -129,6 +131,13 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
partitionStorage.commitWrite(rowId, timestamp);
}
+ @Override
+ public Cursor<ReadResult> scanVersions(RowId rowId) throws
StorageException {
+ handleSnapshotInterference(rowId);
+
+ return partitionStorage.scanVersions(rowId);
+ }
+
private void handleSnapshotInterference(RowId rowId) {
PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
new file mode 100644
index 0000000000..44ba9a280d
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests indexes cleaning up on data removal and transaction abortions. */
+public class IndexCleanupTest {
+ /** Default reflection marshaller factory. */
+ private static final MarshallerFactory MARSHALLER_FACTORY = new
ReflectionMarshallerFactory();
+
+ private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new
SchemaDescriptor(1, new Column[]{
+ new Column("INTKEY", NativeTypes.INT32, false),
+ new Column("STRKEY", NativeTypes.STRING, false),
+ }, new Column[]{
+ new Column("INTVAL", NativeTypes.INT32, false),
+ new Column("STRVAL", NativeTypes.STRING, false),
+ });
+
+ private static final BinaryTupleSchema TUPLE_SCHEMA =
BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
+
+ private static final BinaryTupleSchema PK_INDEX_SCHEMA =
BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR);
+
+ private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER =
new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
+
+ private static final int[] USER_INDEX_COLS = {
+ SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(),
+ SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex()
+ };
+
+ private static final BinaryTupleSchema USER_INDEX_SCHEMA =
BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS);
+
+ private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER
= new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
+
+ /** Key-value marshaller for tests. */
+ private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER
+ = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class,
TestValue.class);
+
+ private static final UUID TX_ID = UUID.randomUUID();
+
+ private static final HybridClock CLOCK = new HybridClockImpl();
+
+ private TestHashIndexStorage pkInnerStorage;
+ private TestSortedIndexStorage sortedInnerStorage;
+ private TestHashIndexStorage hashInnerStorage;
+ private TestMvPartitionStorage storage;
+ private StorageUpdateHandler storageUpdateHandler;
+
+ @BeforeEach
+ void setUp() {
+ UUID pkIndexId = UUID.randomUUID();
+ UUID sortedIndexId = UUID.randomUUID();
+ UUID hashIndexId = UUID.randomUUID();
+
+ pkInnerStorage = new TestHashIndexStorage(null);
+
+ TableSchemaAwareIndexStorage pkStorage = new
TableSchemaAwareIndexStorage(
+ pkIndexId,
+ pkInnerStorage,
+ PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+ );
+
+ sortedInnerStorage = new TestSortedIndexStorage(new
SortedIndexDescriptor(sortedIndexId, List.of(
+ new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32,
false, true),
+ new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING,
false, true)
+ )));
+
+ TableSchemaAwareIndexStorage sortedIndexStorage = new
TableSchemaAwareIndexStorage(
+ sortedIndexId,
+ sortedInnerStorage,
+ USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+ );
+
+ hashInnerStorage = new TestHashIndexStorage(new
HashIndexDescriptor(hashIndexId, List.of(
+ new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32,
false),
+ new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING,
false)
+ )));
+
+ TableSchemaAwareIndexStorage hashIndexStorage = new
TableSchemaAwareIndexStorage(
+ hashIndexId,
+ hashInnerStorage,
+ USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+ );
+
+ storage = new TestMvPartitionStorage(1);
+
+ storageUpdateHandler = new StorageUpdateHandler(1, new
TestPartitionDataStorage(storage),
+ () -> Map.of(
+ pkIndexId, pkStorage,
+ sortedIndexId, sortedIndexStorage,
+ hashIndexId, hashIndexStorage
+ )
+ );
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testAbort(AddWrite writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+ BinaryRow tableRow = binaryRow(key, value);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+ assertEquals(1, storage.rowsCount());
+ assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+
+ storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () ->
{});
+
+ assertEquals(0, storage.rowsCount());
+ assertTrue(pkInnerStorage.allRowsIds().isEmpty());
+ assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
+ assertTrue(hashInnerStorage.allRowsIds().isEmpty());
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testTombstoneCleansUpIndexes(AddWrite writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+ BinaryRow tableRow = binaryRow(key, value);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+ // Write intent is in the storage.
+ assertEquals(1, storage.rowsCount());
+
+ // Indexes are already in the storage.
+ assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+
+ writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+ // Write intent is in the storage.
+ assertEquals(1, storage.rowsCount());
+
+ // But indexes are removed.
+ assertTrue(pkInnerStorage.allRowsIds().isEmpty());
+ assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
+ assertTrue(hashInnerStorage.allRowsIds().isEmpty());
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testAbortTombstone(AddWrite writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+ BinaryRow tableRow = binaryRow(key, value);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+ storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () ->
{});
+
+ assertEquals(0, storage.rowsCount());
+ assertTrue(pkInnerStorage.allRowsIds().isEmpty());
+ assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
+ assertTrue(hashInnerStorage.allRowsIds().isEmpty());
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testAbortConsecutiveTxWithMatchingIndexes(AddWrite writer) {
+ UUID rowUuid1 = UUID.randomUUID();
+ UUID rowUuid2 = UUID.randomUUID();
+ RowId rowId1 = new RowId(1, rowUuid1);
+ RowId rowId2 = new RowId(1, rowUuid2);
+
+ var key1 = new TestKey(1, "foo");
+ var key2 = new TestKey(2, "baz");
+ var value = new TestValue(2, "bar");
+
+ writer.addWrite(storageUpdateHandler, rowUuid1, binaryRow(key1,
value));
+ commitWrite(rowId1);
+
+ writer.addWrite(storageUpdateHandler, rowUuid2, binaryRow(key2,
value));
+
+ storageUpdateHandler.handleTransactionAbortion(Set.of(rowId2), () ->
{});
+
+ assertEquals(1, storage.rowsCount());
+
+ Set<RowId> pkRows = pkInnerStorage.allRowsIds();
+ Set<RowId> sortedRows = sortedInnerStorage.allRowsIds();
+ Set<RowId> hashRows = hashInnerStorage.allRowsIds();
+
+ assertThat(pkRows, contains(rowId1));
+ assertThat(sortedRows, contains(rowId1));
+ assertThat(hashRows, contains(rowId1));
+
+ assertThat(pkRows, not(contains(rowId2)));
+ assertThat(sortedRows, not(contains(rowId2)));
+ assertThat(hashRows, not(contains(rowId2)));
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testIndexNotRemovedOnTombstone(AddWrite writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+ BinaryRow tableRow = binaryRow(key, value);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ commitWrite(rowId);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+ storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () ->
{});
+
+ assertEquals(1, storage.rowsCount());
+ assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testIndexNotRemovedWhileAbortingIfPreviousVersionExists(AddWrite
writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+ BinaryRow tableRow = binaryRow(key, value);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ commitWrite(rowId);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+ storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () ->
{});
+
+ assertEquals(1, storage.rowsCount());
+ assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testIndexNotRemovedWhileWritingIfPreviousVersionExists(AddWrite
writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+ BinaryRow tableRow = binaryRow(key, value);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ commitWrite(rowId);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+ assertEquals(1, storage.rowsCount());
+ assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void
testIndexNotRemovedWhileWritingIfMultiplePreviousVersionsExists(AddWrite
writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+ BinaryRow tableRow = binaryRow(key, value);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ commitWrite(rowId);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ commitWrite(rowId);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+ assertEquals(1, storage.rowsCount());
+ assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+ assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+ }
+
+ /** Enum that encapsulates update API. */
+ enum AddWrite {
+ /** Uses update api. */
+ USE_UPDATE {
+ @Override
+ void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+ handler.handleUpdate(
+ TX_ID,
+ rowUuid,
+ partitionId,
+ row == null ? null : row.byteBuffer(),
+ (unused) -> {}
+ );
+ }
+ },
+ /** Uses updateAll api. */
+ USE_UPDATE_ALL {
+ @Override
+ void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+ handler.handleUpdateAll(
+ TX_ID,
+ singletonMap(rowUuid, row == null ? null :
row.byteBuffer()),
+ partitionId,
+ (unused) -> {}
+ );
+ }
+ };
+
+ void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable
BinaryRow row) {
+ TablePartitionId tablePartitionId = new
TablePartitionId(UUID.randomUUID(), 1);
+
+ addWrite(handler, tablePartitionId, rowUuid, row);
+ }
+
+ abstract void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row);
+ }
+
+ private static BinaryRow binaryRow(TestKey key, TestValue value) {
+ try {
+ return KV_MARSHALLER.marshal(key, value);
+ } catch (MarshallerException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void commitWrite(RowId rowId) {
+ storage.runConsistently(() -> {
+ storage.commitWrite(rowId, CLOCK.now());
+
+ return null;
+ });
+ }
+
+ private static class TestKey {
+ int intKey;
+
+ String strKey;
+
+ TestKey() {
+ }
+
+ TestKey(int intKey, String strKey) {
+ this.intKey = intKey;
+ this.strKey = strKey;
+ }
+ }
+
+ private static class TestValue {
+ Integer intVal;
+
+ String strVal;
+
+ TestValue() {
+ }
+
+ TestValue(Integer intVal, String strVal) {
+ this.intVal = intVal;
+ this.strVal = strVal;
+ }
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 5997e3be94..12c6b00d9f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -821,7 +821,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
checkRowInMvStorage(br, true);
doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE);
- checkRowInMvStorage(binaryRow(0), false);
+ checkNoRowInIndex(binaryRow(0));
doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
checkRowInMvStorage(binaryRow(0), true);
@@ -835,12 +835,12 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
checkRowInMvStorage(br, true);
doSingleRowRequest(txId, br, RequestType.RW_GET_AND_DELETE);
- checkRowInMvStorage(br, false);
+ checkNoRowInIndex(br);
doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
checkRowInMvStorage(binaryRow(0), true);
doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE_EXACT);
- checkRowInMvStorage(binaryRow(0), false);
+ checkNoRowInIndex(binaryRow(0));
cleanup(txId);
}
@@ -866,17 +866,17 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
checkRowInMvStorage(newRow1, true);
doMultiRowRequest(txId, newRows, RequestType.RW_DELETE_ALL);
- checkRowInMvStorage(row0, false);
- checkRowInMvStorage(row1, false);
- checkRowInMvStorage(newRow0, false);
- checkRowInMvStorage(newRow1, false);
+ checkNoRowInIndex(row0);
+ checkNoRowInIndex(row1);
+ checkNoRowInIndex(newRow0);
+ checkNoRowInIndex(newRow1);
doMultiRowRequest(txId, rows, RequestType.RW_INSERT_ALL);
checkRowInMvStorage(row0, true);
checkRowInMvStorage(row1, true);
doMultiRowRequest(txId, rows, RequestType.RW_DELETE_EXACT_ALL);
- checkRowInMvStorage(row0, false);
- checkRowInMvStorage(row1, false);
+ checkNoRowInIndex(row0);
+ checkNoRowInIndex(row1);
cleanup(txId);
}
@@ -980,6 +980,12 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
}
+ private void checkNoRowInIndex(BinaryRow binaryRow) {
+ try (Cursor<RowId> cursor = pkStorage.get().get(binaryRow)) {
+ assertFalse(cursor.hasNext());
+ }
+ }
+
private void testWriteIntentOnPrimaryReplica(
UUID txId,
Supplier<ReadWriteReplicaRequest> updatingRequestSupplier,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index dec8b7d67f..3391eff013 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -26,10 +26,12 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
/**
@@ -106,6 +108,11 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
partitionStorage.commitWrite(rowId, timestamp);
}
+ @Override
+ public Cursor<ReadResult> scanVersions(RowId rowId) throws
StorageException {
+ return partitionStorage.scanVersions(rowId);
+ }
+
@Override
public MvPartitionStorage getStorage() {
return partitionStorage;