This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-18739
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit af2e134692bdb0f0e22b03c909e1acbde54bd503
Author: Semyon Danilov <[email protected]>
AuthorDate: Wed Feb 22 13:09:33 2023 +0400

    IGNITE-18739 Add index garbage collection
---
 .../table/distributed/StorageUpdateHandler.java    |  34 ++-
 .../distributed/raft/PartitionDataStorage.java     |   9 +
 .../SnapshotAwarePartitionDataStorage.java         |   6 +
 .../internal/table/distributed/IndexGcTest.java    | 300 +++++++++++++++++++++
 .../distributed/TestPartitionDataStorage.java      |   6 +
 5 files changed, 351 insertions(+), 4 deletions(-)

diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 3bdc0aff09..6958cc3404 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -28,9 +28,11 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -252,10 +254,34 @@ public class StorageUpdateHandler {
         }
     }
 
-    private void removeFromIndex(BinaryRow row, RowId rowId) {
-        for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
-            index.remove(row, rowId);
-        }
+    /**
+     * Tries removing the oldest stale entry in the partition and cleaning up its index entries.
+     *
+     * @param lowWatermark Low watermark for the vacuum.
+     * @return {@code true} if an entry was garbage collected, {@code false} if there was nothing to collect.
+     */
+    public boolean vacuum(HybridTimestamp lowWatermark) {
+        return storage.runConsistently(() -> {
+            BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
+
+            if (vacuumed == null) {
+                // Nothing was garbage collected.
+                return false;
+            }
+
+            BinaryRow binaryRow = vacuumed.binaryRow();
+
+            assert binaryRow != null;
+
+            RowId rowId = vacuumed.rowId();
+
+            // No hasNext() assert: the row may have no versions left after the poll (presumably a purged tombstone).
+            try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+                tryRemoveFromIndexes(binaryRow, rowId, cursor);
+            }
+
+            return true;
+        });
     }
 
     private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index 5b9ac5b6e1..7a6ca8f546 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
@@ -175,6 +176,14 @@ public interface PartitionDataStorage extends ManuallyCloseable {
      */
     Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
 
+    /**
+     * Tries to garbage collect the oldest stale entry of the partition.
+     *
+     * @param lowWatermark Low watermark for the vacuum.
+     * @return The collected row together with its row ID, or {@code null} if there was nothing to collect.
+     * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
+     */
+    @Nullable
+    BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark);
+
     /**
      * Returns the underlying {@link MvPartitionStorage}. Only for tests!
      *
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index a5bf3601d9..d50e404eba 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
@@ -138,6 +139,11 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage {
         return partitionStorage.scanVersions(rowId);
     }
 
+    @Override
+    public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+        return partitionStorage.pollForVacuum(lowWatermark);
+    }
+
     private void handleSnapshotInterference(RowId rowId) {
         PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
new file mode 100644
index 0000000000..437db12836
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import 
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests indexes cleaning up on garbage collection. */
+public class IndexGcTest {
+    /** Default reflection marshaller factory. */
+    private static final MarshallerFactory MARSHALLER_FACTORY = new 
ReflectionMarshallerFactory();
+
+    private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new 
SchemaDescriptor(1, new Column[]{
+            new Column("INTKEY", NativeTypes.INT32, false),
+            new Column("STRKEY", NativeTypes.STRING, false),
+    }, new Column[]{
+            new Column("INTVAL", NativeTypes.INT32, false),
+            new Column("STRVAL", NativeTypes.STRING, false),
+    });
+
+    private static final BinaryTupleSchema TUPLE_SCHEMA = 
BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryTupleSchema PK_INDEX_SCHEMA = 
BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR);
+
+    private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = 
new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
+
+    private static final int[] USER_INDEX_COLS = {
+            SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(),
+            SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex()
+    };
+
+    private static final BinaryTupleSchema USER_INDEX_SCHEMA = 
BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS);
+
+    private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER 
= new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
+
+    /** Key-value marshaller for tests. */
+    private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER
+            = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class, 
TestValue.class);
+
+    private static final UUID TX_ID = UUID.randomUUID();
+
+    private static final HybridClock CLOCK = new HybridClockImpl();
+
+    private TestHashIndexStorage pkInnerStorage;
+    private TestSortedIndexStorage sortedInnerStorage;
+    private TestHashIndexStorage hashInnerStorage;
+    private TestMvPartitionStorage storage;
+    private StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp() {
+        UUID pkIndexId = UUID.randomUUID();
+        UUID sortedIndexId = UUID.randomUUID();
+        UUID hashIndexId = UUID.randomUUID();
+
+        pkInnerStorage = new TestHashIndexStorage(null);
+
+        TableSchemaAwareIndexStorage pkStorage = new 
TableSchemaAwareIndexStorage(
+                pkIndexId,
+                pkInnerStorage,
+                PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        sortedInnerStorage = new TestSortedIndexStorage(new 
SortedIndexDescriptor(sortedIndexId, List.of(
+                new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false, true),
+                new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false, true)
+        )));
+
+        TableSchemaAwareIndexStorage sortedIndexStorage = new 
TableSchemaAwareIndexStorage(
+                sortedIndexId,
+                sortedInnerStorage,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        hashInnerStorage = new TestHashIndexStorage(new 
HashIndexDescriptor(hashIndexId, List.of(
+                new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, 
false),
+                new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, 
false)
+        )));
+
+        TableSchemaAwareIndexStorage hashIndexStorage = new 
TableSchemaAwareIndexStorage(
+                hashIndexId,
+                hashInnerStorage,
+                USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+        );
+
+        storage = new TestMvPartitionStorage(1);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage),
+                () -> Map.of(
+                        pkIndexId, pkStorage,
+                        sortedIndexId, sortedIndexStorage,
+                        hashIndexId, hashIndexStorage
+                )
+        );
+    }
+
+    @Test
+    void test1() {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key = new TestKey(1, "foo");
+        var value = new TestValue(2, "bar");
+        BinaryRow tableRow = binaryRow(key, value);
+
+        addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, tableRow);
+        commitWrite(rowId);
+
+        assertEquals(2, getRowVersions(rowId).size());
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+
+        assertTrue(storageUpdateHandler.vacuum(CLOCK.now()));
+
+        assertEquals(1, getRowVersions(rowId).size());
+        assertTrue(inIndex(tableRow));
+    }
+
+    @Test
+    void test2() {
+        UUID rowUuid = UUID.randomUUID();
+        RowId rowId = new RowId(1, rowUuid);
+
+        var key1 = new TestKey(1, "foo");
+        var value1 = new TestValue(2, "bar");
+        BinaryRow tableRow1 = binaryRow(key1, value1);
+
+        var key2 = new TestKey(1, "foo");
+        var value2 = new TestValue(5, "baz");
+        BinaryRow tableRow2 = binaryRow(key2, value2);
+
+        addWrite(storageUpdateHandler, rowUuid, tableRow1);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, tableRow1);
+        commitWrite(rowId);
+
+        addWrite(storageUpdateHandler, rowUuid, tableRow2);
+        commitWrite(rowId);
+
+        assertEquals(3, getRowVersions(rowId).size());
+        assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
+        assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
+
+        HybridTimestamp afterCommits = CLOCK.now();
+
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+        assertTrue(storageUpdateHandler.vacuum(afterCommits));
+        assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+        assertEquals(1, getRowVersions(rowId).size());
+        assertFalse(inIndex(tableRow1));
+        assertTrue(inIndex(tableRow2));
+    }
+
+    private boolean inIndex(BinaryRow row) {
+        BinaryTuple pkIndexValue = 
PK_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row);
+        BinaryTuple userIndexValue = 
USER_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row);
+
+        assert pkIndexValue != null;
+        assert userIndexValue != null;
+
+        try (Cursor<RowId> pkCursor = pkInnerStorage.get(pkIndexValue)) {
+            if (!pkCursor.hasNext()) {
+                return false;
+            }
+        }
+
+        try (Cursor<RowId> sortedIdxCursor = 
sortedInnerStorage.get(userIndexValue)) {
+            if (!sortedIdxCursor.hasNext()) {
+                return false;
+            }
+        }
+
+        try (Cursor<RowId> hashIdxCursor = 
hashInnerStorage.get(userIndexValue)) {
+            return hashIdxCursor.hasNext();
+        }
+    }
+
+    private List<ReadResult> getRowVersions(RowId rowId) {
+        try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) {
+            return readResults.stream().collect(Collectors.toList());
+        }
+    }
+
+    private static void addWrite(StorageUpdateHandler handler, UUID rowUuid, 
@Nullable BinaryRow row) {
+        TablePartitionId partitionId = new TablePartitionId(UUID.randomUUID(), 
1);
+
+        handler.handleUpdate(
+                TX_ID,
+                rowUuid,
+                partitionId,
+                row == null ? null : row.byteBuffer(),
+                (unused) -> {}
+        );
+    }
+
+    private static BinaryRow binaryRow(TestKey key, TestValue value) {
+        try {
+            return KV_MARSHALLER.marshal(key, value);
+        } catch (MarshallerException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private HybridTimestamp commitWrite(RowId rowId) {
+        return storage.runConsistently(() -> {
+            HybridTimestamp commitTimestamp = CLOCK.now();
+
+            storage.commitWrite(rowId, commitTimestamp);
+
+            return commitTimestamp;
+        });
+    }
+
+    private static class TestKey {
+        int intKey;
+
+        String strKey;
+
+        TestKey() {
+        }
+
+        TestKey(int intKey, String strKey) {
+            this.intKey = intKey;
+            this.strKey = strKey;
+        }
+    }
+
+    private static class TestValue {
+        Integer intVal;
+
+        String strVal;
+
+        TestValue() {
+        }
+
+        TestValue(Integer intVal, String strVal) {
+            this.intVal = intVal;
+            this.strVal = strVal;
+        }
+    }
+}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 3391eff013..1d6093abd8 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.RaftGroupConfiguration;
@@ -113,6 +114,11 @@ public class TestPartitionDataStorage implements PartitionDataStorage {
         return partitionStorage.scanVersions(rowId);
     }
 
+    @Override
+    public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
+        return partitionStorage.pollForVacuum(lowWatermark);
+    }
+
     @Override
     public MvPartitionStorage getStorage() {
         return partitionStorage;

Reply via email to