This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-18739 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit af2e134692bdb0f0e22b03c909e1acbde54bd503 Author: Semyon Danilov <[email protected]> AuthorDate: Wed Feb 22 13:09:33 2023 +0400 IGNITE-18739 Add index garbage collection --- .../table/distributed/StorageUpdateHandler.java | 34 ++- .../distributed/raft/PartitionDataStorage.java | 9 + .../SnapshotAwarePartitionDataStorage.java | 6 + .../internal/table/distributed/IndexGcTest.java | 300 +++++++++++++++++++++ .../distributed/TestPartitionDataStorage.java | 6 + 5 files changed, 351 insertions(+), 4 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java index 3bdc0aff09..6958cc3404 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java @@ -28,9 +28,11 @@ import java.util.Set; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.storage.BinaryRowAndRowId; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage; @@ -252,10 +254,34 @@ public class StorageUpdateHandler { } } - private void removeFromIndex(BinaryRow row, RowId rowId) { - for (TableSchemaAwareIndexStorage index : indexes.get().values()) { - index.remove(row, rowId); - } + /** + * Tries removing the oldest stale entry in the partition. + * + * @param lowWatermark Low watermark for the vacuum. + */ + public boolean vacuum(HybridTimestamp lowWatermark) { + return storage.runConsistently(() -> { + BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark); + + if (vacuumed == null) { + // Nothing was garbage collected. + return false; + } + + BinaryRow binaryRow = vacuumed.binaryRow(); + + assert binaryRow != null; + + RowId rowId = vacuumed.rowId(); + + try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { + assert cursor.hasNext(); + + tryRemoveFromIndexes(binaryRow, rowId, cursor); + } + + return true; + }); } private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java index 5b9ac5b6e1..7a6ca8f546 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.BinaryRowAndRowId; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.RaftGroupConfiguration; @@ -175,6 +176,14 @@ public interface PartitionDataStorage extends ManuallyCloseable { */ Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException; + /** + * Tries to garbage collect the oldest stale entry of the partition. + * + * @see MvPartitionStorage#pollForVacuum(HybridTimestamp) + */ + @Nullable + BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark); + /** * Returns the underlying {@link MvPartitionStorage}. Only for tests! * diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java index a5bf3601d9..d50e404eba 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java @@ -21,6 +21,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.BinaryRowAndRowId; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.RaftGroupConfiguration; @@ -138,6 +139,11 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage { return partitionStorage.scanVersions(rowId); } + @Override + public BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) { + return partitionStorage.pollForVacuum(lowWatermark); + } + private void handleSnapshotInterference(RowId rowId) { PartitionSnapshots partitionSnapshots = getPartitionSnapshots(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java new file mode 100644 index 0000000000..437db12836 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.distributed.TestPartitionDataStorage; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowConverter; +import org.apache.ignite.internal.schema.BinaryTuple; +import org.apache.ignite.internal.schema.BinaryTupleSchema; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.marshaller.KvMarshaller; +import org.apache.ignite.internal.schema.marshaller.MarshallerException; +import org.apache.ignite.internal.schema.marshaller.MarshallerFactory; +import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; +import org.apache.ignite.internal.storage.ReadResult; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; +import org.apache.ignite.internal.storage.index.HashIndexDescriptor; +import org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor; +import org.apache.ignite.internal.storage.index.SortedIndexDescriptor; +import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor; +import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; +import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage; +import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId; +import org.apache.ignite.internal.util.Cursor; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Tests indexes cleaning up on garbage collection. */ +public class IndexGcTest { + /** Default reflection marshaller factory. */ + private static final MarshallerFactory MARSHALLER_FACTORY = new ReflectionMarshallerFactory(); + + private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new SchemaDescriptor(1, new Column[]{ + new Column("INTKEY", NativeTypes.INT32, false), + new Column("STRKEY", NativeTypes.STRING, false), + }, new Column[]{ + new Column("INTVAL", NativeTypes.INT32, false), + new Column("STRVAL", NativeTypes.STRING, false), + }); + + private static final BinaryTupleSchema TUPLE_SCHEMA = BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR); + + private static final BinaryTupleSchema PK_INDEX_SCHEMA = BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR); + + private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER = new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA); + + private static final int[] USER_INDEX_COLS = { + SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(), + SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex() + }; + + private static final BinaryTupleSchema USER_INDEX_SCHEMA = BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS); + + private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER = new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA); + + /** Key-value marshaller for tests. */ + private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER + = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class, TestValue.class); + + private static final UUID TX_ID = UUID.randomUUID(); + + private static final HybridClock CLOCK = new HybridClockImpl(); + + private TestHashIndexStorage pkInnerStorage; + private TestSortedIndexStorage sortedInnerStorage; + private TestHashIndexStorage hashInnerStorage; + private TestMvPartitionStorage storage; + private StorageUpdateHandler storageUpdateHandler; + + @BeforeEach + void setUp() { + UUID pkIndexId = UUID.randomUUID(); + UUID sortedIndexId = UUID.randomUUID(); + UUID hashIndexId = UUID.randomUUID(); + + pkInnerStorage = new TestHashIndexStorage(null); + + TableSchemaAwareIndexStorage pkStorage = new TableSchemaAwareIndexStorage( + pkIndexId, + pkInnerStorage, + PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple + ); + + sortedInnerStorage = new TestSortedIndexStorage(new SortedIndexDescriptor(sortedIndexId, List.of( + new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32, false, true), + new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING, false, true) + ))); + + TableSchemaAwareIndexStorage sortedIndexStorage = new TableSchemaAwareIndexStorage( + sortedIndexId, + sortedInnerStorage, + USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple + ); + + hashInnerStorage = new TestHashIndexStorage(new HashIndexDescriptor(hashIndexId, List.of( + new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32, false), + new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING, false) + ))); + + TableSchemaAwareIndexStorage hashIndexStorage = new TableSchemaAwareIndexStorage( + hashIndexId, + hashInnerStorage, + USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple + ); + + storage = new TestMvPartitionStorage(1); + + storageUpdateHandler = new StorageUpdateHandler(1, new TestPartitionDataStorage(storage), + () -> Map.of( + pkIndexId, pkStorage, + sortedIndexId, sortedIndexStorage, + hashIndexId, hashIndexStorage + ) + ); + } + + @Test + void test1() { + UUID rowUuid = UUID.randomUUID(); + RowId rowId = new RowId(1, rowUuid); + + var key = new TestKey(1, "foo"); + var value = new TestValue(2, "bar"); + BinaryRow tableRow = binaryRow(key, value); + + addWrite(storageUpdateHandler, rowUuid, tableRow); + commitWrite(rowId); + + addWrite(storageUpdateHandler, rowUuid, tableRow); + commitWrite(rowId); + + assertEquals(2, getRowVersions(rowId).size()); + assertThat(pkInnerStorage.allRowsIds(), contains(rowId)); + assertThat(sortedInnerStorage.allRowsIds(), contains(rowId)); + assertThat(hashInnerStorage.allRowsIds(), contains(rowId)); + + assertTrue(storageUpdateHandler.vacuum(CLOCK.now())); + + assertEquals(1, getRowVersions(rowId).size()); + assertTrue(inIndex(tableRow)); + } + + @Test + void test2() { + UUID rowUuid = UUID.randomUUID(); + RowId rowId = new RowId(1, rowUuid); + + var key1 = new TestKey(1, "foo"); + var value1 = new TestValue(2, "bar"); + BinaryRow tableRow1 = binaryRow(key1, value1); + + var key2 = new TestKey(1, "foo"); + var value2 = new TestValue(5, "baz"); + BinaryRow tableRow2 = binaryRow(key2, value2); + + addWrite(storageUpdateHandler, rowUuid, tableRow1); + commitWrite(rowId); + + addWrite(storageUpdateHandler, rowUuid, tableRow1); + commitWrite(rowId); + + addWrite(storageUpdateHandler, rowUuid, tableRow2); + commitWrite(rowId); + + assertEquals(3, getRowVersions(rowId).size()); + assertThat(pkInnerStorage.allRowsIds(), contains(rowId)); + assertThat(sortedInnerStorage.allRowsIds(), contains(rowId)); + assertThat(hashInnerStorage.allRowsIds(), contains(rowId)); + + HybridTimestamp afterCommits = CLOCK.now(); + + assertTrue(storageUpdateHandler.vacuum(afterCommits)); + assertTrue(storageUpdateHandler.vacuum(afterCommits)); + assertFalse(storageUpdateHandler.vacuum(afterCommits)); + + assertEquals(1, getRowVersions(rowId).size()); + assertFalse(inIndex(tableRow1)); + assertTrue(inIndex(tableRow2)); + } + + private boolean inIndex(BinaryRow row) { + BinaryTuple pkIndexValue = PK_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row); + BinaryTuple userIndexValue = USER_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row); + + assert pkIndexValue != null; + assert userIndexValue != null; + + try (Cursor<RowId> pkCursor = pkInnerStorage.get(pkIndexValue)) { + if (!pkCursor.hasNext()) { + return false; + } + } + + try (Cursor<RowId> sortedIdxCursor = sortedInnerStorage.get(userIndexValue)) { + if (!sortedIdxCursor.hasNext()) { + return false; + } + } + + try (Cursor<RowId> hashIdxCursor = hashInnerStorage.get(userIndexValue)) { + return hashIdxCursor.hasNext(); + } + } + + private List<ReadResult> getRowVersions(RowId rowId) { + try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) { + return readResults.stream().collect(Collectors.toList()); + } + } + + private static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) { + TablePartitionId partitionId = new TablePartitionId(UUID.randomUUID(), 1); + + handler.handleUpdate( + TX_ID, + rowUuid, + partitionId, + row == null ? null : row.byteBuffer(), + (unused) -> {} + ); + } + + private static BinaryRow binaryRow(TestKey key, TestValue value) { + try { + return KV_MARSHALLER.marshal(key, value); + } catch (MarshallerException e) { + throw new RuntimeException(e); + } + } + + private HybridTimestamp commitWrite(RowId rowId) { + return storage.runConsistently(() -> { + HybridTimestamp commitTimestamp = CLOCK.now(); + + storage.commitWrite(rowId, commitTimestamp); + + return commitTimestamp; + }); + } + + private static class TestKey { + int intKey; + + String strKey; + + TestKey() { + } + + TestKey(int intKey, String strKey) { + this.intKey = intKey; + this.strKey = strKey; + } + } + + private static class TestValue { + Integer intVal; + + String strVal; + + TestValue() { + } + + TestValue(Integer intVal, String strVal) { + this.intVal = intVal; + this.strVal = strVal; + } + } +} diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java index 3391eff013..1d6093abd8 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.BinaryRowAndRowId; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure; import org.apache.ignite.internal.storage.RaftGroupConfiguration; @@ -113,6 +114,11 @@ public class TestPartitionDataStorage implements PartitionDataStorage { return partitionStorage.scanVersions(rowId); } + @Override + public BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) { + return partitionStorage.pollForVacuum(lowWatermark); + } + @Override public MvPartitionStorage getStorage() { return partitionStorage;
