This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new af6aadea04 IGNITE-18739 Add index garbage collection (#1706)
af6aadea04 is described below
commit af6aadea040259f86c3bf659ab6d5e38fd264e69
Author: Semyon Danilov <[email protected]>
AuthorDate: Thu Feb 23 15:46:51 2023 +0400
IGNITE-18739 Add index garbage collection (#1706)
---
.../internal/storage/MvPartitionStorage.java | 3 +-
.../storage/AbstractMvPartitionStorageGcTest.java | 15 ++
.../internal/storage/rocksdb/GarbageCollector.java | 2 +
.../table/distributed/StorageUpdateHandler.java | 37 ++-
.../distributed/TableSchemaAwareIndexStorage.java | 4 +-
.../distributed/raft/PartitionDataStorage.java | 8 +
.../SnapshotAwarePartitionDataStorage.java | 18 +-
.../internal/table/distributed/IndexBaseTest.java | 228 +++++++++++++++++
.../table/distributed/IndexCleanupTest.java | 269 ++++-----------------
.../internal/table/distributed/IndexGcTest.java | 164 +++++++++++++
.../distributed/TestPartitionDataStorage.java | 6 +
11 files changed, 518 insertions(+), 236 deletions(-)
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 5bf5b9b9d3..de3e533078 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -220,7 +220,8 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
/**
* Polls the oldest row in the partition, removing it at the same time.
*
- * @param lowWatermark A time threshold for the row. Rows younger then the
watermark value will not be removed.
+ * @param lowWatermark A time threshold for the row. Only rows that have
versions with timestamp higher or equal to the watermark
+ * can be removed.
* @return A pair of table row and row id, where a timestamp of the row is
less than or equal to {@code lowWatermark}.
* {@code null} if there's no such value.
* @throws StorageException If failed to poll element for vacuum.
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
index c0c3abff25..f98e1822f3 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageGcTest.java
@@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -129,4 +130,18 @@ public abstract class AbstractMvPartitionStorageGcTest
extends BaseMvPartitionSt
// Nothing else to poll.
assertNull(pollForVacuum(lowWatermark));
}
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18882")
+ void testVacuumsSecondRowIfTombstoneIsFirst() {
+ addAndCommit(null);
+
+ addAndCommit(TABLE_ROW);
+
+ addAndCommit(TABLE_ROW2);
+
+ BinaryRowAndRowId row = pollForVacuum(HybridTimestamp.MAX_VALUE);
+
+ assertRowMatches(row.binaryRow(), TABLE_ROW);
+ }
}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index e5bb63e3a8..d150deac98 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -235,6 +235,8 @@ class GarbageCollector {
// At this point there's definitely a value that needs to be
garbage collected in the iterator.
byte[] valueBytes = it.value();
+ assert valueBytes.length > 0; // Can't be a tombstone.
+
var row = new
ByteBufferRow(ByteBuffer.wrap(valueBytes).order(TABLE_ROW_BYTE_ORDER));
BinaryRowAndRowId retVal = new BinaryRowAndRowId(row,
gcRowVersion.getRowId());
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 3bdc0aff09..bad1f2f50a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -28,9 +28,12 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -234,7 +237,7 @@ public class StorageUpdateHandler {
continue;
}
- // If any of the previous versions' index value matches
the index value of
+ // If any of the previous versions' index value equals the
index value of
// the row to remove, then we can't remove that index as
it can still be used.
BinaryTuple previousRowIndex =
index.resolveIndexRow(previousRow);
@@ -252,10 +255,34 @@ public class StorageUpdateHandler {
}
}
- private void removeFromIndex(BinaryRow row, RowId rowId) {
- for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
- index.remove(row, rowId);
- }
+ /**
+ * Tries removing partition's oldest stale entry and its indexes.
+ *
+ * @param lowWatermark Low watermark for the vacuum.
+ * @return {@code true} if an entry was garbage collected, {@code false}
if there was nothing to collect.
+ * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
+ */
+ public boolean vacuum(HybridTimestamp lowWatermark) {
+ return storage.runConsistently(() -> {
+ BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
+
+ if (vacuumed == null) {
+ // Nothing was garbage collected.
+ return false;
+ }
+
+ BinaryRow binaryRow = vacuumed.binaryRow();
+
+ assert binaryRow != null;
+
+ RowId rowId = vacuumed.rowId();
+
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ tryRemoveFromIndexes(binaryRow, rowId, cursor);
+ }
+
+ return true;
+ });
}
private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index c0a530bec6..fd1cd8ae1f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -87,9 +87,9 @@ public class TableSchemaAwareIndexStorage {
* Resolves index row value.
*
* @param row Full row.
- * @return Index value.
+ * @return A tuple that represents indexed columns of a row.
*/
- public BinaryTuple resolveIndexRow(BinaryRow row) {
+ BinaryTuple resolveIndexRow(BinaryRow row) {
return indexRowResolver.apply(row);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
index eacb234335..aa6347bb89 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionDataStorage.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.ReadResult;
@@ -165,6 +166,13 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
*/
Cursor<ReadResult> scanVersions(RowId rowId) throws StorageException;
+ /**
+ * Tries to garbage collect the oldest stale entry of the partition.
+ *
+ * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
+ */
+ @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark);
+
/**
* Returns the underlying {@link MvPartitionStorage}. Only for tests!
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index 7f470d0bc8..f6458d5579 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -21,6 +21,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.ReadResult;
@@ -131,11 +132,21 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
@Override
public Cursor<ReadResult> scanVersions(RowId rowId) throws
StorageException {
- handleSnapshotInterference(rowId);
-
return partitionStorage.scanVersions(rowId);
}
+ @Override
+ public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return partitionStorage.pollForVacuum(lowWatermark);
+ }
+
+ /**
+ * Handles the situation when snapshots are running concurrently with
write operations.
+ * In case if a row that is going to be changed was not yet sent in an
ongoing snapshot,
+ * schedule an out-of-order sending of said row.
+ *
+ * @param rowId Row id.
+ */
private void handleSnapshotInterference(RowId rowId) {
PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
@@ -144,13 +155,16 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
try {
if (snapshot.alreadyPassed(rowId)) {
+ // Row already sent.
continue;
}
if (!snapshot.addRowIdToSkip(rowId)) {
+ // Already scheduled.
continue;
}
+ // Collect all versions of row and schedule the send operation.
snapshot.enqueueForSending(rowId);
} finally {
snapshot.releaseMvLock();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
new file mode 100644
index 0000000000..cf1e032f9d
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+
+/**
+ * Base test for indexes. Sets up a table with (int, string) key and (int,
string) value and
+ * three indexes: primary key, hash index over value columns and sorted index
over value columns.
+ */
+public abstract class IndexBaseTest extends BaseMvStoragesTest {
+ private static final BinaryTupleSchema TUPLE_SCHEMA =
BinaryTupleSchema.createRowSchema(schemaDescriptor);
+
+ private static final BinaryTupleSchema PK_INDEX_SCHEMA =
BinaryTupleSchema.createKeySchema(schemaDescriptor);
+
+ private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER =
new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
+
+ private static final int[] USER_INDEX_COLS = {
+ schemaDescriptor.column("INTVAL").schemaIndex(),
+ schemaDescriptor.column("STRVAL").schemaIndex()
+ };
+
+ private static final BinaryTupleSchema USER_INDEX_SCHEMA =
BinaryTupleSchema.createSchema(schemaDescriptor, USER_INDEX_COLS);
+
+ private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER
= new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
+
+ private static final UUID TX_ID = UUID.randomUUID();
+
+ TestHashIndexStorage pkInnerStorage;
+ TestSortedIndexStorage sortedInnerStorage;
+ TestHashIndexStorage hashInnerStorage;
+ TestMvPartitionStorage storage;
+ StorageUpdateHandler storageUpdateHandler;
+
+ @BeforeEach
+ void setUp() {
+ UUID pkIndexId = UUID.randomUUID();
+ UUID sortedIndexId = UUID.randomUUID();
+ UUID hashIndexId = UUID.randomUUID();
+
+ pkInnerStorage = new TestHashIndexStorage(null);
+
+ TableSchemaAwareIndexStorage pkStorage = new
TableSchemaAwareIndexStorage(
+ pkIndexId,
+ pkInnerStorage,
+ PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+ );
+
+ sortedInnerStorage = new TestSortedIndexStorage(new
SortedIndexDescriptor(sortedIndexId, List.of(
+ new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32,
false, true),
+ new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING,
false, true)
+ )));
+
+ TableSchemaAwareIndexStorage sortedIndexStorage = new
TableSchemaAwareIndexStorage(
+ sortedIndexId,
+ sortedInnerStorage,
+ USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+ );
+
+ hashInnerStorage = new TestHashIndexStorage(new
HashIndexDescriptor(hashIndexId, List.of(
+ new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32,
false),
+ new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING,
false)
+ )));
+
+ TableSchemaAwareIndexStorage hashIndexStorage = new
TableSchemaAwareIndexStorage(
+ hashIndexId,
+ hashInnerStorage,
+ USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
+ );
+
+ storage = new TestMvPartitionStorage(1);
+
+ storageUpdateHandler = new StorageUpdateHandler(1, new
TestPartitionDataStorage(storage),
+ () -> Map.of(
+ pkIndexId, pkStorage,
+ sortedIndexId, sortedIndexStorage,
+ hashIndexId, hashIndexStorage
+ )
+ );
+ }
+
+ List<ReadResult> getRowVersions(RowId rowId) {
+ try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) {
+ return readResults.stream().collect(toList());
+ }
+ }
+
+ static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable
BinaryRow row) {
+ TablePartitionId partitionId = new TablePartitionId(UUID.randomUUID(),
1);
+
+ handler.handleUpdate(
+ TX_ID,
+ rowUuid,
+ partitionId,
+ row == null ? null : row.byteBuffer(),
+ (unused) -> {}
+ );
+ }
+
+ static BinaryRow defaultRow() {
+ var key = new TestKey(1, "foo");
+ var value = new TestValue(2, "bar");
+
+ return binaryRow(key, value);
+ }
+
+ boolean inAllIndexes(BinaryRow row) {
+ return inIndexes(row, true, true);
+ }
+
+ boolean notInAnyIndex(BinaryRow row) {
+ return inIndexes(row, false, false);
+ }
+
+ boolean inIndexes(BinaryRow row, boolean mustBeInPk, boolean mustBeInUser)
{
+ BinaryTuple pkIndexValue =
PK_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row);
+ BinaryTuple userIndexValue =
USER_INDEX_BINARY_TUPLE_CONVERTER.toTuple(row);
+
+ assert pkIndexValue != null;
+ assert userIndexValue != null;
+
+ try (Cursor<RowId> pkCursor = pkInnerStorage.get(pkIndexValue)) {
+ if (pkCursor.hasNext() != mustBeInPk) {
+ return false;
+ }
+ }
+
+ try (Cursor<RowId> sortedIdxCursor =
sortedInnerStorage.get(userIndexValue)) {
+ if (sortedIdxCursor.hasNext() != mustBeInUser) {
+ return false;
+ }
+ }
+
+ try (Cursor<RowId> hashIdxCursor =
hashInnerStorage.get(userIndexValue)) {
+ return hashIdxCursor.hasNext() == mustBeInUser;
+ }
+ }
+
+ HybridTimestamp now() {
+ return clock.now();
+ }
+
+ void commitWrite(RowId rowId) {
+ storage.runConsistently(() -> {
+ storage.commitWrite(rowId, now());
+
+ return null;
+ });
+ }
+
+ /** Enum that encapsulates update API. */
+ enum AddWrite {
+ /** Uses update api. */
+ USE_UPDATE {
+ @Override
+ void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+ handler.handleUpdate(
+ TX_ID,
+ rowUuid,
+ partitionId,
+ row == null ? null : row.byteBuffer(),
+ (unused) -> {}
+ );
+ }
+ },
+ /** Uses updateAll api. */
+ USE_UPDATE_ALL {
+ @Override
+ void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+ handler.handleUpdateAll(
+ TX_ID,
+ singletonMap(rowUuid, row == null ? null :
row.byteBuffer()),
+ partitionId,
+ (unused) -> {}
+ );
+ }
+ };
+
+ void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable
BinaryRow row) {
+ TablePartitionId tablePartitionId = new
TablePartitionId(UUID.randomUUID(), 1);
+
+ addWrite(handler, tablePartitionId, rowUuid, row);
+ }
+
+ abstract void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row);
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
index 44ba9a280d..12b7780727 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -17,144 +17,30 @@
package org.apache.ignite.internal.table.distributed;
-import static java.util.Collections.singletonMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import org.apache.ignite.distributed.TestPartitionDataStorage;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.BinaryRowConverter;
-import org.apache.ignite.internal.schema.BinaryTupleSchema;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.NativeTypes;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
-import org.apache.ignite.internal.schema.marshaller.MarshallerException;
-import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
-import
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
-import org.apache.ignite.internal.storage.index.HashIndexDescriptor;
-import
org.apache.ignite.internal.storage.index.HashIndexDescriptor.HashIndexColumnDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import
org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
-import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
-import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
-import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
/** Tests indexes cleaning up on data removal and transaction abortions. */
-public class IndexCleanupTest {
- /** Default reflection marshaller factory. */
- private static final MarshallerFactory MARSHALLER_FACTORY = new
ReflectionMarshallerFactory();
-
- private static final SchemaDescriptor SCHEMA_DESCRIPTOR = new
SchemaDescriptor(1, new Column[]{
- new Column("INTKEY", NativeTypes.INT32, false),
- new Column("STRKEY", NativeTypes.STRING, false),
- }, new Column[]{
- new Column("INTVAL", NativeTypes.INT32, false),
- new Column("STRVAL", NativeTypes.STRING, false),
- });
-
- private static final BinaryTupleSchema TUPLE_SCHEMA =
BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
-
- private static final BinaryTupleSchema PK_INDEX_SCHEMA =
BinaryTupleSchema.createKeySchema(SCHEMA_DESCRIPTOR);
-
- private static final BinaryRowConverter PK_INDEX_BINARY_TUPLE_CONVERTER =
new BinaryRowConverter(TUPLE_SCHEMA, PK_INDEX_SCHEMA);
-
- private static final int[] USER_INDEX_COLS = {
- SCHEMA_DESCRIPTOR.column("INTVAL").schemaIndex(),
- SCHEMA_DESCRIPTOR.column("STRVAL").schemaIndex()
- };
-
- private static final BinaryTupleSchema USER_INDEX_SCHEMA =
BinaryTupleSchema.createSchema(SCHEMA_DESCRIPTOR, USER_INDEX_COLS);
-
- private static final BinaryRowConverter USER_INDEX_BINARY_TUPLE_CONVERTER
= new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
-
- /** Key-value marshaller for tests. */
- private static final KvMarshaller<TestKey, TestValue> KV_MARSHALLER
- = MARSHALLER_FACTORY.create(SCHEMA_DESCRIPTOR, TestKey.class,
TestValue.class);
-
- private static final UUID TX_ID = UUID.randomUUID();
-
- private static final HybridClock CLOCK = new HybridClockImpl();
-
- private TestHashIndexStorage pkInnerStorage;
- private TestSortedIndexStorage sortedInnerStorage;
- private TestHashIndexStorage hashInnerStorage;
- private TestMvPartitionStorage storage;
- private StorageUpdateHandler storageUpdateHandler;
-
- @BeforeEach
- void setUp() {
- UUID pkIndexId = UUID.randomUUID();
- UUID sortedIndexId = UUID.randomUUID();
- UUID hashIndexId = UUID.randomUUID();
-
- pkInnerStorage = new TestHashIndexStorage(null);
-
- TableSchemaAwareIndexStorage pkStorage = new
TableSchemaAwareIndexStorage(
- pkIndexId,
- pkInnerStorage,
- PK_INDEX_BINARY_TUPLE_CONVERTER::toTuple
- );
-
- sortedInnerStorage = new TestSortedIndexStorage(new
SortedIndexDescriptor(sortedIndexId, List.of(
- new SortedIndexColumnDescriptor("INTVAL", NativeTypes.INT32,
false, true),
- new SortedIndexColumnDescriptor("STRVAL", NativeTypes.STRING,
false, true)
- )));
-
- TableSchemaAwareIndexStorage sortedIndexStorage = new
TableSchemaAwareIndexStorage(
- sortedIndexId,
- sortedInnerStorage,
- USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
- );
-
- hashInnerStorage = new TestHashIndexStorage(new
HashIndexDescriptor(hashIndexId, List.of(
- new HashIndexColumnDescriptor("INTVAL", NativeTypes.INT32,
false),
- new HashIndexColumnDescriptor("STRVAL", NativeTypes.STRING,
false)
- )));
-
- TableSchemaAwareIndexStorage hashIndexStorage = new
TableSchemaAwareIndexStorage(
- hashIndexId,
- hashInnerStorage,
- USER_INDEX_BINARY_TUPLE_CONVERTER::toTuple
- );
-
- storage = new TestMvPartitionStorage(1);
-
- storageUpdateHandler = new StorageUpdateHandler(1, new
TestPartitionDataStorage(storage),
- () -> Map.of(
- pkIndexId, pkStorage,
- sortedIndexId, sortedIndexStorage,
- hashIndexId, hashIndexStorage
- )
- );
- }
-
+public class IndexCleanupTest extends IndexBaseTest {
@ParameterizedTest
@EnumSource(AddWrite.class)
void testAbort(AddWrite writer) {
UUID rowUuid = UUID.randomUUID();
RowId rowId = new RowId(1, rowUuid);
- var key = new TestKey(1, "foo");
- var value = new TestValue(2, "bar");
- BinaryRow tableRow = binaryRow(key, value);
+ BinaryRow row = defaultRow();
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
assertEquals(1, storage.rowsCount());
assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
@@ -175,11 +61,9 @@ public class IndexCleanupTest {
UUID rowUuid = UUID.randomUUID();
RowId rowId = new RowId(1, rowUuid);
- var key = new TestKey(1, "foo");
- var value = new TestValue(2, "bar");
- BinaryRow tableRow = binaryRow(key, value);
+ BinaryRow row = defaultRow();
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
// Write intent is in the storage.
assertEquals(1, storage.rowsCount());
@@ -206,11 +90,9 @@ public class IndexCleanupTest {
UUID rowUuid = UUID.randomUUID();
RowId rowId = new RowId(1, rowUuid);
- var key = new TestKey(1, "foo");
- var value = new TestValue(2, "bar");
- BinaryRow tableRow = binaryRow(key, value);
+ BinaryRow row = defaultRow();
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
writer.addWrite(storageUpdateHandler, rowUuid, null);
storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () ->
{});
@@ -257,15 +139,37 @@ public class IndexCleanupTest {
@ParameterizedTest
@EnumSource(AddWrite.class)
- void testIndexNotRemovedOnTombstone(AddWrite writer) {
+ void testAbortConsecutiveTxWithMatchingIndexesSameRow(AddWrite writer) {
UUID rowUuid = UUID.randomUUID();
RowId rowId = new RowId(1, rowUuid);
var key = new TestKey(1, "foo");
- var value = new TestValue(2, "bar");
- BinaryRow tableRow = binaryRow(key, value);
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ BinaryRow row1 = binaryRow(key, new TestValue(2, "bar"));
+ BinaryRow row2 = binaryRow(key, new TestValue(3, "baz"));
+
+ writer.addWrite(storageUpdateHandler, rowUuid, row1);
+ commitWrite(rowId);
+
+ writer.addWrite(storageUpdateHandler, rowUuid, row2);
+
+ storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () ->
{});
+
+ assertEquals(1, storage.rowsCount());
+
+ assertTrue(inAllIndexes(row1));
+ assertTrue(inIndexes(row2, true, false));
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWrite.class)
+ void testIndexNotRemovedOnTombstone(AddWrite writer) {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ BinaryRow row = defaultRow();
+
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
commitWrite(rowId);
writer.addWrite(storageUpdateHandler, rowUuid, null);
@@ -284,14 +188,12 @@ public class IndexCleanupTest {
UUID rowUuid = UUID.randomUUID();
RowId rowId = new RowId(1, rowUuid);
- var key = new TestKey(1, "foo");
- var value = new TestValue(2, "bar");
- BinaryRow tableRow = binaryRow(key, value);
+ BinaryRow row = defaultRow();
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
commitWrite(rowId);
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
storageUpdateHandler.handleTransactionAbortion(Set.of(rowId), () ->
{});
@@ -307,14 +209,12 @@ public class IndexCleanupTest {
UUID rowUuid = UUID.randomUUID();
RowId rowId = new RowId(1, rowUuid);
- var key = new TestKey(1, "foo");
- var value = new TestValue(2, "bar");
- BinaryRow tableRow = binaryRow(key, value);
+ BinaryRow row = defaultRow();
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
commitWrite(rowId);
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
writer.addWrite(storageUpdateHandler, rowUuid, null);
assertEquals(1, storage.rowsCount());
@@ -329,17 +229,15 @@ public class IndexCleanupTest {
UUID rowUuid = UUID.randomUUID();
RowId rowId = new RowId(1, rowUuid);
- var key = new TestKey(1, "foo");
- var value = new TestValue(2, "bar");
- BinaryRow tableRow = binaryRow(key, value);
+ BinaryRow row = defaultRow();
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
commitWrite(rowId);
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
commitWrite(rowId);
- writer.addWrite(storageUpdateHandler, rowUuid, tableRow);
+ writer.addWrite(storageUpdateHandler, rowUuid, row);
writer.addWrite(storageUpdateHandler, rowUuid, null);
assertEquals(1, storage.rowsCount());
@@ -347,85 +245,4 @@ public class IndexCleanupTest {
assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
}
-
- /** Enum that encapsulates update API. */
- enum AddWrite {
- /** Uses update api. */
- USE_UPDATE {
- @Override
- void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
- handler.handleUpdate(
- TX_ID,
- rowUuid,
- partitionId,
- row == null ? null : row.byteBuffer(),
- (unused) -> {}
- );
- }
- },
- /** Uses updateAll api. */
- USE_UPDATE_ALL {
- @Override
- void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row) {
- handler.handleUpdateAll(
- TX_ID,
- singletonMap(rowUuid, row == null ? null :
row.byteBuffer()),
- partitionId,
- (unused) -> {}
- );
- }
- };
-
- void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable
BinaryRow row) {
- TablePartitionId tablePartitionId = new
TablePartitionId(UUID.randomUUID(), 1);
-
- addWrite(handler, tablePartitionId, rowUuid, row);
- }
-
- abstract void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row);
- }
-
- private static BinaryRow binaryRow(TestKey key, TestValue value) {
- try {
- return KV_MARSHALLER.marshal(key, value);
- } catch (MarshallerException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void commitWrite(RowId rowId) {
- storage.runConsistently(() -> {
- storage.commitWrite(rowId, CLOCK.now());
-
- return null;
- });
- }
-
- private static class TestKey {
- int intKey;
-
- String strKey;
-
- TestKey() {
- }
-
- TestKey(int intKey, String strKey) {
- this.intKey = intKey;
- this.strKey = strKey;
- }
- }
-
- private static class TestValue {
- Integer intVal;
-
- String strVal;
-
- TestValue() {
- }
-
- TestValue(Integer intVal, String strVal) {
- this.intVal = intVal;
- this.strVal = strVal;
- }
- }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
new file mode 100644
index 0000000000..b8f09c2229
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+import org.junit.jupiter.api.Test;
+
+/** Tests indexes cleaning up on garbage collection. */
+public class IndexGcTest extends IndexBaseTest {
+ @Test
+ void testRemoveStaleEntryWithSameIndex() {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ BinaryRow row = defaultRow();
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+ commitWrite(rowId);
+
+ assertTrue(storageUpdateHandler.vacuum(now()));
+
+ assertEquals(1, getRowVersions(rowId).size());
+ // Newer entry has the same index value, so it should not be removed.
+ assertTrue(inAllIndexes(row));
+ }
+
+ @Test
+ void testRemoveStaleEntriesWithDifferentIndexes() {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ var key = new TestKey(1, "foo");
+
+ BinaryRow row1 = binaryRow(key, new TestValue(2, "bar"));
+ BinaryRow row2 = binaryRow(key, new TestValue(5, "baz"));
+
+ addWrite(storageUpdateHandler, rowUuid, row1);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, row1);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, row2);
+ commitWrite(rowId);
+
+ HybridTimestamp afterCommits = now();
+
+ assertTrue(storageUpdateHandler.vacuum(afterCommits));
+
+        // row1 should still be in the index, because the second write was identical to the first.
+ assertTrue(inAllIndexes(row1));
+
+ assertTrue(storageUpdateHandler.vacuum(afterCommits));
+ assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+ assertEquals(1, getRowVersions(rowId).size());
+        // Older entries have different index values, so they should be removed.
+ assertTrue(inIndexes(row1, true, false));
+ assertTrue(inAllIndexes(row2));
+ }
+
+ @Test
+ void testRemoveTombstonesRowNullNull() {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ BinaryRow row = defaultRow();
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, null);
+ commitWrite(rowId);
+
+        // The second tombstone is not actually written to the storage, but check the outcome anyway.
+ addWrite(storageUpdateHandler, rowUuid, null);
+ commitWrite(rowId);
+
+ HybridTimestamp afterCommits = now();
+
+ assertTrue(storageUpdateHandler.vacuum(afterCommits));
+ assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+ assertEquals(0, getRowVersions(rowId).size());
+ // The last entry was a tombstone, so no indexes should be left.
+ assertTrue(notInAnyIndex(row));
+ }
+
+ @Test
+ void testRemoveTombstonesRowNullRow() {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ BinaryRow row = defaultRow();
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, null);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+ commitWrite(rowId);
+
+ HybridTimestamp afterCommits = now();
+
+ assertTrue(storageUpdateHandler.vacuum(afterCommits));
+ assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+ assertEquals(1, getRowVersions(rowId).size());
+ assertTrue(inAllIndexes(row));
+ }
+
+ @Test
+ void testRemoveTombstonesRowRowNull() {
+ UUID rowUuid = UUID.randomUUID();
+ RowId rowId = new RowId(1, rowUuid);
+
+ BinaryRow row = defaultRow();
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+ commitWrite(rowId);
+
+ addWrite(storageUpdateHandler, rowUuid, null);
+ commitWrite(rowId);
+
+ HybridTimestamp afterCommits = now();
+
+ assertTrue(storageUpdateHandler.vacuum(afterCommits));
+ assertTrue(storageUpdateHandler.vacuum(afterCommits));
+ assertFalse(storageUpdateHandler.vacuum(afterCommits));
+
+ assertEquals(0, getRowVersions(rowId).size());
+ assertTrue(notInAnyIndex(row));
+ }
+}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index eb891ebeeb..022d9c863b 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.ReadResult;
@@ -111,6 +112,11 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
return partitionStorage.scanVersions(rowId);
}
+ @Override
+ public @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp
lowWatermark) {
+ return partitionStorage.pollForVacuum(lowWatermark);
+ }
+
@Override
public MvPartitionStorage getStorage() {
return partitionStorage;