This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new af6aadea04 IGNITE-18739 Add index garbage collection (#1706)
af6aadea04 is described below

commit af6aadea040259f86c3bf659ab6d5e38fd264e69
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu Feb 23 15:46:51 2023 +0400

    IGNITE-18739 Add index garbage collection (#1706)
---
 .../internal/storage/MvPartitionStorage.java       |   3 +-
 .../storage/AbstractMvPartitionStorageGcTest.java  |  15 ++
 .../internal/storage/rocksdb/GarbageCollector.java |   2 +
 .../table/distributed/StorageUpdateHandler.java    |  37 ++-
 .../distributed/TableSchemaAwareIndexStorage.java  |   4 +-
 .../distributed/raft/PartitionDataStorage.java     |   8 +
 .../SnapshotAwarePartitionDataStorage.java         |  18 +-
 .../internal/table/distributed/IndexBaseTest.java  | 228 +++++++++++++++++
 .../table/distributed/IndexCleanupTest.java        | 269 ++++-----------------
 .../internal/table/distributed/IndexGcTest.java    | 164 +++++++++++++
 .../distributed/TestPartitionDataStorage.java      |   6 +
 11 files changed, 518 insertions(+), 236 deletions(-)

diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 5bf5b9b9d3..de3e533078 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -220,7 +220,8 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
     /**
      * Polls the oldest row in the partition, removing it at the same time.
      *
-     * @param lowWatermark A time threshold for the row. Rows younger then the 
watermark value will not be removed.
+     * @param lowWatermark A time threshold for the row. Only rows that have 
versions with timestamp higher or equal to the watermark
+     *      can be removed.
      * @return A pair of table row and row id, where a timestamp of the row is 
less than or equal to {@code lowWatermark}.
      *      {@code null} if there's no such value.
      * @throws StorageException If failed to poll element for vacuum.
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
index c0c3abff25..f98e1822f3 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -129,4 +130,18 @@ public abstract class AbstractMvPartitionStorageGcTest 
extends BaseMvPartitionSt
         // Nothing else to poll.
         assertNull(pollForVacuum(lowWatermark));
     }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18882")
+    void testVacuumsSecondRowIfTombstoneIsFirst() {
+        addAndCommit(null);
+
+        addAndCommit(TABLE_ROW);
+
+        addAndCommit(TABLE_ROW2);
+
+        BinaryRowAndRowId row = pollForVacuum(HybridTimestamp.MAX_VALUE);
+
+        assertRowMatches(row.binaryRow(), TABLE_ROW);
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index e5bb63e3a8..d150deac98 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -235,6 +235,8 @@ class GarbageCollector {
                 // At this point there's definitely a value that needs to be 
garbage collected in the iterator.
                 byte[] valueBytes = it.value();
 
+                assert valueBytes.length > 0; // Can't be a tombstone.
+
                 var row = new 
ByteBufferRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
                 BinaryRowAndRowId retVal = new BinaryRowAndRowId(row, 
gcRowVersion.getRowId());
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 3bdc0aff09..bad1f2f50a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -28,9 +28,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -234,7 +237,7 @@ public class StorageUpdateHandler {
                         continue;
                     }
 
-                    // If any of the previous versions' index value matches 
the index value of
+                    // If any of the previous versions' index value equals the 
index value of
                     // the row to remove, then we can't remove that index as 
it can still be used.
                     BinaryTuple previousRowIndex = 
index.resolveIndexRow(previousRow);
 
@@ -252,10 +255,34 @@ public class StorageUpdateHandler {
         }
     }
 
-    private void removeFromIndex(BinaryRow row, RowId rowId) {
-        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
-            index.remove(row, rowId);
-        }
+    /**
+     * Tries removing partition's oldest stale entry and its indexes.
+     *
+     * @param lowWatermark Low watermark for the vacuum.
+     * @return {@code true} if an entry was garbage collected, {@code false} 
if there was nothing to collect.
+     * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
+     */
+    public boolean vacuum(HybridTimestamp lowWatermark) {
+        return storage.runConsistently(() -> {
+            BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
+
+            if (vacuumed == null) {
+                // Nothing was garbage collected.
+                return false;
+            }
+
+            BinaryRow binaryRow = vacuumed.binaryRow();
+
+            assert binaryRow != null;
+
+            RowId rowId = vacuumed.rowId();
+
+            try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+                tryRemoveFromIndexes(binaryRow, rowId, cursor);
+            }
+
+            return true;
+        });
     }
 
     private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index c0a530bec6..fd1cd8ae1f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -87,9 +87,9 @@ public class TableSchemaAwareIndexStorage {
      * Resolves index row value.
      *
      * @param row Full row.
-     * @return Index value.
+     * @return A tuple that represents indexed columns of a row.
      */
-    public BinaryTuple resolveIndexRow(BinaryRow row) {
+    BinaryTuple resolveIndexRow(BinaryRow row) {
         return indexRowResolver.apply(row);
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index eacb234335..aa6347bb89 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -165,6 +166,13 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
      */
     Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
 
+    /**
+     * Tries to garbage collect the oldest stale entry of the partition.
+     *
+     * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
+     */
+    @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark);
+
     /**
      * Returns the underlying {@link MvPartitionStorage}. Only for tests!
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 7f470d0bc8..f6458d5579 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -131,11 +132,21 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
 
     @Override
     public Cursor<ReadResult> scanVersions(RowId rowId) throws 
StorageException {
-        handleSnapshotInterference(rowId);
-
         return partitionStorage.scanVersions(rowId);
     }
 
+    @Override
+    public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return partitionStorage.pollForVacuum(lowWatermark);
+    }
+
+    /**
+     * Handles the situation when snapshots are running concurrently with 
write operations.
+     * In case if a row that is going to be changed was not yet sent in an 
ongoing snapshot,
+     * schedule an out-of-order sending of said row.
+     *
+     * @param rowId Row id.
+     */
     private void handleSnapshotInterference(RowId rowId) {
         PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
 
@@ -144,13 +155,16 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
 
             try {
                 if (snapshot.alreadyPassed(rowId)) {
+                    // Row already sent.
                     continue;
                 }
 
                 if (!snapshot.addRowIdToSkip(rowId)) {
+                    // Already scheduled.
                     continue;
                 }
 
+                // Collect all versions of row and schedule the send operation.
                 snapshot.enqueueForSending(rowId);
             } finally {
                 snapshot.releaseMvLock();
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
new file mode 100644
index 0000000000..cf1e032f9d
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Base test for indexes. Sets up a table with (int, string) key and (int, 
string) value and
+ * three indexes: primary key, hash index over value columns and sorted index 
over value columns.
+ */
+public abstract class IndexBaseTest extends BaseMvStoragesTest {
+    private static final BinaryTupleSchema TUPLE_SCHEMA = 
BinaryTupleSchema.createRowSchema(schemaDescriptor);
+
+    private static final BinaryTupleSchema PK_INDEX_SCHEMA = 
BinaryTupleSchema.createKeySchema(schemaDescriptor);
+
+    private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = 
new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
+
+    private static final int[] USER_INDEX_COLS = {
+            schemaDescriptor.column("INTVAL").schemaIndex(),
+            schemaDescriptor.column("STRVAL").schemaIndex()
+    };
+
+    private static final BinaryTupleSchema USER_INDEX_SCHEMA = 
BinaryTupleSchema.createSchema(schemaDescriptor, USER_INDEX_COLS);
+
+    private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER 
= new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
+
+    private static final UUID TX_ID = UUID.randomUUID();
+
+    TestHashIndexStorage pkInnerStorage;
+    TestSortedIndexStorage sortedInnerStorage;
+    TestHashIndexStorage hashInnerStorage;
+    TestMvPartitionStorage storage;
+    StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp() {
+        UUID pkIndexId = UUID.randomUUID();
+        UUID sortedIndexId = UUID.randomUUID();
+        UUID hashIndexId = UUID.randomUUID();
+
+        pkInnerStorage = new TestHashIndexStorage(null);
+
+        TableSchemaAwareIndexStorage pkStorage = new 
TableSchemaAwareIndexStorage(
+                pkIndexId,
+                pkInnerStorage,
+                PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        sortedInnerStorage = new TestSortedIndexStorage(new 
SortedIndexDescriptor(sortedIndexId, List.of(
+                new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false, true),
+                new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false, true)
+        )));
+
+        TableSchemaAwareIndexStorage sortedIndexStorage = new 
TableSchemaAwareIndexStorage(
+                sortedIndexId,
+                sortedInnerStorage,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        hashInnerStorage = new TestHashIndexStorage(new 
HashIndexDescriptor(hashIndexId, List.of(
+                new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false),
+                new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false)
+        )));
+
+        TableSchemaAwareIndexStorage hashIndexStorage = new 
TableSchemaAwareIndexStorage(
+                hashIndexId,
+                hashInnerStorage,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        storage = new TestMvPartitionStorage(1);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage),
+                () -> Map.of(
+                        pkIndexId, pkStorage,
+                        sortedIndexId, sortedIndexStorage,
+                        hashIndexId, hashIndexStorage
+                )
+        );
+    }
+
+    List<ReadResult> getRowVersions(RowId rowId) {
+        try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) {
+            return readResults.stream().collect(toList());
+        }
+    }
+
+    static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row) {
+        TablePartitionId partitionId = new TablePartitionId(UUID.randomUUID(), 
1);
+
+        handler.handleUpdate(
+                TX_ID,
+                rowUuid,
+                partitionId,
+                row == null ? null : row.byteBuffer(),
+                (unused) -> {}
+        );
+    }
+
+    static BinaryRow defaultRow() {
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+
+        return binaryRow(key, value);
+    }
+
+    boolean inAllIndexes(BinaryRow row) {
+        return inIndexes(row, true, true);
+    }
+
+    boolean notInAnyIndex(BinaryRow row) {
+        return inIndexes(row, false, false);
+    }
+
+    boolean inIndexes(BinaryRow row, boolean mustBeInPk, boolean mustBeInUser) 
{
+        BinaryTuple pkIndexValue = 
PK_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row);
+        BinaryTuple userIndexValue = 
USER_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row);
+
+        assert pkIndexValue != null;
+        assert userIndexValue != null;
+
+        try (Cursor<RowId> pkCursor = pkInnerStorage.get(pkIndexValue)) {
+            if (pkCursor.hasNext() != mustBeInPk) {
+                return false;
+            }
+        }
+
+        try (Cursor<RowId> sortedIdxCursor = 
sortedInnerStorage.get(userIndexValue)) {
+            if (sortedIdxCursor.hasNext() != mustBeInUser) {
+                return false;
+            }
+        }
+
+        try (Cursor<RowId> hashIdxCursor = 
hashInnerStorage.get(userIndexValue)) {
+            return hashIdxCursor.hasNext() == mustBeInUser;
+        }
+    }
+
+    HybridTimestamp now() {
+        return clock.now();
+    }
+
+    void commitWrite(RowId rowId) {
+        storage.runConsistently(() -> {
+            storage.commitWrite(rowId, now());
+
+            return null;
+        });
+    }
+
+    /** Enum that encapsulates update API. */
+    enum AddWrite {
+        /** Uses update api. */
+        USE_UPDATE {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+                handler.handleUpdate(
+                        TX_ID,
+                        rowUuid,
+                        partitionId,
+                        row == null ? null : row.byteBuffer(),
+                        (unused) -> {}
+                );
+            }
+        },
+        /** Uses updateAll api. */
+        USE_UPDATE_ALL {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+                handler.handleUpdateAll(
+                        TX_ID,
+                        singletonMap(rowUuid, row == null ? null : 
row.byteBuffer()),
+                        partitionId,
+                        (unused) -> {}
+                );
+            }
+        };
+
+        void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row) {
+            TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1);
+
+            addWrite(handler, tablePartitionId, rowUuid, row);
+        }
+
+        abstract void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row);
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
index 44ba9a280d..12b7780727 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -17,144 +17,30 @@
 
 package org.apache.ignite.internal.table.distributed;
 
-import static java.util.Collections.singletonMap;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import org.apache.ignite.distributed.TestPartitionDataStorage;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.BinaryRowConverter;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
-import org.apache.ignite.internal.schema.marshaller.MarshallerException;
-import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
-import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
-import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
-import 
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
-import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
-import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
-import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 /** Tests indexes cleaning up on data removal and transaction abortions. */
-public class IndexCleanupTest {
-    /** Default reflection marshaller factory. */
-    private static final MarshallerFactory MARSHALLER_FACTORY = new 
ReflectionMarshallerFactory();
-
-    private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new 
SchemaDescriptor(1, new Column[]{
-            new Column("INTKEY", NativeTypes.INT32, false),
-            new Column("STRKEY", NativeTypes.STRING, false),
-    }, new Column[]{
-            new Column("INTVAL", NativeTypes.INT32, false),
-            new Column("STRVAL", NativeTypes.STRING, false),
-    });
-
-    private static final BinaryTupleSchema TUPLE_SCHEMA = 
BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
-
-    private static final BinaryTupleSchema PK_INDEX_SCHEMA = 
BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR);
-
-    private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = 
new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
-
-    private static final int[] USER_INDEX_COLS = {
-            SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(),
-            SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex()
-    };
-
-    private static final BinaryTupleSchema USER_INDEX_SCHEMA = 
BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS);
-
-    private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER 
= new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
-
-    /** Key-value marshaller for tests. */
-    private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER
-            = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class, 
TestValue.class);
-
-    private static final UUID TX_ID = UUID.randomUUID();
-
-    private static final HybridClock CLOCK = new HybridClockImpl();
-
-    private TestHashIndexStorage pkInnerStorage;
-    private TestSortedIndexStorage sortedInnerStorage;
-    private TestHashIndexStorage hashInnerStorage;
-    private TestMvPartitionStorage storage;
-    private StorageUpdateHandler storageUpdateHandler;
-
-    @BeforeEach
-    void setUp() {
-        UUID pkIndexId = UUID.randomUUID();
-        UUID sortedIndexId = UUID.randomUUID();
-        UUID hashIndexId = UUID.randomUUID();
-
-        pkInnerStorage = new TestHashIndexStorage(null);
-
-        TableSchemaAwareIndexStorage pkStorage = new 
TableSchemaAwareIndexStorage(
-                pkIndexId,
-                pkInnerStorage,
-                PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
-        );
-
-        sortedInnerStorage = new TestSortedIndexStorage(new 
SortedIndexDescriptor(sortedIndexId, List.of(
-                new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false, true),
-                new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false, true)
-        )));
-
-        TableSchemaAwareIndexStorage sortedIndexStorage = new 
TableSchemaAwareIndexStorage(
-                sortedIndexId,
-                sortedInnerStorage,
-                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
-        );
-
-        hashInnerStorage = new TestHashIndexStorage(new 
HashIndexDescriptor(hashIndexId, List.of(
-                new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false),
-                new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false)
-        )));
-
-        TableSchemaAwareIndexStorage hashIndexStorage = new 
TableSchemaAwareIndexStorage(
-                hashIndexId,
-                hashInnerStorage,
-                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
-        );
-
-        storage = new TestMvPartitionStorage(1);
-
-        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage),
-                () -> Map.of(
-                        pkIndexId, pkStorage,
-                        sortedIndexId, sortedIndexStorage,
-                        hashIndexId, hashIndexStorage
-                )
-        );
-    }
-
+public class IndexCleanupTest extends IndexBaseTest {
     @ParameterizedTest
     @EnumSource(AddWrite.class)
     void testAbort(AddWrite writer) {
         UUID rowUuid = UUID.randomUUID();
         RowId rowId = new RowId(1, rowUuid);
 
-        var key = new TestKey(1, "foo");
-        var value = new TestValue(2, "bar");
-        BinaryRow tableRow = binaryRow(key, value);
+        BinaryRow row = defaultRow();
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
 
         assertEquals(1, storage.rowsCount());
         assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
@@ -175,11 +61,9 @@ public class IndexCleanupTest {
         UUID rowUuid = UUID.randomUUID();
         RowId rowId = new RowId(1, rowUuid);
 
-        var key = new TestKey(1, "foo");
-        var value = new TestValue(2, "bar");
-        BinaryRow tableRow = binaryRow(key, value);
+        BinaryRow row = defaultRow();
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
 
         // Write intent is in the storage.
         assertEquals(1, storage.rowsCount());
@@ -206,11 +90,9 @@ public class IndexCleanupTest {
         UUID rowUuid = UUID.randomUUID();
         RowId rowId = new RowId(1, rowUuid);
 
-        var key = new TestKey(1, "foo");
-        var value = new TestValue(2, "bar");
-        BinaryRow tableRow = binaryRow(key, value);
+        BinaryRow row = defaultRow();
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         writer.addWrite(storageUpdateHandler, rowUuid, null);
 
         storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
@@ -257,15 +139,37 @@ public class IndexCleanupTest {
 
     @ParameterizedTest
     @EnumSource(AddWrite.class)
-    void testIndexNotRemovedOnTombstone(AddWrite writer) {
+    void testAbortConsecutiveTxWithMatchingIndexesSameRow(AddWrite writer) {
         UUID rowUuid = UUID.randomUUID();
         RowId rowId = new RowId(1, rowUuid);
 
         var key = new TestKey(1, "foo");
-        var value = new TestValue(2, "bar");
-        BinaryRow tableRow = binaryRow(key, value);
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        BinaryRow row1 = binaryRow(key, new TestValue(2, "bar"));
+        BinaryRow row2 = binaryRow(key, new TestValue(3, "baz"));
+
+        writer.addWrite(storageUpdateHandler, rowUuid, row1);
+        commitWrite(rowId);
+
+        writer.addWrite(storageUpdateHandler, rowUuid, row2);
+
+        storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
+
+        assertEquals(1, storage.rowsCount());
+
+        assertTrue(inAllIndexes(row1));
+        assertTrue(inIndexes(row2, true, false));
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWrite.class)
+    void testIndexNotRemovedOnTombstone(AddWrite writer) {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        BinaryRow row = defaultRow();
+
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         commitWrite(rowId);
 
         writer.addWrite(storageUpdateHandler, rowUuid, null);
@@ -284,14 +188,12 @@ public class IndexCleanupTest {
         UUID rowUuid = UUID.randomUUID();
         RowId rowId = new RowId(1, rowUuid);
 
-        var key = new TestKey(1, "foo");
-        var value = new TestValue(2, "bar");
-        BinaryRow tableRow = binaryRow(key, value);
+        BinaryRow row = defaultRow();
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         commitWrite(rowId);
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
 
         storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () -> 
{});
 
@@ -307,14 +209,12 @@ public class IndexCleanupTest {
         UUID rowUuid = UUID.randomUUID();
         RowId rowId = new RowId(1, rowUuid);
 
-        var key = new TestKey(1, "foo");
-        var value = new TestValue(2, "bar");
-        BinaryRow tableRow = binaryRow(key, value);
+        BinaryRow row = defaultRow();
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         commitWrite(rowId);
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         writer.addWrite(storageUpdateHandler, rowUuid, null);
 
         assertEquals(1, storage.rowsCount());
@@ -329,17 +229,15 @@ public class IndexCleanupTest {
         UUID rowUuid = UUID.randomUUID();
         RowId rowId = new RowId(1, rowUuid);
 
-        var key = new TestKey(1, "foo");
-        var value = new TestValue(2, "bar");
-        BinaryRow tableRow = binaryRow(key, value);
+        BinaryRow row = defaultRow();
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         commitWrite(rowId);
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         commitWrite(rowId);
 
-        writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+        writer.addWrite(storageUpdateHandler, rowUuid, row);
         writer.addWrite(storageUpdateHandler, rowUuid, null);
 
         assertEquals(1, storage.rowsCount());
@@ -347,85 +245,4 @@ public class IndexCleanupTest {
         assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
         assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
     }
-
-    /** Enum that encapsulates update API. */
-    enum AddWrite {
-        /** Uses update api. */
-        USE_UPDATE {
-            @Override
-            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
-                handler.handleUpdate(
-                        TX_ID,
-                        rowUuid,
-                        partitionId,
-                        row == null ? null : row.byteBuffer(),
-                        (unused) -> {}
-                );
-            }
-        },
-        /** Uses updateAll api. */
-        USE_UPDATE_ALL {
-            @Override
-            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
-                handler.handleUpdateAll(
-                        TX_ID,
-                        singletonMap(rowUuid, row == null ? null : 
row.byteBuffer()),
-                        partitionId,
-                        (unused) -> {}
-                );
-            }
-        };
-
-        void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row) {
-            TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1);
-
-            addWrite(handler, tablePartitionId, rowUuid, row);
-        }
-
-        abstract void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row);
-    }
-
-    private static BinaryRow binaryRow(TestKey key, TestValue value) {
-        try {
-            return KV_MARSHALLER.marshal(key, value);
-        } catch (MarshallerException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void commitWrite(RowId rowId) {
-        storage.runConsistently(() -> {
-            storage.commitWrite(rowId, CLOCK.now());
-
-            return null;
-        });
-    }
-
-    private static class TestKey {
-        int intKey;
-
-        String strKey;
-
-        TestKey() {
-        }
-
-        TestKey(int intKey, String strKey) {
-            this.intKey = intKey;
-            this.strKey = strKey;
-        }
-    }
-
-    private static class TestValue {
-        Integer intVal;
-
-        String strVal;
-
-        TestValue() {
-        }
-
-        TestValue(Integer intVal, String strVal) {
-            this.intVal = intVal;
-            this.strVal = strVal;
-        }
-    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
new file mode 100644
index 0000000000..b8f09c2229
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.junit.jupiter.api.Test;
+
+/** Tests indexes cleaning up on garbage collection. */
+public class IndexGcTest extends IndexBaseTest {
+    @Test
+    void testRemoveStaleEntryWithSameIndex() {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        BinaryRow row = defaultRow();
+
+        addWrite(storageUpdateHandler, rowUuid, row);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, row);
+        commitWrite(rowId);
+
+        assertTrue(storageUpdateHandler.vacuum(now()));
+
+        assertEquals(1, getRowVersions(rowId).size());
+        // Newer entry has the same index value, so it should not be removed.
+        assertTrue(inAllIndexes(row));
+    }
+
+    @Test
+    void testRemoveStaleEntriesWithDifferentIndexes() {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+
+        BinaryRow row1 = binaryRow(key, new TestValue(2, "bar"));
+        BinaryRow row2 = binaryRow(key, new TestValue(5, "baz"));
+
+        addWrite(storageUpdateHandler, rowUuid, row1);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, row1);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, row2);
+        commitWrite(rowId);
+
+        HybridTimestamp afterCommits = now();
+
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+
+        // row1 should still be in the index, because second write was 
identical to the first.
+        assertTrue(inAllIndexes(row1));
+
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+        assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+        assertEquals(1, getRowVersions(rowId).size());
+        // Older entries have different indexes, should be removed.
+        assertTrue(inIndexes(row1, true, false));
+        assertTrue(inAllIndexes(row2));
+    }
+
+    @Test
+    void testRemoveTombstonesRowNullNull() {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        BinaryRow row = defaultRow();
+
+        addWrite(storageUpdateHandler, rowUuid, row);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, null);
+        commitWrite(rowId);
+
+        // Second tombstone won't be actually put into the storage, but still, 
let's check.
+        addWrite(storageUpdateHandler, rowUuid, null);
+        commitWrite(rowId);
+
+        HybridTimestamp afterCommits = now();
+
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+        assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+        assertEquals(0, getRowVersions(rowId).size());
+        // The last entry was a tombstone, so no indexes should be left.
+        assertTrue(notInAnyIndex(row));
+    }
+
+    @Test
+    void testRemoveTombstonesRowNullRow() {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        BinaryRow row = defaultRow();
+
+        addWrite(storageUpdateHandler, rowUuid, row);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, null);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, row);
+        commitWrite(rowId);
+
+        HybridTimestamp afterCommits = now();
+
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+        assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+        assertEquals(1, getRowVersions(rowId).size());
+        assertTrue(inAllIndexes(row));
+    }
+
+    @Test
+    void testRemoveTombstonesRowRowNull() {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        BinaryRow row = defaultRow();
+
+        addWrite(storageUpdateHandler, rowUuid, row);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, row);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, null);
+        commitWrite(rowId);
+
+        HybridTimestamp afterCommits = now();
+
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+        assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+        assertEquals(0, getRowVersions(rowId).size());
+        assertTrue(notInAnyIndex(row));
+    }
+}
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index eb891ebeeb..022d9c863b 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.ReadResult;
@@ -111,6 +112,11 @@ public class TestPartitionDataStorage implements 
PartitionDataStorage {
         return partitionStorage.scanVersions(rowId);
     }
 
+    @Override
+    public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp 
lowWatermark) {
+        return partitionStorage.pollForVacuum(lowWatermark);
+    }
+
     @Override
     public MvPartitionStorage getStorage() {
         return partitionStorage;


Reply via email to