This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-18033 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 39a3934d2ae259a3f4424814958d44f45c482bd7 Author: Semyon Danilov <[email protected]> AuthorDate: Fri Feb 24 16:56:58 2023 +0400 IGNITE-18033 Implement cooperative GC of MV data during RAFT commands execution --- .../table/distributed/StorageUpdateHandler.java | 125 ++++++++++++-- .../table/distributed/PartitionGcOnWriteTest.java | 185 +++++++++++++++++++++ 2 files changed, 295 insertions(+), 15 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index bad1f2f50a..ed7fc2cd82 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.BinaryRowAndRowId; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; @@ -53,6 +55,9 @@ public class StorageUpdateHandler { private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes; + /** Last recorded GC low watermark. */ + private final AtomicReference<HybridTimestamp> lastRecordedLwm = new AtomicReference<>(); + /** * The constructor. 
* @@ -67,7 +72,7 @@ public class StorageUpdateHandler { } /** - * Handle single update. + * Handles single update. * * @param txId Transaction id. * @param rowUuid Row UUID. @@ -81,8 +86,31 @@ public class StorageUpdateHandler { TablePartitionId commitPartitionId, @Nullable ByteBuffer rowBuffer, @Nullable Consumer<RowId> onReplication + ) { + handleUpdate(txId, rowUuid, commitPartitionId, rowBuffer, onReplication, null); + } + + /** + * Handles single update. + * + * @param txId Transaction id. + * @param rowUuid Row UUID. + * @param commitPartitionId Commit partition id. + * @param rowBuffer Row buffer. + * @param onReplication Callback on replication. + * @param lowWatermark GC low watermark. + */ + public void handleUpdate( + UUID txId, + UUID rowUuid, + TablePartitionId commitPartitionId, + @Nullable ByteBuffer rowBuffer, + @Nullable Consumer<RowId> onReplication, + @Nullable HybridTimestamp lowWatermark ) { storage.runConsistently(() -> { + executeBatchGc(lowWatermark); + BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) : null; RowId rowId = new RowId(partitionId, rowUuid); UUID commitTblId = commitPartitionId.tableId(); @@ -106,7 +134,7 @@ public class StorageUpdateHandler { } /** - * Handle multiple updates. + * Handles multiple updates. * * @param txId Transaction id. * @param rowsToUpdate Collection of rows to update. @@ -118,8 +146,29 @@ public class StorageUpdateHandler { Map<UUID, ByteBuffer> rowsToUpdate, TablePartitionId commitPartitionId, @Nullable Consumer<Collection<RowId>> onReplication + ) { + handleUpdateAll(txId, rowsToUpdate, commitPartitionId, onReplication, null); + } + + /** + * Handles multiple updates. + * + * @param txId Transaction id. + * @param rowsToUpdate Collection of rows to update. + * @param commitPartitionId Commit partition id. + * @param onReplication On replication callback. + * @param lowWatermark GC low watermark. 
+ */ + public void handleUpdateAll( + UUID txId, + Map<UUID, ByteBuffer> rowsToUpdate, + TablePartitionId commitPartitionId, + @Nullable Consumer<Collection<RowId>> onReplication, + @Nullable HybridTimestamp lowWatermark ) { storage.runConsistently(() -> { + executeBatchGc(lowWatermark); + UUID commitTblId = commitPartitionId.tableId(); int commitPartId = commitPartitionId.partitionId(); @@ -150,6 +199,25 @@ public class StorageUpdateHandler { }); } + private void executeBatchGc(@Nullable HybridTimestamp newLwm) { + if (newLwm != null) { + HybridTimestamp curLwm = lastRecordedLwm.updateAndGet(prevLwm -> { + if (prevLwm == null) { + return newLwm; + } + + return newLwm.compareTo(prevLwm) > 0 ? newLwm : prevLwm; + }); + + if (curLwm == newLwm) { + // Iff the lwm we have is the new lwm. + // Otherwise our newLwm was either smaller than the last recorded lwm, or the last recorded lwm has changed + // concurrently and become greater. If that's the case, another thread will perform the GC. + vacuumBatch(curLwm, 5); + } + } + } + /** * Tries to remove a previous write from index. * @@ -263,26 +331,53 @@ public class StorageUpdateHandler { * @see MvPartitionStorage#pollForVacuum(HybridTimestamp) */ public boolean vacuum(HybridTimestamp lowWatermark) { - return storage.runConsistently(() -> { - BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark); + return storage.runConsistently(() -> internalVacuum(lowWatermark)); + } - if (vacuumed == null) { - // Nothing was garbage collected. - return false; + /** + * Tries removing {@code count} oldest stale entries and their indexes. + * If there are fewer entries that can be removed, exits early. + * + * @param lowWatermark Low watermark for the vacuum. + * @param count Count of entries to GC. 
+ */ + public void vacuumBatch(HybridTimestamp lowWatermark, int count) { + storage.runConsistently(() -> { + for (int i = 0; i < count; i++) { + if (!internalVacuum(lowWatermark)) { + break; + } } - BinaryRow binaryRow = vacuumed.binaryRow(); + return null; + }); + } + + /** + * Executes garbage collection. Must be called inside a {@link MvPartitionStorage#runConsistently(WriteClosure)} closure. + * + * @param lowWatermark Low watermark for the vacuum. + * @return {@code true} if an entry was garbage collected, {@code false} if there was nothing to collect. + */ + private boolean internalVacuum(HybridTimestamp lowWatermark) { + BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark); - assert binaryRow != null; + if (vacuumed == null) { + // Nothing was garbage collected. + return false; + } - RowId rowId = vacuumed.rowId(); + BinaryRow binaryRow = vacuumed.binaryRow(); - try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { - tryRemoveFromIndexes(binaryRow, rowId, cursor); - } + assert binaryRow != null; - return true; - }); + RowId rowId = vacuumed.rowId(); + + try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { + tryRemoveFromIndexes(binaryRow, rowId, cursor); + } + + return true; } private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java new file mode 100644 index 0000000000..7b156ea080 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import org.apache.ignite.distributed.TestPartitionDataStorage; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.BaseMvStoragesTest; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.util.Cursor; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +/** Tests for cooperative GC (GC that is executed on write). 
*/ +public class PartitionGcOnWriteTest extends BaseMvStoragesTest { + private static final int WRITES_COUNT = 10; + private static final UUID TX_ID = UUID.randomUUID(); + private static final int PARTITION_ID = 1; + private static final TablePartitionId TABLE_PARTITION_ID = new TablePartitionId(UUID.randomUUID(), PARTITION_ID); + + private TestMvPartitionStorage storage; + private StorageUpdateHandler storageUpdateHandler; + + @BeforeEach + void setUp() { + storage = new TestMvPartitionStorage(1); + + storageUpdateHandler = new StorageUpdateHandler(1, new TestPartitionDataStorage(storage), Collections::emptyMap); + } + + @ParameterizedTest + @EnumSource(AddWriteWithGc.class) + void testNullLwm(AddWriteWithGc writer) { + RowId rowId = fillWithDataForGc(); + + writeWithGc(writer, null); + + assertEquals(WRITES_COUNT, getRowVersions(rowId).size()); + } + + @ParameterizedTest + @EnumSource(AddWriteWithGc.class) + void testOlderLwm(AddWriteWithGc writer) { + HybridTimestamp older = clock.now(); + + HybridTimestamp newer = clock.now(); + + writeWithGc(writer, newer); + + RowId rowId = fillWithDataForGc(); + + writeWithGc(writer, older); + + assertEquals(WRITES_COUNT, getRowVersions(rowId).size()); + } + + @ParameterizedTest + @EnumSource(AddWriteWithGc.class) + void testNewerLwm(AddWriteWithGc writer) { + RowId rowId = fillWithDataForGc(); + + writeWithGc(writer, clock.now()); + + assertEquals(5, getRowVersions(rowId).size()); + } + + private List<ReadResult> getRowVersions(RowId rowId) { + try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) { + return readResults.stream().collect(toList()); + } + } + + private void writeWithGc(AddWriteWithGc writer, @Nullable HybridTimestamp lwm) { + UUID rowUuid = UUID.randomUUID(); + + TestKey key = new TestKey(1337, "leet"); + BinaryRow row = binaryRow(key, new TestValue(999, "bar")); + + writer.addWrite(storageUpdateHandler, rowUuid, row, lwm); + } + + private RowId fillWithDataForGc() { + UUID rowUuid = 
UUID.randomUUID(); + var rowId = new RowId(PARTITION_ID, rowUuid); + + TestKey key = new TestKey(1, "foo"); + + for (int i = 0; i < WRITES_COUNT; i++) { + BinaryRow row = binaryRow(key, new TestValue(i, "bar" + i)); + + addWrite(storageUpdateHandler, rowUuid, row); + + commitWrite(rowId); + } + + return rowId; + } + + private static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) { + handler.handleUpdate( + TX_ID, + rowUuid, + TABLE_PARTITION_ID, + row == null ? null : row.byteBuffer(), + (unused) -> {} + ); + } + + private void commitWrite(RowId rowId) { + storage.runConsistently(() -> { + storage.commitWrite(rowId, clock.now()); + + return null; + }); + } + + /** Enum that encapsulates update API. */ + enum AddWriteWithGc { + /** Uses update api. */ + USE_UPDATE { + @Override + void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row, + @Nullable HybridTimestamp lwm) { + handler.handleUpdate( + TX_ID, + rowUuid, + partitionId, + row == null ? null : row.byteBuffer(), + (unused) -> {}, + lwm + ); + } + }, + /** Uses updateAll api. */ + USE_UPDATE_ALL { + @Override + void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row, + @Nullable HybridTimestamp lwm) { + handler.handleUpdateAll( + TX_ID, + singletonMap(rowUuid, row == null ? null : row.byteBuffer()), + partitionId, + (unused) -> {}, + lwm + ); + } + }; + + void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row, @Nullable HybridTimestamp lwm) { + TablePartitionId tablePartitionId = new TablePartitionId(UUID.randomUUID(), 1); + + addWrite(handler, tablePartitionId, rowUuid, row, lwm); + } + + abstract void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row, + @Nullable HybridTimestamp lwm); + } +}
