This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-18738 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a33a15a8c2a7b58ccce3d44bb3d85dccc9059631 Author: Semyon Danilov <[email protected]> AuthorDate: Wed Feb 8 17:54:39 2023 +0400 IGNITE-18738 Cleanup indexes on transaction abort --- .../storage/index/impl/TestHashIndexStorage.java | 5 + .../storage/index/impl/TestSortedIndexStorage.java | 6 + .../table/distributed/StorageUpdateHandler.java | 57 +++- .../distributed/TableSchemaAwareIndexStorage.java | 4 +- .../distributed/raft/PartitionDataStorage.java | 13 + .../table/distributed/raft/PartitionListener.java | 27 +- .../SnapshotAwarePartitionDataStorage.java | 9 + .../table/distributed/IndexCleanupTest.java | 370 +++++++++++++++++++++ .../distributed/TestPartitionDataStorage.java | 7 + 9 files changed, 482 insertions(+), 16 deletions(-) diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java index 9dbdf16928..4d768ee577 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageClosedException; @@ -197,4 +198,8 @@ public class TestHashIndexStorage implements HashIndexStorage { rebalance = false; } + + public Set<RowId> allRows() { + return index.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); + } } diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java index 9d15eabdc2..b2cd8507d1 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java @@ -25,8 +25,10 @@ import java.util.Map.Entry; import java.util.NavigableMap; import java.util.NavigableSet; import java.util.NoSuchElementException; +import java.util.Set; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTuplePrefix; @@ -371,4 +373,8 @@ public class TestSortedIndexStorage implements SortedIndexStorage { rebalance = false; } + + public Set<RowId> allRows() { + return index.values().stream().flatMap(m -> m.keySet().stream()).collect(Collectors.toSet()); + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index dae55ead81..7b55b47ca3 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -24,13 +24,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.ignite.internal.schema.TableRow; +import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; /** @@ -71,7 +74,7 @@ public class StorageUpdateHandler { UUID txId, UUID rowUuid, TablePartitionId commitPartitionId, - ByteBuffer rowBuffer, + @Nullable ByteBuffer rowBuffer, @Nullable Consumer<RowId> onReplication ) { storage.runConsistently(() -> { @@ -80,7 +83,12 @@ public class StorageUpdateHandler { UUID commitTblId = commitPartitionId.tableId(); int commitPartId = commitPartitionId.partitionId(); - storage.addWrite(rowId, row, txId, commitTblId, commitPartId); + TableRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId); + + if (oldRow != null) { + // Previous uncommitted row should be removed from indexes. + removeFromIndex(oldRow, rowId); + } if (onReplication != null) { onReplication.accept(rowId); @@ -117,7 +125,12 @@ public class StorageUpdateHandler { RowId rowId = new RowId(partitionId, entry.getKey()); TableRow row = entry.getValue() != null ? new TableRow(entry.getValue()) : null; - storage.addWrite(rowId, row, txId, commitTblId, commitPartId); + TableRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, commitPartId); + + if (oldRow != null) { + // Previous uncommitted row should be removed from indexes. + removeFromIndex(oldRow, rowId); + } rowIds.add(rowId); addToIndexes(row, rowId); @@ -132,6 +145,44 @@ public class StorageUpdateHandler { }); } + /** + * Handles the abortion of a transaction. + * + * @param pendingRowIds Row ids of write-intents to be rolled back. + * @param onReplication On replication callback. + */ + public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable onReplication) { + storage.runConsistently(() -> { + for (RowId rowId : pendingRowIds) { + try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { + if (cursor.hasNext()) { + ReadResult item = cursor.next(); + if (item.isWriteIntent()) { + TableRow row = item.tableRow(); + + // No point in cleaning up indexes for tombstone, they should not exist. + if (row != null) { + removeFromIndex(row, item.rowId()); + } + } + } + } + } + + pendingRowIds.forEach(storage::abortWrite); + + onReplication.run(); + + return null; + }); + } + + private void removeFromIndex(TableRow row, RowId rowId) { + for (TableSchemaAwareIndexStorage index : indexes.get().values()) { + index.remove(row, rowId); + } + } + private void addToIndexes(@Nullable TableRow tableRow, RowId rowId) { if (tableRow == null) { // skip removes return; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java index be0578d8f8..dc661a5f00 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java @@ -81,8 +81,8 @@ public class TableSchemaAwareIndexStorage { * @param tableRow A table row to remove. * @param rowId An identifier of a row in a main storage. */ - public void remove(BinaryRow tableRow, RowId rowId) { - BinaryTuple tuple = indexBinaryRowResolver.apply(tableRow); + public void remove(TableRow tableRow, RowId rowId) { + BinaryTuple tuple = indexTableRowResolver.apply(tableRow); storage.remove(new IndexRowImpl(tuple, rowId)); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java index be097690b9..b008493968 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java @@ -25,9 +25,11 @@ import org.apache.ignite.internal.schema.TableRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.RaftGroupConfiguration; +import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.TxIdMismatchException; +import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -162,6 +164,17 @@ public interface PartitionDataStorage extends ManuallyCloseable { */ void commitWrite(RowId rowId, HybridTimestamp timestamp) throws StorageException; + /** + * Scans all versions of a single row. + * + * <p>{@link ReadResult#newestCommitTimestamp()} is NOT filled by this method for intents having preceding committed + * versions. + * + * @param rowId Row id. + * @return Cursor of results including both rows data and transaction-related context. The versions are ordered from newest to oldest. + */ + Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException; + /** * Returns the underlying {@link MvPartitionStorage}. Only for tests! * diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 9514245eb7..8b8a33fd43 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -313,23 +313,28 @@ public class PartitionListener implements RaftGroupListener { return; } - storage.runConsistently(() -> { - UUID txId = cmd.txId(); + UUID txId = cmd.txId(); - Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet()); + Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet()); - if (cmd.commit()) { + if (cmd.commit()) { + storage.runConsistently(() -> { pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, cmd.commitTimestamp().asHybridTimestamp())); - } else { - pendingRowIds.forEach(storage::abortWrite); - } - txsPendingRowIds.remove(txId); + txsPendingRowIds.remove(txId); - storage.lastApplied(commandIndex, commandTerm); + storage.lastApplied(commandIndex, commandTerm); - return null; - }); + return null; + }); + } else { + storageUpdateHandler.handleTransactionAbortion(pendingRowIds, () -> { + // on replication callback + txsPendingRowIds.remove(txId); + + storage.lastApplied(commandIndex, commandTerm); + }); + } } /** diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java index 2376e93b2b..7af2db5260 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java @@ -24,11 +24,13 @@ import org.apache.ignite.internal.schema.TableRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.RaftGroupConfiguration; +import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.TxIdMismatchException; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey; +import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; @@ -129,6 +131,13 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage { partitionStorage.commitWrite(rowId, timestamp); } + @Override + public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException { + handleSnapshotInterference(rowId); + + return partitionStorage.scanVersions(rowId); + } + private void handleSnapshotInterference(RowId rowId) { PartitionSnapshots partitionSnapshots = getPartitionSnapshots(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java new file mode 100644 index 0000000000..f7447f923e --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.distributed.TestPartitionDataStorage; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.schema.BinaryConverter; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryTuple; +import org.apache.ignite.internal.schema.BinaryTupleSchema; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.TableRow; +import org.apache.ignite.internal.schema.TableRowConverter; +import org.apache.ignite.internal.schema.marshaller.KvMarshaller; +import org.apache.ignite.internal.schema.marshaller.MarshallerException; +import org.apache.ignite.internal.schema.marshaller.MarshallerFactory; +import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; +import org.apache.ignite.internal.storage.index.HashIndexDescriptor; +import org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor; +import org.apache.ignite.internal.storage.index.SortedIndexDescriptor; +import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor; +import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; +import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.lang.IgniteException; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +/** Tests indexes cleaning up on data removal and transaction abortions. */ +public class IndexCleanupTest { + /** Default reflection marshaller factory. */ + private static final MarshallerFactory MARSHALLER_FACTORY = new ReflectionMarshallerFactory(); + + private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new SchemaDescriptor(1, new Column[]{ + new Column("INTKEY", NativeTypes.INT32, false), + new Column("STRKEY", NativeTypes.STRING, false), + }, new Column[]{ + new Column("INTVAL", NativeTypes.INT32, false), + new Column("STRVAL", NativeTypes.STRING, false), + }); + + private static final BinaryTupleSchema TUPLE_SCHEMA = BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR); + + private static final BinaryTupleSchema PK_INDEX_SCHEMA = BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR); + + private static final BinaryConverter PK_INDEX_BINARY_ROW_CONVERTER = BinaryConverter.forKey(SCHEMA_DESCRIPTOR); + private static final TableRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = new TableRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA); + + private static final int[] USER_INDEX_COLS = { + SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(), + SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex() + }; + + private static final BinaryTupleSchema USER_INDEX_SCHEMA = BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS); + + private static final BinaryConverter USER_INDEX_BINARY_ROW_CONVERTER = BinaryConverter.forValue(SCHEMA_DESCRIPTOR); + private static final TableRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER = new TableRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA); + + /** Key-value {@link BinaryTuple} converter for tests. */ + private static final BinaryConverter KV_BINARY_CONVERTER = BinaryConverter.forRow(SCHEMA_DESCRIPTOR); + + /** Key-value marshaller for tests. */ + private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER + = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class, TestValue.class); + + private static final UUID TX_ID = UUID.randomUUID(); + + private static final HybridClock CLOCK = new HybridClockImpl(); + + private TestHashIndexStorage pkInnerStorage; + private TestSortedIndexStorage sortedInnerStorage; + private TestHashIndexStorage hashInnerStorage; + private TestMvPartitionStorage storage; + private TestPartitionDataStorage dataStorage; + private StorageUpdateHandler storageUpdateHandler; + + @BeforeEach + void setUp() { + UUID pkIndexId = UUID.randomUUID(); + UUID sortedIndexId = UUID.randomUUID(); + UUID hashIndexId = UUID.randomUUID(); + + pkInnerStorage = new TestHashIndexStorage(null); + + TableSchemaAwareIndexStorage pkStorage = new TableSchemaAwareIndexStorage( + pkIndexId, + pkInnerStorage, + PK_INDEX_BINARY_ROW_CONVERTER::toTuple, + PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple + ); + + sortedInnerStorage = new TestSortedIndexStorage(new SortedIndexDescriptor(sortedIndexId, List.of( + new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, false, true), + new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, false, true) + ))); + + TableSchemaAwareIndexStorage sortedIndexStorage = new TableSchemaAwareIndexStorage( + sortedIndexId, + sortedInnerStorage, + USER_INDEX_BINARY_ROW_CONVERTER::toTuple, + USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple + ); + + hashInnerStorage = new TestHashIndexStorage(new HashIndexDescriptor(hashIndexId, List.of( + new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, false), + new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, false) + ))); + + TableSchemaAwareIndexStorage hashIndexStorage = new TableSchemaAwareIndexStorage( + hashIndexId, + hashInnerStorage, + USER_INDEX_BINARY_ROW_CONVERTER::toTuple, + USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple + ); + + storage = new TestMvPartitionStorage(1); + + dataStorage = new TestPartitionDataStorage(storage); + + storageUpdateHandler = new StorageUpdateHandler(1, dataStorage, + () -> Map.of( + pkIndexId, pkStorage, + sortedIndexId, sortedIndexStorage, + hashIndexId, hashIndexStorage + ) + ); + } + + @ParameterizedTest + @EnumSource(AddWrite.class) + void testAbort(AddWrite writer) { + UUID rowUuid = UUID.randomUUID(); + RowId rowId = new RowId(1, rowUuid); + + var key = new TestKey(1, "foo"); + var value = new TestValue(2, "bar"); + TableRow tableRow = tableRow(key, value); + + writer.addWrite(storageUpdateHandler, rowUuid, tableRow); + + assertEquals(1, storage.rowsCount()); + assertThat(pkInnerStorage.allRows(), contains(rowId)); + assertThat(sortedInnerStorage.allRows(), contains(rowId)); + assertThat(hashInnerStorage.allRows(), contains(rowId)); + + storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> {}); + + assertEquals(0, storage.rowsCount()); + assertTrue(pkInnerStorage.allRows().isEmpty()); + assertTrue(sortedInnerStorage.allRows().isEmpty()); + assertTrue(hashInnerStorage.allRows().isEmpty()); + } + + @ParameterizedTest + @EnumSource(AddWrite.class) + void testTombstoneCleansUpIndexes(AddWrite writer) { + UUID rowUuid = UUID.randomUUID(); + + var key = new TestKey(1, "foo"); + var value = new TestValue(2, "bar"); + TableRow tableRow = tableRow(key, value); + + writer.addWrite(storageUpdateHandler, rowUuid, tableRow); + writer.addWrite(storageUpdateHandler, rowUuid, null); + + // Write intent is in the storage. + assertEquals(1, storage.rowsCount()); + + // But indexes are removed. + assertTrue(pkInnerStorage.allRows().isEmpty()); + assertTrue(sortedInnerStorage.allRows().isEmpty()); + assertTrue(hashInnerStorage.allRows().isEmpty()); + } + + @ParameterizedTest + @EnumSource(AddWrite.class) + void testAbortTombstone(AddWrite writer) { + UUID rowUuid = UUID.randomUUID(); + RowId rowId = new RowId(1, rowUuid); + + var key = new TestKey(1, "foo"); + var value = new TestValue(2, "bar"); + TableRow tableRow = tableRow(key, value); + + writer.addWrite(storageUpdateHandler, rowUuid, tableRow); + writer.addWrite(storageUpdateHandler, rowUuid, null); + + storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> {}); + + assertEquals(0, storage.rowsCount()); + assertTrue(pkInnerStorage.allRows().isEmpty()); + assertTrue(sortedInnerStorage.allRows().isEmpty()); + assertTrue(hashInnerStorage.allRows().isEmpty()); + } + + @ParameterizedTest + @EnumSource(AddWrite.class) + void testAbortConsecutiveTxWithMatchingIndexes(AddWrite writer) { + UUID rowUuid1 = UUID.randomUUID(); + UUID rowUuid2 = UUID.randomUUID(); + RowId rowId1 = new RowId(1, rowUuid1); + RowId rowId2 = new RowId(1, rowUuid2); + + var key1 = new TestKey(1, "foo"); + var key2 = new TestKey(2, "baz"); + var value = new TestValue(2, "bar"); + + writer.addWrite(storageUpdateHandler, rowUuid1, tableRow(key1, value)); + commitWrite(rowId1); + + writer.addWrite(storageUpdateHandler, rowUuid2, tableRow(key2, value)); + + storageUpdateHandler.handleTransactionAbortion(Set.of(rowId2), () -> {}); + + assertEquals(1, storage.rowsCount()); + + Set<RowId> pkRows = pkInnerStorage.allRows(); + Set<RowId> sortedRows = sortedInnerStorage.allRows(); + Set<RowId> hashRows = hashInnerStorage.allRows(); + + assertThat(pkRows, contains(rowId1)); + assertThat(sortedRows, contains(rowId1)); + assertThat(hashRows, contains(rowId1)); + + assertThat(pkRows, not(contains(rowId2))); + assertThat(sortedRows, not(contains(rowId2))); + assertThat(hashRows, not(contains(rowId2))); + } + + @ParameterizedTest + @EnumSource(AddWrite.class) + void testIndexNotRemovedOnTombstone(AddWrite writer) { + UUID rowUuid = UUID.randomUUID(); + RowId rowId = new RowId(1, rowUuid); + + var key = new TestKey(1, "foo"); + var value = new TestValue(2, "bar"); + TableRow tableRow = tableRow(key, value); + + writer.addWrite(storageUpdateHandler, rowUuid, tableRow); + commitWrite(rowId); + + writer.addWrite(storageUpdateHandler, rowUuid, null); + + storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> {}); + + assertEquals(1, storage.rowsCount()); + assertThat(pkInnerStorage.allRows(), contains(rowId)); + assertThat(sortedInnerStorage.allRows(), contains(rowId)); + assertThat(hashInnerStorage.allRows(), contains(rowId)); + } + + /** Enum that encapsulates update API. */ + enum AddWrite { + /** Uses update api. */ + USE_UPDATE { + @Override + void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable TableRow row) { + handler.handleUpdate( + TX_ID, + rowUuid, + partitionId, + row == null ? null : row.byteBuffer(), + (unused) -> {} + ); + } + }, + /** Uses updateAll api. */ + USE_UPDATE_ALL { + @Override + void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable TableRow row) { + handler.handleUpdateAll( + TX_ID, + singletonMap(rowUuid, row == null ? null : row.byteBuffer()), + partitionId, + (unused) -> {} + ); + } + }; + + void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable TableRow row) { + TablePartitionId tablePartitionId = new TablePartitionId(UUID.randomUUID(), 1); + + addWrite(handler, tablePartitionId, rowUuid, row); + } + + abstract void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable TableRow row); + } + + private static BinaryRow binaryRow(TestKey key, TestValue value) { + try { + return KV_MARSHALLER.marshal(key, value); + } catch (MarshallerException e) { + throw new IgniteException(e); + } + } + + private static TableRow tableRow(TestKey key, TestValue value) { + return TableRowConverter.fromBinaryRow(binaryRow(key, value), KV_BINARY_CONVERTER); + } + + private void commitWrite(RowId rowId) { + storage.runConsistently(() -> { + storage.commitWrite(rowId, CLOCK.now()); + + return null; + }); + } + + private static class TestKey { + int intKey; + + String strKey; + + TestKey() { + } + + TestKey(int intKey, String strKey) { + this.intKey = intKey; + this.strKey = strKey; + } + } + + private static class TestValue { + Integer intVal; + + String strVal; + + TestValue() { + } + + TestValue(Integer intVal, String strVal) { + this.intVal = intVal; + this.strVal = strVal; + } + } +} diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java index 0c937c1f4b..b5ecfd78ba 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java @@ -26,10 +26,12 @@ import org.apache.ignite.internal.schema.TableRow; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.RaftGroupConfiguration; +import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; import org.apache.ignite.internal.storage.TxIdMismatchException; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; +import org.apache.ignite.internal.util.Cursor; import org.jetbrains.annotations.Nullable; /** @@ -106,6 +108,11 @@ public class TestPartitionDataStorage implements PartitionDataStorage { partitionStorage.commitWrite(rowId, timestamp); } + @Override + public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException { + return partitionStorage.scanVersions(rowId); + } + @Override public MvPartitionStorage getStorage() { return partitionStorage;
