This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-18738
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a33a15a8c2a7b58ccce3d44bb3d85dccc9059631
Author: Semyon Danilov <[email protected]>
AuthorDate: Wed Feb 8 17:54:39 2023 +0400

    IGNITE-18738 Cleanup indexes on transaction abort
---
 .../storage/index/impl/TestHashIndexStorage.java   |   5 +
 .../storage/index/impl/TestSortedIndexStorage.java |   6 +
 .../table/distributed/StorageUpdateHandler.java    |  57 +++-
 .../distributed/TableSchemaAwareIndexStorage.java  |   4 +-
 .../distributed/raft/PartitionDataStorage.java     |  13 +
 .../table/distributed/raft/PartitionListener.java  |  27 +-
 .../SnapshotAwarePartitionDataStorage.java         |   9 +
 .../table/distributed/IndexCleanupTest.java        | 370 +++++++++++++++++++++
 .../distributed/TestPartitionDataStorage.java      |   7 +
 9 files changed, 482 insertions(+), 16 deletions(-)

diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index 9dbdf16928..4d768ee577 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageClosedException;
@@ -197,4 +198,8 @@ public class TestHashIndexStorage implements 
HashIndexStorage {
 
         rebalance = false;
     }
+
+    /**
+     * Returns the row ids of every entry currently held by this index, merged into a single set.
+     *
+     * @return Set collected from all value sets of {@code index}.
+     */
+    public Set<RowId> allRows() {
+        return 
index.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 9d15eabdc2..b2cd8507d1 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -25,8 +25,10 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -371,4 +373,8 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
         rebalance = false;
     }
+
+    /**
+     * Returns the row ids of every entry currently held by this index, merged into a single set.
+     *
+     * @return Set collected from the key sets of all values of {@code index}.
+     */
+    public Set<RowId> allRows() {
+        return index.values().stream().flatMap(m -> 
m.keySet().stream()).collect(Collectors.toSet());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index dae55ead81..7b55b47ca3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -24,13 +24,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -71,7 +74,7 @@ public class StorageUpdateHandler {
             UUID txId,
             UUID rowUuid,
             TablePartitionId commitPartitionId,
-            ByteBuffer rowBuffer,
+            @Nullable ByteBuffer rowBuffer,
             @Nullable Consumer<RowId> onReplication
     ) {
         storage.runConsistently(() -> {
@@ -80,7 +83,12 @@ public class StorageUpdateHandler {
             UUID commitTblId = commitPartitionId.tableId();
             int commitPartId = commitPartitionId.partitionId();
 
-            storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+            TableRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
+
+            if (oldRow != null) {
+                // Previous uncommitted row should be removed from indexes.
+                removeFromIndex(oldRow, rowId);
+            }
 
             if (onReplication != null) {
                 onReplication.accept(rowId);
@@ -117,7 +125,12 @@ public class StorageUpdateHandler {
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     TableRow row = entry.getValue() != null ? new 
TableRow(entry.getValue()) : null;
 
-                    storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
+                    TableRow oldRow = storage.addWrite(rowId, row, txId, 
commitTblId, commitPartId);
+
+                    if (oldRow != null) {
+                        // Previous uncommitted row should be removed from 
indexes.
+                        removeFromIndex(oldRow, rowId);
+                    }
 
                     rowIds.add(rowId);
                     addToIndexes(row, rowId);
@@ -132,6 +145,44 @@ public class StorageUpdateHandler {
         });
     }
 
+    /**
+     * Handles the abortion of a transaction: cleans up index entries of the pending write intents and then aborts
+     * the writes themselves, all inside a single {@code storage.runConsistently} closure.
+     *
+     * <p>For every row id only the first version returned by {@code storage.scanVersions} is inspected. If it is a
+     * write intent that carries a row (i.e. not a tombstone), that row is removed from all indexes.
+     *
+     * @param pendingRowIds Row ids of write-intents to be rolled back.
+     * @param onReplication On replication callback; it is run after all writes have been aborted, within the same
+     *      {@code runConsistently} closure.
+     */
+    public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable 
onReplication) {
+        storage.runConsistently(() -> {
+            for (RowId rowId : pendingRowIds) {
+                // PartitionDataStorage#scanVersions documents a newest-to-oldest order, so a pending write intent,
+                // if there is one, is expected to be the first element of the cursor.
+                try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+                    if (cursor.hasNext()) {
+                        ReadResult item = cursor.next();
+                        if (item.isWriteIntent()) {
+                            TableRow row = item.tableRow();
+
+                            // A null row means the write intent is a tombstone; there is nothing to remove from
+                            // the indexes in that case.
+                            // NOTE(review): the entry is removed without looking at older versions of this row.
+                            // If a committed version maps to the same index key, its entry presumably goes away
+                            // as well - confirm whether that case needs to be guarded.
+                            if (row != null) {
+                                removeFromIndex(row, item.rowId());
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Indexes are cleaned up first, while the write intents are still readable; only then are the writes aborted.
+            pendingRowIds.forEach(storage::abortWrite);
+
+            // The callback runs as part of the same consistent closure as the abort itself.
+            onReplication.run();
+
+            return null;
+        });
+    }
+
+    /**
+     * Removes the entry built from the given row and row id from every index returned by {@code indexes.get()}.
+     *
+     * @param row Row whose index entries should be removed.
+     * @param rowId Id of that row.
+     */
+    private void removeFromIndex(TableRow row, RowId rowId) {
+        indexes.get().values().forEach(indexStorage -> indexStorage.remove(row, rowId));
+    }
+
     private void addToIndexes(@Nullable TableRow tableRow, RowId rowId) {
         if (tableRow == null) { // skip removes
             return;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index be0578d8f8..dc661a5f00 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -81,8 +81,8 @@ public class TableSchemaAwareIndexStorage {
      * @param tableRow A table row to remove.
      * @param rowId An identifier of a row in a main storage.
      */
-    public void remove(BinaryRow tableRow, RowId rowId) {
-        BinaryTuple tuple = indexBinaryRowResolver.apply(tableRow);
+    public void remove(TableRow tableRow, RowId rowId) {
+        BinaryTuple tuple = indexTableRowResolver.apply(tableRow);
 
         storage.remove(new IndexRowImpl(tuple, rowId));
     }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index be097690b9..b008493968 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -25,9 +25,11 @@ import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -162,6 +164,17 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
      */
     void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException;
 
+    /**
+     * Scans all versions of a single row.
+     *
+     * <p>{@link ReadResult#newestCommitTimestamp()} is NOT filled by this 
method for intents having preceding committed
+     * versions.
+     *
+     * @param rowId Row id.
+     * @return Cursor of results including both rows data and 
transaction-related context. The versions are ordered from newest to oldest.
+     */
+    Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
+
     /**
      * Returns the underlying {@link MvPartitionStorage}. Only for tests!
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9514245eb7..8b8a33fd43 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -313,23 +313,28 @@ public class PartitionListener implements 
RaftGroupListener {
             return;
         }
 
-        storage.runConsistently(() -> {
-            UUID txId = cmd.txId();
+        UUID txId = cmd.txId();
 
-            Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, 
Collections.emptySet());
+        Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, 
Collections.emptySet());
 
-            if (cmd.commit()) {
+        if (cmd.commit()) {
+            storage.runConsistently(() -> {
                 pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, 
cmd.commitTimestamp().asHybridTimestamp()));
-            } else {
-                pendingRowIds.forEach(storage::abortWrite);
-            }
 
-            txsPendingRowIds.remove(txId);
+                txsPendingRowIds.remove(txId);
 
-            storage.lastApplied(commandIndex, commandTerm);
+                storage.lastApplied(commandIndex, commandTerm);
 
-            return null;
-        });
+                return null;
+            });
+        } else {
+            storageUpdateHandler.handleTransactionAbortion(pendingRowIds, () 
-> {
+                // on replication callback
+                txsPendingRowIds.remove(txId);
+
+                storage.lastApplied(commandIndex, commandTerm);
+            });
+        }
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 2376e93b2b..7af2db5260 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -24,11 +24,13 @@ import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -129,6 +131,13 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
         partitionStorage.commitWrite(rowId, timestamp);
     }
 
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
+        // NOTE(review): scanVersions only reads data; confirm that running the snapshot-interference hook
+        // is actually required before a read-only operation.
+        handleSnapshotInterference(rowId);
+
+        // The scan itself is delegated to the wrapped partition storage.
+        return partitionStorage.scanVersions(rowId);
+    }
+
     private void handleSnapshotInterference(RowId rowId) {
         PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
new file mode 100644
index 0000000000..f7447f923e
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.TableRow;
+import org.apache.ignite.internal.schema.TableRowConverter;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.IgniteException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests indexes cleaning up on data removal and transaction abortions. */
+public class IndexCleanupTest {
+    /** Default reflection marshaller factory. */
+    private static final MarshallerFactory MARSHALLER_FACTORY = new 
ReflectionMarshallerFactory();
+
+    private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new 
SchemaDescriptor(1, new Column[]{
+            new Column("INTKEY", NativeTypes.INT32, false),
+            new Column("STRKEY", NativeTypes.STRING, false),
+    }, new Column[]{
+            new Column("INTVAL", NativeTypes.INT32, false),
+            new Column("STRVAL", NativeTypes.STRING, false),
+    });
+
+    private static final BinaryTupleSchema TUPLE_SCHEMA = 
BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryTupleSchema PK_INDEX_SCHEMA = 
BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryConverter PK_INDEX_BINARY_ROW_CONVERTER = 
BinaryConverter.forKey(SCHEMA_DESCRIPTOR);
+    private static final TableRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = 
new TableRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
+
+    private static final int[] USER_INDEX_COLS = {
+            SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(),
+            SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex()
+    };
+
+    private static final BinaryTupleSchema USER_INDEX_SCHEMA = 
BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS);
+
+    private static final BinaryConverter USER_INDEX_BINARY_ROW_CONVERTER = 
BinaryConverter.forValue(SCHEMA_DESCRIPTOR);
+    private static final TableRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER = 
new TableRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
+
+    /** Key-value {@link BinaryTuple} converter for tests. */
+    private static final BinaryConverter KV_BINARY_CONVERTER = 
BinaryConverter.forRow(SCHEMA_DESCRIPTOR);
+
+    /** Key-value marshaller for tests. */
+    private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER
+            = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class, 
TestValue.class);
+
+    private static final UUID TX_ID = UUID.randomUUID();
+
+    private static final HybridClock CLOCK = new HybridClockImpl();
+
+    private TestHashIndexStorage pkInnerStorage;
+    private TestSortedIndexStorage sortedInnerStorage;
+    private TestHashIndexStorage hashInnerStorage;
+    private TestMvPartitionStorage storage;
+    private TestPartitionDataStorage dataStorage;
+    private StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp() {
+        UUID pkIndexId = UUID.randomUUID();
+        UUID sortedIndexId = UUID.randomUUID();
+        UUID hashIndexId = UUID.randomUUID();
+
+        pkInnerStorage = new TestHashIndexStorage(null);
+
+        TableSchemaAwareIndexStorage pkStorage = new 
TableSchemaAwareIndexStorage(
+                pkIndexId,
+                pkInnerStorage,
+                PK_INDEX_BINARY_ROW_CONVERTER::toTuple,
+                PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        sortedInnerStorage = new TestSortedIndexStorage(new 
SortedIndexDescriptor(sortedIndexId, List.of(
+                new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false, true),
+                new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false, true)
+        )));
+
+        TableSchemaAwareIndexStorage sortedIndexStorage = new 
TableSchemaAwareIndexStorage(
+                sortedIndexId,
+                sortedInnerStorage,
+                USER_INDEX_BINARY_ROW_CONVERTER::toTuple,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        hashInnerStorage = new TestHashIndexStorage(new 
HashIndexDescriptor(hashIndexId, List.of(
+                new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false),
+                new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false)
+        )));
+
+        TableSchemaAwareIndexStorage hashIndexStorage = new 
TableSchemaAwareIndexStorage(
+                hashIndexId,
+                hashInnerStorage,
+                USER_INDEX_BINARY_ROW_CONVERTER::toTuple,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        storage = new TestMvPartitionStorage(1);
+
+        dataStorage = new TestPartitionDataStorage(storage);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, dataStorage,
+                () -> Map.of(
+                        pkIndexId, pkStorage,
+                        sortedIndexId, sortedIndexStorage,
+                        hashIndexId, hashIndexStorage
+                )
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbort(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRows(), contains(rowId));
+        assertThat(sortedInnerStorage.allRows(), contains(rowId));
+        assertThat(hashInnerStorage.allRows(), contains(rowId));
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(0, storage.rowsCount());
+        assertTrue(pkInnerStorage.allRows().isEmpty());
+        assertTrue(sortedInnerStorage.allRows().isEmpty());
+        assertTrue(hashInnerStorage.allRows().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testTombstoneCleansUpIndexes(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        // Write intent is in the storage.
+        assertEquals(1, storage.rowsCount());
+
+        // But indexes are removed.
+        assertTrue(pkInnerStorage.allRows().isEmpty());
+        assertTrue(sortedInnerStorage.allRows().isEmpty());
+        assertTrue(hashInnerStorage.allRows().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbortTombstone(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(0, storage.rowsCount());
+        assertTrue(pkInnerStorage.allRows().isEmpty());
+        assertTrue(sortedInnerStorage.allRows().isEmpty());
+        assertTrue(hashInnerStorage.allRows().isEmpty());
+    }
+
+    /**
+     * Verifies that aborting the write intent of one row leaves the index entries of another, already committed row
+     * intact, even though both rows were written with the same value columns (the ones the sorted and hash indexes
+     * are built on).
+     */
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbortConsecutiveTxWithMatchingIndexes(AddWrite writer) {
+        UUID rowUuid1 = UUID.randomUUID();
+        UUID rowUuid2 = UUID.randomUUID();
+        RowId rowId1 = new RowId(1, rowUuid1);
+        RowId rowId2 = new RowId(1, rowUuid2);
+
+        var key1 = new TestKey(1, "foo");
+        var key2 = new TestKey(2, "baz");
+        var value = new TestValue(2, "bar");
+
+        // First row is written and committed.
+        writer.addWrite(storageUpdateHandler, rowUuid1, tableRow(key1, value));
+        commitWrite(rowId1);
+
+        // Second row shares the value columns with the first one, but stays a write intent and gets aborted.
+        writer.addWrite(storageUpdateHandler, rowUuid2, tableRow(key2, value));
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId2), () -> {});
+
+        assertEquals(1, storage.rowsCount());
+
+        Set<RowId> pkRows = pkInnerStorage.allRows();
+        Set<RowId> sortedRows = sortedInnerStorage.allRows();
+        Set<RowId> hashRows = hashInnerStorage.allRows();
+
+        assertThat(pkRows, contains(rowId1));
+        assertThat(sortedRows, contains(rowId1));
+        assertThat(hashRows, contains(rowId1));
+
+        // Hamcrest's contains(...) matches the whole collection, so not(contains(rowId2)) would also pass for a
+        // collection that merely holds rowId2 next to other elements. hasItem(...) checks membership.
+        assertThat(pkRows, not(hasItem(rowId2)));
+        assertThat(sortedRows, not(hasItem(rowId2)));
+        assertThat(hashRows, not(hasItem(rowId2)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testIndexNotRemovedOnTombstone(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        TableRow tableRow = tableRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRows(), contains(rowId));
+        assertThat(sortedInnerStorage.allRows(), contains(rowId));
+        assertThat(hashInnerStorage.allRows(), contains(rowId));
+    }
+
+    /** Enum that encapsulates update API. */
+    enum AddWrite {
+        /** Uses update api. */
+        USE_UPDATE {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable TableRow row) {
+                handler.handleUpdate(
+                        TX_ID,
+                        rowUuid,
+                        partitionId,
+                        row == null ? null : row.byteBuffer(),
+                        (unused) -> {}
+                );
+            }
+        },
+        /** Uses updateAll api. */
+        USE_UPDATE_ALL {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable TableRow row) {
+                handler.handleUpdateAll(
+                        TX_ID,
+                        singletonMap(rowUuid, row == null ? null : 
row.byteBuffer()),
+                        partitionId,
+                        (unused) -> {}
+                );
+            }
+        };
+
+        void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
TableRow row) {
+            TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1);
+
+            addWrite(handler, tablePartitionId, rowUuid, row);
+        }
+
+        abstract void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable TableRow row);
+    }
+
+    private static BinaryRow binaryRow(TestKey key, TestValue value) {
+        try {
+            return KV_MARSHALLER.marshal(key, value);
+        } catch (MarshallerException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    private static TableRow tableRow(TestKey key, TestValue value) {
+        return TableRowConverter.fromBinaryRow(binaryRow(key, value), 
KV_BINARY_CONVERTER);
+    }
+
+    private void commitWrite(RowId rowId) {
+        storage.runConsistently(() -> {
+            storage.commitWrite(rowId, CLOCK.now());
+
+            return null;
+        });
+    }
+
+    private static class TestKey {
+        int intKey;
+
+        String strKey;
+
+        TestKey() {
+        }
+
+        TestKey(int intKey, String strKey) {
+            this.intKey = intKey;
+            this.strKey = strKey;
+        }
+    }
+
+    private static class TestValue {
+        Integer intVal;
+
+        String strVal;
+
+        TestValue() {
+        }
+
+        TestValue(Integer intVal, String strVal) {
+            this.intVal = intVal;
+            this.strVal = strVal;
+        }
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 0c937c1f4b..b5ecfd78ba 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -26,10 +26,12 @@ import org.apache.ignite.internal.schema.TableRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -106,6 +108,11 @@ public class TestPartitionDataStorage implements 
PartitionDataStorage {
         partitionStorage.commitWrite(rowId, timestamp);
     }
 
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
+        // Plain delegation to the wrapped partition storage.
+        return partitionStorage.scanVersions(rowId);
+    }
+
     @Override
     public MvPartitionStorage getStorage() {
         return partitionStorage;

Reply via email to