This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 26b90aef12 IGNITE-18738 Cleanup indexes on transaction abort (#1657)
26b90aef12 is described below

commit 26b90aef128936f2b479f9195242a5c203b72805
Author: Semyon Danilov <[email protected]>
AuthorDate: Fri Feb 10 12:35:38 2023 +0400

    IGNITE-18738 Cleanup indexes on transaction abort (#1657)
---
 .../storage/index/impl/TestHashIndexStorage.java   |   8 +
 .../storage/index/impl/TestSortedIndexStorage.java |   9 +
 .../table/distributed/StorageUpdateHandler.java    | 131 ++++++-
 .../distributed/TableSchemaAwareIndexStorage.java  |  10 +
 .../distributed/raft/PartitionDataStorage.java     |  13 +
 .../table/distributed/raft/PartitionListener.java  |  27 +-
 .../SnapshotAwarePartitionDataStorage.java         |   9 +
 .../table/distributed/IndexCleanupTest.java        | 431 +++++++++++++++++++++
 .../replication/PartitionReplicaListenerTest.java  |  24 +-
 .../distributed/TestPartitionDataStorage.java      |   7 +
 10 files changed, 646 insertions(+), 23 deletions(-)

diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index 9dbdf16928..49563660e2 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageClosedException;
@@ -197,4 +198,11 @@ public class TestHashIndexStorage implements 
HashIndexStorage {
 
         rebalance = false;
     }
+
+    /**
+     * Returns all indexed row ids.
+     */
+    public Set<RowId> allRowsIds() {
+        return 
index.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 9d15eabdc2..b23297eb2a 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -25,8 +25,10 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -371,4 +373,11 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
 
         rebalance = false;
     }
+
+    /**
+     * Returns all indexed row ids.
+     */
+    public Set<RowId> allRowsIds() {
+        return index.values().stream().flatMap(m -> 
m.keySet().stream()).collect(Collectors.toSet());
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 377552392d..3bdc0aff09 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -24,14 +24,18 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -72,7 +76,7 @@ public class StorageUpdateHandler {
             UUID txId,
             UUID rowUuid,
             TablePartitionId commitPartitionId,
-            ByteBuffer rowBuffer,
+            @Nullable ByteBuffer rowBuffer,
             @Nullable Consumer<RowId> onReplication
     ) {
         storage.runConsistently(() -> {
@@ -81,7 +85,12 @@ public class StorageUpdateHandler {
             UUID commitTblId = commitPartitionId.tableId();
             int commitPartId = commitPartitionId.partitionId();
 
-            storage.addWrite(rowId, row, txId, commitTblId, commitPartId);
+            BinaryRow oldRow = storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
+
+            if (oldRow != null) {
+                // Previous uncommitted row should be removed from indexes.
+                tryRemovePreviousWritesIndex(rowId, oldRow);
+            }
 
             if (onReplication != null) {
                 onReplication.accept(rowId);
@@ -118,7 +127,12 @@ public class StorageUpdateHandler {
                     RowId rowId = new RowId(partitionId, entry.getKey());
                     BinaryRow row = entry.getValue() != null ? new 
ByteBufferRow(entry.getValue()) : null;
 
-                    storage.addWrite(rowId, row, txId, commitTblId, 
commitPartId);
+                    BinaryRow oldRow = storage.addWrite(rowId, row, txId, 
commitTblId, commitPartId);
+
+                    if (oldRow != null) {
+                        // Previous uncommitted row should be removed from 
indexes.
+                        tryRemovePreviousWritesIndex(rowId, oldRow);
+                    }
 
                     rowIds.add(rowId);
                     addToIndexes(row, rowId);
@@ -133,6 +147,117 @@ public class StorageUpdateHandler {
         });
     }
 
+    /**
+     * Tries to remove the index entries of a previous uncommitted write.
+     *
+     * @param rowId Row id.
+     * @param previousRow Previous write value.
+     */
+    private void tryRemovePreviousWritesIndex(RowId rowId, BinaryRow 
previousRow) {
+        try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+            if (!cursor.hasNext()) {
+                return;
+            }
+
+            tryRemoveFromIndexes(previousRow, rowId, cursor);
+        }
+    }
+
+    /**
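+     * Handles a transaction abort: removes index entries of the write intents that are no longer needed, then aborts the writes.
+     *
+     * @param pendingRowIds Row ids of write-intents to be rolled back.
+     * @param onReplication Callback that is run in the same consistency closure after the writes are aborted.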
+     */
+    public void handleTransactionAbortion(Set<RowId> pendingRowIds, Runnable 
onReplication) {
+        storage.runConsistently(() -> {
+            for (RowId rowId : pendingRowIds) {
+                try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+                    if (!cursor.hasNext()) {
+                        continue;
+                    }
+
+                    ReadResult item = cursor.next();
+
+                    assert item.isWriteIntent();
+
+                    BinaryRow rowToRemove = item.binaryRow();
+
+                    if (rowToRemove == null) {
+                        continue;
+                    }
+
+                    tryRemoveFromIndexes(rowToRemove, rowId, cursor);
+                }
+            }
+
+            pendingRowIds.forEach(storage::abortWrite);
+
+            onReplication.run();
+
+            return null;
+        });
+    }
+
+    /**
+     * Tries removing indexed row from every index.
+     * Removes the row from an index only if none of the given versions resolves to the same index value as the 
row to remove, because if one does, then the index entry
+     * might still be in use.
+     *
+     * @param rowToRemove Row to remove from indexes.
+     * @param rowId Row id.
+     * @param previousValues Cursor with previous versions of the row.
+     */
+    private void tryRemoveFromIndexes(BinaryRow rowToRemove, RowId rowId, 
Cursor<ReadResult> previousValues) {
+        TableSchemaAwareIndexStorage[] indexes = 
this.indexes.get().values().toArray(new TableSchemaAwareIndexStorage[0]);
+
+        ByteBuffer[] indexValues = new ByteBuffer[indexes.length];
+
+        // Precalculate value for every index.
+        for (int i = 0; i < indexes.length; i++) {
+            TableSchemaAwareIndexStorage index = indexes[i];
+
+            indexValues[i] = index.resolveIndexRow(rowToRemove).byteBuffer();
+        }
+
+        while (previousValues.hasNext()) {
+            ReadResult previousVersion = previousValues.next();
+
+            BinaryRow previousRow = previousVersion.binaryRow();
+
+            // Skip tombstones: they have no row data, so there is no index value to compare 
against.
+            if (previousRow != null) {
+                for (int i = 0; i < indexes.length; i++) {
+                    TableSchemaAwareIndexStorage index = indexes[i];
+
+                    if (index == null) {
+                        continue;
+                    }
+
+                    // If any of the previous versions' index value matches 
the index value of
+                    // the row to remove, then we can't remove that index as 
it can still be used.
+                    BinaryTuple previousRowIndex = 
index.resolveIndexRow(previousRow);
+
+                    if (indexValues[i].equals(previousRowIndex.byteBuffer())) {
+                        indexes[i] = null;
+                    }
+                }
+            }
+        }
+
+        for (TableSchemaAwareIndexStorage index : indexes) {
+            if (index != null) {
+                index.remove(rowToRemove, rowId);
+            }
+        }
+    }
+
+    private void removeFromIndex(BinaryRow row, RowId rowId) {
+        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
+            index.remove(row, rowId);
+        }
+    }
+
     private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
         if (binaryRow == null) { // skip removes
             return;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index b42ff952c2..c0a530bec6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -83,6 +83,16 @@ public class TableSchemaAwareIndexStorage {
         storage.remove(new IndexRowImpl(tuple, rowId));
     }
 
+    /**
+     * Resolves index row value.
+     *
+     * @param row Full row.
+     * @return Index value.
+     */
+    public BinaryTuple resolveIndexRow(BinaryRow row) {
+        return indexRowResolver.apply(row);
+    }
+
     /** Returns underlying index storage. */
     public IndexStorage storage() {
         return storage;
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index 74b87e0d76..5b9ac5b6e1 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -25,9 +25,11 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -162,6 +164,17 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
      */
     void commitWrite(RowId rowId, HybridTimestamp timestamp) throws 
StorageException;
 
+    /**
+     * Scans all versions of a single row.
+     *
+     * <p>{@link ReadResult#newestCommitTimestamp()} is NOT filled by this 
method for intents having preceding committed
+     * versions.
+     *
+     * @param rowId Row id.
+     * @return Cursor of results including both rows data and 
transaction-related context. The versions are ordered from newest to oldest.
+     */
+    Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
+
     /**
      * Returns the underlying {@link MvPartitionStorage}. Only for tests!
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 9514245eb7..8b8a33fd43 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -313,23 +313,28 @@ public class PartitionListener implements 
RaftGroupListener {
             return;
         }
 
-        storage.runConsistently(() -> {
-            UUID txId = cmd.txId();
+        UUID txId = cmd.txId();
 
-            Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, 
Collections.emptySet());
+        Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, 
Collections.emptySet());
 
-            if (cmd.commit()) {
+        if (cmd.commit()) {
+            storage.runConsistently(() -> {
                 pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, 
cmd.commitTimestamp().asHybridTimestamp()));
-            } else {
-                pendingRowIds.forEach(storage::abortWrite);
-            }
 
-            txsPendingRowIds.remove(txId);
+                txsPendingRowIds.remove(txId);
 
-            storage.lastApplied(commandIndex, commandTerm);
+                storage.lastApplied(commandIndex, commandTerm);
 
-            return null;
-        });
+                return null;
+            });
+        } else {
+            storageUpdateHandler.handleTransactionAbortion(pendingRowIds, () 
-> {
+                // on replication callback
+                txsPendingRowIds.remove(txId);
+
+                storage.lastApplied(commandIndex, commandTerm);
+            });
+        }
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 4b717cb119..a5bf3601d9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -24,11 +24,13 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
 import org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionKey;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -129,6 +131,13 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
         partitionStorage.commitWrite(rowId, timestamp);
     }
 
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
+        handleSnapshotInterference(rowId);
+
+        return partitionStorage.scanVersions(rowId);
+    }
+
     private void handleSnapshotInterference(RowId rowId) {
         PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
new file mode 100644
index 0000000000..44ba9a280d
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests index cleanup on data removal and transaction abort. */
+public class IndexCleanupTest {
+    /** Default reflection marshaller factory. */
+    private static final MarshallerFactory MARSHALLER_FACTORY = new 
ReflectionMarshallerFactory();
+
+    private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new 
SchemaDescriptor(1, new Column[]{
+            new Column("INTKEY", NativeTypes.INT32, false),
+            new Column("STRKEY", NativeTypes.STRING, false),
+    }, new Column[]{
+            new Column("INTVAL", NativeTypes.INT32, false),
+            new Column("STRVAL", NativeTypes.STRING, false),
+    });
+
+    private static final BinaryTupleSchema TUPLE_SCHEMA = 
BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryTupleSchema PK_INDEX_SCHEMA = 
BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = 
new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
+
+    private static final int[] USER_INDEX_COLS = {
+            SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(),
+            SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex()
+    };
+
+    private static final BinaryTupleSchema USER_INDEX_SCHEMA = 
BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS);
+
+    private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER 
= new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
+
+    /** Key-value marshaller for tests. */
+    private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER
+            = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class, 
TestValue.class);
+
+    private static final UUID TX_ID = UUID.randomUUID();
+
+    private static final HybridClock CLOCK = new HybridClockImpl();
+
+    private TestHashIndexStorage pkInnerStorage;
+    private TestSortedIndexStorage sortedInnerStorage;
+    private TestHashIndexStorage hashInnerStorage;
+    private TestMvPartitionStorage storage;
+    private StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp() {
+        UUID pkIndexId = UUID.randomUUID();
+        UUID sortedIndexId = UUID.randomUUID();
+        UUID hashIndexId = UUID.randomUUID();
+
+        pkInnerStorage = new TestHashIndexStorage(null);
+
+        TableSchemaAwareIndexStorage pkStorage = new 
TableSchemaAwareIndexStorage(
+                pkIndexId,
+                pkInnerStorage,
+                PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        sortedInnerStorage = new TestSortedIndexStorage(new 
SortedIndexDescriptor(sortedIndexId, List.of(
+                new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false, true),
+                new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false, true)
+        )));
+
+        TableSchemaAwareIndexStorage sortedIndexStorage = new 
TableSchemaAwareIndexStorage(
+                sortedIndexId,
+                sortedInnerStorage,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        hashInnerStorage = new TestHashIndexStorage(new 
HashIndexDescriptor(hashIndexId, List.of(
+                new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false),
+                new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false)
+        )));
+
+        TableSchemaAwareIndexStorage hashIndexStorage = new 
TableSchemaAwareIndexStorage(
+                hashIndexId,
+                hashInnerStorage,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        storage = new TestMvPartitionStorage(1);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage),
+                () -> Map.of(
+                        pkIndexId, pkStorage,
+                        sortedIndexId, sortedIndexStorage,
+                        hashIndexId, hashIndexStorage
+                )
+        );
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbort(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(0, storage.rowsCount());
+        assertTrue(pkInnerStorage.allRowsIds().isEmpty());
+        assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
+        assertTrue(hashInnerStorage.allRowsIds().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testTombstoneCleansUpIndexes(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+        // Write intent is in the storage.
+        assertEquals(1, storage.rowsCount());
+
+        // Indexes are already in the storage.
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        // Write intent is in the storage.
+        assertEquals(1, storage.rowsCount());
+
+        // But indexes are removed.
+        assertTrue(pkInnerStorage.allRowsIds().isEmpty());
+        assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
+        assertTrue(hashInnerStorage.allRowsIds().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbortTombstone(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(0, storage.rowsCount());
+        assertTrue(pkInnerStorage.allRowsIds().isEmpty());
+        assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
+        assertTrue(hashInnerStorage.allRowsIds().isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testAbortConsecutiveTxWithMatchingIndexes(AddWrite writer) {
+        UUID rowUuid1 = UUID.randomUUID();
+        UUID rowUuid2 = UUID.randomUUID();
+        RowId rowId1 = new RowId(1, rowUuid1);
+        RowId rowId2 = new RowId(1, rowUuid2);
+
+        var key1 = new TestKey(1, "foo");
+        var key2 = new TestKey(2, "baz");
+        var value = new TestValue(2, "bar");
+
+        writer.addWrite(storageUpdateHandler, rowUuid1, binaryRow(key1, 
value));
+        commitWrite(rowId1);
+
+        writer.addWrite(storageUpdateHandler, rowUuid2, binaryRow(key2, 
value));
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId2), () -> 
{});
+
+        assertEquals(1, storage.rowsCount());
+
+        Set<RowId> pkRows = pkInnerStorage.allRowsIds();
+        Set<RowId> sortedRows = sortedInnerStorage.allRowsIds();
+        Set<RowId> hashRows = hashInnerStorage.allRowsIds();
+
+        assertThat(pkRows, contains(rowId1));
+        assertThat(sortedRows, contains(rowId1));
+        assertThat(hashRows, contains(rowId1));
+
+        assertThat(pkRows, not(contains(rowId2)));
+        assertThat(sortedRows, not(contains(rowId2)));
+        assertThat(hashRows, not(contains(rowId2)));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testIndexNotRemovedOnTombstone(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testIndexNotRemovedWhileAbortingIfPreviousVersionExists(AddWrite 
writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testIndexNotRemovedWhileWritingIfPreviousVersionExists(AddWrite 
writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void 
testIndexNotRemovedWhileWritingIfMultiplePreviousVersionsExists(AddWrite 
writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, null);
+
+        assertEquals(1, storage.rowsCount());
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+    }
+
+    /** Enum that encapsulates update API. */
+    enum AddWrite {
+        /** Uses update api. */
+        USE_UPDATE {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+                handler.handleUpdate(
+                        TX_ID,
+                        rowUuid,
+                        partitionId,
+                        row == null ? null : row.byteBuffer(),
+                        (unused) -> {}
+                );
+            }
+        },
+        /** Uses updateAll api. */
+        USE_UPDATE_ALL {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+                handler.handleUpdateAll(
+                        TX_ID,
+                        singletonMap(rowUuid, row == null ? null : 
row.byteBuffer()),
+                        partitionId,
+                        (unused) -> {}
+                );
+            }
+        };
+
+        void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row) {
+            TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1);
+
+            addWrite(handler, tablePartitionId, rowUuid, row);
+        }
+
+        abstract void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row);
+    }
+
+    private static BinaryRow binaryRow(TestKey key, TestValue value) {
+        try {
+            return KV_MARSHALLER.marshal(key, value);
+        } catch (MarshallerException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void commitWrite(RowId rowId) {
+        storage.runConsistently(() -> {
+            storage.commitWrite(rowId, CLOCK.now());
+
+            return null;
+        });
+    }
+
+    private static class TestKey {
+        int intKey;
+
+        String strKey;
+
+        TestKey() {
+        }
+
+        TestKey(int intKey, String strKey) {
+            this.intKey = intKey;
+            this.strKey = strKey;
+        }
+    }
+
+    private static class TestValue {
+        Integer intVal;
+
+        String strVal;
+
+        TestValue() {
+        }
+
+        TestValue(Integer intVal, String strVal) {
+            this.intVal = intVal;
+            this.strVal = strVal;
+        }
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 5997e3be94..12c6b00d9f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -821,7 +821,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         checkRowInMvStorage(br, true);
 
         doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE);
-        checkRowInMvStorage(binaryRow(0), false);
+        checkNoRowInIndex(binaryRow(0));
 
         doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
         checkRowInMvStorage(binaryRow(0), true);
@@ -835,12 +835,12 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         checkRowInMvStorage(br, true);
 
         doSingleRowRequest(txId, br, RequestType.RW_GET_AND_DELETE);
-        checkRowInMvStorage(br, false);
+        checkNoRowInIndex(br);
 
         doSingleRowRequest(txId, binaryRow(0), RequestType.RW_INSERT);
         checkRowInMvStorage(binaryRow(0), true);
         doSingleRowRequest(txId, binaryRow(0), RequestType.RW_DELETE_EXACT);
-        checkRowInMvStorage(binaryRow(0), false);
+        checkNoRowInIndex(binaryRow(0));
 
         cleanup(txId);
     }
@@ -866,17 +866,17 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         checkRowInMvStorage(newRow1, true);
 
         doMultiRowRequest(txId, newRows, RequestType.RW_DELETE_ALL);
-        checkRowInMvStorage(row0, false);
-        checkRowInMvStorage(row1, false);
-        checkRowInMvStorage(newRow0, false);
-        checkRowInMvStorage(newRow1, false);
+        checkNoRowInIndex(row0);
+        checkNoRowInIndex(row1);
+        checkNoRowInIndex(newRow0);
+        checkNoRowInIndex(newRow1);
 
         doMultiRowRequest(txId, rows, RequestType.RW_INSERT_ALL);
         checkRowInMvStorage(row0, true);
         checkRowInMvStorage(row1, true);
         doMultiRowRequest(txId, rows, RequestType.RW_DELETE_EXACT_ALL);
-        checkRowInMvStorage(row0, false);
-        checkRowInMvStorage(row1, false);
+        checkNoRowInIndex(row0);
+        checkNoRowInIndex(row1);
 
         cleanup(txId);
     }
@@ -980,6 +980,12 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
         }
     }
 
+    private void checkNoRowInIndex(BinaryRow binaryRow) {
+        try (Cursor<RowId> cursor = pkStorage.get().get(binaryRow)) {
+            assertFalse(cursor.hasNext());
+        }
+    }
+
     private void testWriteIntentOnPrimaryReplica(
             UUID txId,
             Supplier<ReadWriteReplicaRequest> updatingRequestSupplier,
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index dec8b7d67f..3391eff013 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -26,10 +26,12 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
+import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -106,6 +108,11 @@ public class TestPartitionDataStorage implements PartitionDataStorage {
         partitionStorage.commitWrite(rowId, timestamp);
     }
 
+    @Override
+    public Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException {
+        return partitionStorage.scanVersions(rowId);
+    }
+
     @Override
     public MvPartitionStorage getStorage() {
         return partitionStorage;


Reply via email to