This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-18033
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 39a3934d2ae259a3f4424814958d44f45c482bd7
Author: Semyon Danilov <[email protected]>
AuthorDate: Fri Feb 24 16:56:58 2023 +0400

    IGNITE-18033 Implement cooperative GC of MV data during RAFT commands 
execution
---
 .../table/distributed/StorageUpdateHandler.java    | 125 ++++++++++++--
 .../table/distributed/PartitionGcOnWriteTest.java  | 185 +++++++++++++++++++++
 2 files changed, 295 insertions(+), 15 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index bad1f2f50a..ed7fc2cd82 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.ByteBufferRow;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -53,6 +55,9 @@ public class StorageUpdateHandler {
 
     private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
 
+    /** Last recorded GC low watermark. */
+    private final AtomicReference<HybridTimestamp> lastRecordedLwm = new 
AtomicReference<>();
+
     /**
      * The constructor.
      *
@@ -67,7 +72,7 @@ public class StorageUpdateHandler {
     }
 
     /**
-     * Handle single update.
+     * Handles single update.
      *
      * @param txId Transaction id.
      * @param rowUuid Row UUID.
@@ -81,8 +86,31 @@ public class StorageUpdateHandler {
             TablePartitionId commitPartitionId,
             @Nullable ByteBuffer rowBuffer,
             @Nullable Consumer<RowId> onReplication
+    ) {
+        handleUpdate(txId, rowUuid, commitPartitionId, rowBuffer, 
onReplication, null);
+    }
+
+    /**
+     * Handles single update.
+     *
+     * @param txId Transaction id.
+     * @param rowUuid Row UUID.
+     * @param commitPartitionId Commit partition id.
+     * @param rowBuffer Row buffer.
+     * @param onReplication Callback on replication.
+     * @param lowWatermark GC low watermark.
+     */
+    public void handleUpdate(
+            UUID txId,
+            UUID rowUuid,
+            TablePartitionId commitPartitionId,
+            @Nullable ByteBuffer rowBuffer,
+            @Nullable Consumer<RowId> onReplication,
+            @Nullable HybridTimestamp lowWatermark
     ) {
         storage.runConsistently(() -> {
+            executeBatchGc(lowWatermark);
+
             BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) : 
null;
             RowId rowId = new RowId(partitionId, rowUuid);
             UUID commitTblId = commitPartitionId.tableId();
@@ -106,7 +134,7 @@ public class StorageUpdateHandler {
     }
 
     /**
-     * Handle multiple updates.
+     * Handles multiple updates.
      *
      * @param txId Transaction id.
      * @param rowsToUpdate Collection of rows to update.
@@ -118,8 +146,29 @@ public class StorageUpdateHandler {
             Map<UUID, ByteBuffer> rowsToUpdate,
             TablePartitionId commitPartitionId,
             @Nullable Consumer<Collection<RowId>> onReplication
+    ) {
+        handleUpdateAll(txId, rowsToUpdate, commitPartitionId, onReplication, 
null);
+    }
+
+    /**
+     * Handles multiple updates.
+     *
+     * @param txId Transaction id.
+     * @param rowsToUpdate Collection of rows to update.
+     * @param commitPartitionId Commit partition id.
+     * @param onReplication On replication callback.
+     * @param lowWatermark GC low watermark.
+     */
+    public void handleUpdateAll(
+            UUID txId,
+            Map<UUID, ByteBuffer> rowsToUpdate,
+            TablePartitionId commitPartitionId,
+            @Nullable Consumer<Collection<RowId>> onReplication,
+            @Nullable HybridTimestamp lowWatermark
     ) {
         storage.runConsistently(() -> {
+            executeBatchGc(lowWatermark);
+
             UUID commitTblId = commitPartitionId.tableId();
             int commitPartId = commitPartitionId.partitionId();
 
@@ -150,6 +199,25 @@ public class StorageUpdateHandler {
         });
     }
 
+    private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
+        if (newLwm != null) {
+            HybridTimestamp curLwm = lastRecordedLwm.updateAndGet(prevLwm -> {
+                if (prevLwm == null) {
+                    return newLwm;
+                }
+
+                return newLwm.compareTo(prevLwm) > 0 ? newLwm : prevLwm;
+            });
+
+            if (curLwm == newLwm) {
+                // Run GC only iff the lwm we recorded is the new lwm.
+                // Otherwise our newLwm was either smaller than the last 
recorded lwm, or the last recorded lwm was changed
+                // concurrently and became greater. If that's the case, 
another thread will perform the GC.
+                vacuumBatch(curLwm, 5);
+            }
+        }
+    }
+
     /**
      * Tries to remove a previous write from index.
      *
@@ -263,26 +331,53 @@ public class StorageUpdateHandler {
      * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
      */
     public boolean vacuum(HybridTimestamp lowWatermark) {
-        return storage.runConsistently(() -> {
-            BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
+        return storage.runConsistently(() -> internalVacuum(lowWatermark));
+    }
 
-            if (vacuumed == null) {
-                // Nothing was garbage collected.
-                return false;
+    /**
+     * Tries removing {@code count} oldest stale entries and their indexes.
+     * If there are fewer entries that can be removed, it exits prematurely.
+     *
+     * @param lowWatermark Low watermark for the vacuum.
+     * @param count Count of entries to GC.
+     */
+    public void vacuumBatch(HybridTimestamp lowWatermark, int count) {
+        storage.runConsistently(() -> {
+            for (int i = 0; i < count; i++) {
+                if (!internalVacuum(lowWatermark)) {
+                    break;
+                }
             }
 
-            BinaryRow binaryRow = vacuumed.binaryRow();
+            return null;
+        });
+    }
+
+    /**
+     * Executes garbage collection. Must be called inside a {@link 
MvPartitionStorage#runConsistently(WriteClosure)} closure.
+     *
+     * @param lowWatermark Low watermark for the vacuum.
+     * @return {@code true} if an entry was garbage collected, {@code false} 
if there was nothing to collect.
+     */
+    private boolean internalVacuum(HybridTimestamp lowWatermark) {
+        BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
 
-            assert binaryRow != null;
+        if (vacuumed == null) {
+            // Nothing was garbage collected.
+            return false;
+        }
 
-            RowId rowId = vacuumed.rowId();
+        BinaryRow binaryRow = vacuumed.binaryRow();
 
-            try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
-                tryRemoveFromIndexes(binaryRow, rowId, cursor);
-            }
+        assert binaryRow != null;
 
-            return true;
-        });
+        RowId rowId = vacuumed.rowId();
+
+        try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+            tryRemoveFromIndexes(binaryRow, rowId, cursor);
+        }
+
+        return true;
     }
 
     private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
new file mode 100644
index 0000000000..7b156ea080
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests for cooperative GC (GC that is executed on write). */
+public class PartitionGcOnWriteTest extends BaseMvStoragesTest {
+    private static final int WRITES_COUNT = 10;
+    private static final UUID TX_ID = UUID.randomUUID();
+    private static final int PARTITION_ID = 1;
+    private static final TablePartitionId TABLE_PARTITION_ID = new 
TablePartitionId(UUID.randomUUID(), PARTITION_ID);
+
+    private TestMvPartitionStorage storage;
+    private StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp() {
+        storage = new TestMvPartitionStorage(1);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage), Collections::emptyMap);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWriteWithGc.class)
+    void testNullLwm(AddWriteWithGc writer) {
+        RowId rowId = fillWithDataForGc();
+
+        writeWithGc(writer, null);
+
+        assertEquals(WRITES_COUNT, getRowVersions(rowId).size());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWriteWithGc.class)
+    void testOlderLwm(AddWriteWithGc writer) {
+        HybridTimestamp older = clock.now();
+
+        HybridTimestamp newer = clock.now();
+
+        writeWithGc(writer, newer);
+
+        RowId rowId = fillWithDataForGc();
+
+        writeWithGc(writer, older);
+
+        assertEquals(WRITES_COUNT, getRowVersions(rowId).size());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWriteWithGc.class)
+    void testNewerLwm(AddWriteWithGc writer) {
+        RowId rowId = fillWithDataForGc();
+
+        writeWithGc(writer, clock.now());
+
+        assertEquals(5, getRowVersions(rowId).size());
+    }
+
+    private List<ReadResult> getRowVersions(RowId rowId) {
+        try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) {
+            return readResults.stream().collect(toList());
+        }
+    }
+
+    private void writeWithGc(AddWriteWithGc writer, @Nullable HybridTimestamp 
lwm) {
+        UUID rowUuid = UUID.randomUUID();
+
+        TestKey key = new TestKey(1337, "leet");
+        BinaryRow row = binaryRow(key, new TestValue(999, "bar"));
+
+        writer.addWrite(storageUpdateHandler, rowUuid, row, lwm);
+    }
+
+    private RowId fillWithDataForGc() {
+        UUID rowUuid = UUID.randomUUID();
+        var rowId = new RowId(PARTITION_ID, rowUuid);
+
+        TestKey key = new TestKey(1, "foo");
+
+        for (int i = 0; i < WRITES_COUNT; i++) {
+            BinaryRow row = binaryRow(key, new TestValue(i, "bar" + i));
+
+            addWrite(storageUpdateHandler, rowUuid, row);
+
+            commitWrite(rowId);
+        }
+
+        return rowId;
+    }
+
+    private static void addWrite(StorageUpdateHandler handler, UUID rowUuid, 
@Nullable BinaryRow row) {
+        handler.handleUpdate(
+                TX_ID,
+                rowUuid,
+                TABLE_PARTITION_ID,
+                row == null ? null : row.byteBuffer(),
+                (unused) -> {}
+        );
+    }
+
+    private void commitWrite(RowId rowId) {
+        storage.runConsistently(() -> {
+            storage.commitWrite(rowId, clock.now());
+
+            return null;
+        });
+    }
+
+    /** Enum that encapsulates update API. */
+    enum AddWriteWithGc {
+        /** Uses update api. */
+        USE_UPDATE {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+                    @Nullable HybridTimestamp lwm) {
+                handler.handleUpdate(
+                        TX_ID,
+                        rowUuid,
+                        partitionId,
+                        row == null ? null : row.byteBuffer(),
+                        (unused) -> {},
+                        lwm
+                );
+            }
+        },
+        /** Uses updateAll api. */
+        USE_UPDATE_ALL {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+                    @Nullable HybridTimestamp lwm) {
+                handler.handleUpdateAll(
+                        TX_ID,
+                        singletonMap(rowUuid, row == null ? null : 
row.byteBuffer()),
+                        partitionId,
+                        (unused) -> {},
+                        lwm
+                );
+            }
+        };
+
+        void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row, @Nullable HybridTimestamp lwm) {
+            TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1);
+
+            addWrite(handler, tablePartitionId, rowUuid, row, lwm);
+        }
+
+        abstract void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+                @Nullable HybridTimestamp lwm);
+    }
+}

Reply via email to