This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new bbb715d6a4 IGNITE-18033 Implement cooperative GC of MV data during
RAFT commands execution (#1716)
bbb715d6a4 is described below
commit bbb715d6a40d656b82b613decc44ab8f9d0db840
Author: Semyon Danilov <[email protected]>
AuthorDate: Tue Feb 28 20:03:29 2023 +0400
IGNITE-18033 Implement cooperative GC of MV data during RAFT commands
execution (#1716)
---
.../storage/DataStorageConfigurationSchema.java | 5 +
.../ignite/distributed/ItTablePersistenceTest.java | 3 +-
.../distributed/ItTxDistributedTestSingleNode.java | 6 +-
.../table/distributed/StorageUpdateHandler.java | 143 +++++++++++++--
.../internal/table/distributed/TableManager.java | 15 +-
.../internal/table/distributed/IndexBaseTest.java | 10 +-
.../PartitionGcOnWriteConcurrentTest.java | 160 +++++++++++++++++
.../table/distributed/PartitionGcOnWriteTest.java | 191 +++++++++++++++++++++
.../raft/PartitionCommandListenerTest.java | 14 +-
.../PartitionReplicaListenerIndexLockingTest.java | 10 +-
.../replication/PartitionReplicaListenerTest.java | 10 +-
.../table/impl/DummyInternalTableImpl.java | 10 +-
12 files changed, 543 insertions(+), 34 deletions(-)
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
index e32e6b65aa..eb83d38467 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.schema.configuration.storage;
import org.apache.ignite.configuration.annotation.PolymorphicConfig;
import org.apache.ignite.configuration.annotation.PolymorphicId;
+import org.apache.ignite.configuration.annotation.Value;
/**
* Configuration schema for data storage.
@@ -28,4 +29,8 @@ public class DataStorageConfigurationSchema {
/** Name of data storage. */
@PolymorphicId(hasDefault = true)
public String name =
UnknownDataStorageConfigurationSchema.UNKNOWN_DATA_STORAGE;
+
+ /** The number of entries in the storage to be garbage collected during a
storage update operation. */
+ @Value(hasDefault = true)
+ public int gcOnUpdateBatchSize = 5;
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index e4bc29511f..36fcabbeb1 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -372,7 +372,8 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartitionStorage);
- StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(0, partitionDataStorage, Map::of);
+ StorageUpdateHandler storageUpdateHandler =
+ new StorageUpdateHandler(0, partitionDataStorage,
Map::of, tableCfg.dataStorage());
PartitionListener listener = new PartitionListener(
partitionDataStorage,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 2e0d70a14c..80755377d8 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
@@ -123,6 +124,9 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
@InjectConfiguration
private static RaftConfiguration raftConfiguration;
+ @InjectConfiguration
+ private static DataStorageConfiguration dsCfg;
+
private static final IgniteLogger LOG =
Loggers.forClass(ItTxDistributedTestSingleNode.class);
public static final int NODE_PORT_BASE = 20_000;
@@ -422,7 +426,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMpPartStorage);
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = ()
-> Map.of(pkStorage.get().id(), pkStorage.get());
- StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(partId, partitionDataStorage, indexes);
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(partId, partitionDataStorage, indexes, dsCfg);
CompletableFuture<Void> partitionReadyFuture =
raftServers.get(assignment).startRaftGroupNode(
new RaftNodeId(grpId, configuration.peer(assignment)),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index bad1f2f50a..16e21d1825 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -26,14 +26,17 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.storage.BinaryRowAndRowId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -53,21 +56,34 @@ public class StorageUpdateHandler {
private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
+ /** Last recorded GC low watermark. */
+ private final AtomicReference<HybridTimestamp> lastRecordedLwm = new
AtomicReference<>();
+
+ /** Data storage configuration. */
+ private final DataStorageConfiguration dsCfg;
+
/**
* The constructor.
*
* @param partitionId Partition id.
* @param storage Partition data storage.
* @param indexes Indexes supplier.
+ * @param dsCfg Data storage configuration.
*/
- public StorageUpdateHandler(int partitionId, PartitionDataStorage storage,
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes) {
+ public StorageUpdateHandler(
+ int partitionId,
+ PartitionDataStorage storage,
+ Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
+ DataStorageConfiguration dsCfg
+ ) {
this.partitionId = partitionId;
this.storage = storage;
this.indexes = indexes;
+ this.dsCfg = dsCfg;
}
/**
- * Handle single update.
+ * Handles single update.
*
* @param txId Transaction id.
* @param rowUuid Row UUID.
@@ -81,8 +97,32 @@ public class StorageUpdateHandler {
TablePartitionId commitPartitionId,
@Nullable ByteBuffer rowBuffer,
@Nullable Consumer<RowId> onReplication
+ ) {
+ handleUpdate(txId, rowUuid, commitPartitionId, rowBuffer,
onReplication, null);
+ }
+
+ /**
+ * Handles single update.
+ *
+ * @param txId Transaction id.
+ * @param rowUuid Row UUID.
+ * @param commitPartitionId Commit partition id.
+ * @param rowBuffer Row buffer.
+ * @param onReplication Callback on replication.
+ * @param lowWatermark GC low watermark.
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-18909 Pass low
watermark.
+ public void handleUpdate(
+ UUID txId,
+ UUID rowUuid,
+ TablePartitionId commitPartitionId,
+ @Nullable ByteBuffer rowBuffer,
+ @Nullable Consumer<RowId> onReplication,
+ @Nullable HybridTimestamp lowWatermark
) {
storage.runConsistently(() -> {
+ executeBatchGc(lowWatermark);
+
BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) :
null;
RowId rowId = new RowId(partitionId, rowUuid);
UUID commitTblId = commitPartitionId.tableId();
@@ -106,7 +146,7 @@ public class StorageUpdateHandler {
}
/**
- * Handle multiple updates.
+ * Handles multiple updates.
*
* @param txId Transaction id.
* @param rowsToUpdate Collection of rows to update.
@@ -118,8 +158,30 @@ public class StorageUpdateHandler {
Map<UUID, ByteBuffer> rowsToUpdate,
TablePartitionId commitPartitionId,
@Nullable Consumer<Collection<RowId>> onReplication
+ ) {
+ handleUpdateAll(txId, rowsToUpdate, commitPartitionId, onReplication,
null);
+ }
+
+ /**
+     * Handles multiple updates.
+ *
+ * @param txId Transaction id.
+ * @param rowsToUpdate Collection of rows to update.
+ * @param commitPartitionId Commit partition id.
+ * @param onReplication On replication callback.
+ * @param lowWatermark GC low watermark.
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-18909 Pass low
watermark.
+ public void handleUpdateAll(
+ UUID txId,
+ Map<UUID, ByteBuffer> rowsToUpdate,
+ TablePartitionId commitPartitionId,
+ @Nullable Consumer<Collection<RowId>> onReplication,
+ @Nullable HybridTimestamp lowWatermark
) {
storage.runConsistently(() -> {
+ executeBatchGc(lowWatermark);
+
UUID commitTblId = commitPartitionId.tableId();
int commitPartId = commitPartitionId.partitionId();
@@ -150,6 +212,28 @@ public class StorageUpdateHandler {
});
}
+ private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
+ if (newLwm == null) {
+ return;
+ }
+
+ @Nullable HybridTimestamp oldLwm;
+ do {
+ oldLwm = lastRecordedLwm.get();
+
+ if (oldLwm != null && newLwm.compareTo(oldLwm) <= 0) {
+ break;
+ }
+ } while (!lastRecordedLwm.compareAndSet(oldLwm, newLwm));
+
+ if (oldLwm == null || newLwm.compareTo(oldLwm) > 0) {
+ // Iff the lwm we have is the new lwm.
+ // Otherwise our newLwm is either smaller than last recorded lwm
or last recorded lwm has changed
+            // concurrently and it has become greater. If that's the case, another
thread will perform the GC.
+ vacuumBatch(newLwm, dsCfg.gcOnUpdateBatchSize().value());
+ }
+ }
+
/**
* Tries to remove a previous write from index.
*
@@ -263,26 +347,53 @@ public class StorageUpdateHandler {
* @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
*/
public boolean vacuum(HybridTimestamp lowWatermark) {
- return storage.runConsistently(() -> {
- BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
+ return storage.runConsistently(() -> internalVacuum(lowWatermark));
+ }
- if (vacuumed == null) {
- // Nothing was garbage collected.
- return false;
+ /**
+ * Tries removing {@code count} oldest stale entries and their indexes.
+     * If there are fewer entries that can be removed, it exits prematurely.
+ *
+ * @param lowWatermark Low watermark for the vacuum.
+ * @param count Count of entries to GC.
+ */
+ private void vacuumBatch(HybridTimestamp lowWatermark, int count) {
+ storage.runConsistently(() -> {
+ for (int i = 0; i < count; i++) {
+ if (!internalVacuum(lowWatermark)) {
+ break;
+ }
}
- BinaryRow binaryRow = vacuumed.binaryRow();
+ return null;
+ });
+ }
- assert binaryRow != null;
+ /**
+ * Executes garbage collection. Must be called inside a {@link
MvPartitionStorage#runConsistently(WriteClosure)} closure.
+ *
+ * @param lowWatermark Low watermark for the vacuum.
+ * @return {@code true} if an entry was garbage collected, {@code false}
if there was nothing to collect.
+ */
+ private boolean internalVacuum(HybridTimestamp lowWatermark) {
+ BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
- RowId rowId = vacuumed.rowId();
+ if (vacuumed == null) {
+ // Nothing was garbage collected.
+ return false;
+ }
- try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
- tryRemoveFromIndexes(binaryRow, rowId, cursor);
- }
+ BinaryRow binaryRow = vacuumed.binaryRow();
- return true;
- });
+ assert binaryRow != null;
+
+ RowId rowId = vacuumed.rowId();
+
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ tryRemoveFromIndexes(binaryRow, rowId, cursor);
+ }
+
+ return true;
}
private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b038f64cd7..62a30dece5 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -742,7 +742,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTbl, partId));
CompletableFuture<StorageUpdateHandler>
storageUpdateHandlerFut = partitionDataStorageFut
- .thenApply(storage -> new StorageUpdateHandler(partId,
storage, table.indexStorageAdapters(partId)));
+ .thenApply(storage -> new StorageUpdateHandler(
+ partId,
+ storage,
+ table.indexStorageAdapters(partId),
+ tblCfg.dataStorage()
+ ));
CompletableFuture<Void> startGroupFut;
@@ -1986,8 +1991,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
PartitionDataStorage partitionDataStorage =
partitionDataStorage(mvPartitionStorage, internalTable, partId);
- StorageUpdateHandler storageUpdateHandler =
- new StorageUpdateHandler(partId,
partitionDataStorage, tbl.indexStorageAdapters(partId));
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(
+ partId,
+ partitionDataStorage,
+ tbl.indexStorageAdapters(partId),
+ tblCfg.dataStorage()
+ );
RaftGroupOptions groupOptions =
groupOptionsForPartition(
internalTable.storage(),
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index cf1e032f9d..17c1009b91 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -24,12 +24,15 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.distributed.TestPartitionDataStorage;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.NativeTypes;
+import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
@@ -44,11 +47,13 @@ import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Base test for indexes. Sets up a table with (int, string) key and (int,
string) value and
* three indexes: primary key, hash index over value columns and sorted index
over value columns.
*/
+@ExtendWith(ConfigurationExtension.class)
public abstract class IndexBaseTest extends BaseMvStoragesTest {
private static final BinaryTupleSchema TUPLE_SCHEMA =
BinaryTupleSchema.createRowSchema(schemaDescriptor);
@@ -74,7 +79,7 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
StorageUpdateHandler storageUpdateHandler;
@BeforeEach
- void setUp() {
+ void setUp(@InjectConfiguration DataStorageConfiguration dsCfg) {
UUID pkIndexId = UUID.randomUUID();
UUID sortedIndexId = UUID.randomUUID();
UUID hashIndexId = UUID.randomUUID();
@@ -116,7 +121,8 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
pkIndexId, pkStorage,
sortedIndexId, sortedIndexStorage,
hashIndexId, hashIndexStorage
- )
+ ),
+ dsCfg
);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
new file mode 100644
index 0000000000..2c3997498a
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests for concurrent cooperative GC (GC that is executed on write). */
+@ExtendWith(ConfigurationExtension.class)
+public class PartitionGcOnWriteConcurrentTest {
+ private static final int PARTITION_ID = 1;
+ private static final TablePartitionId TABLE_PARTITION_ID = new
TablePartitionId(UUID.randomUUID(), PARTITION_ID);
+ private static final HybridClock CLOCK = new HybridClockImpl();
+
+ private MvPartitionStorage storage;
+ private StorageUpdateHandler storageUpdateHandler;
+
+ @BeforeEach
+ void setUp(@InjectConfiguration DataStorageConfiguration dsCfg) {
+ storage = mock(MvPartitionStorage.class);
+ doAnswer(invocation -> {
+ WriteClosure<?> cls = invocation.getArgument(0);
+
+ return cls.execute();
+ }).when(storage).runConsistently(any());
+
+ when(storage.pollForVacuum(any())).thenReturn(null);
+
+ storageUpdateHandler = new StorageUpdateHandler(1, new
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+ }
+
+ @ParameterizedTest
+ @EnumSource(UpdateType.class)
+ void testSameLwm(UpdateType updateType) {
+ HybridTimestamp lwm = CLOCK.now();
+
+ runRace(createRunnable(updateType, lwm), createRunnable(updateType,
lwm));
+
+ verify(storage, times(1)).pollForVacuum(lwm);
+ }
+
+ @ParameterizedTest
+ @EnumSource(UpdateType.class)
+ void testDifferentLwm(UpdateType updateType) {
+ int count = 10;
+
+ HybridTimestamp[] timestamps = new HybridTimestamp[count];
+
+ RunnableX[] runnables = new RunnableX[count];
+
+ for (int i = 0; i < count; i++) {
+ HybridTimestamp ts = CLOCK.now();
+
+ timestamps[i] = ts;
+
+ runnables[i] = createRunnable(updateType, ts);
+ }
+
+ runRace(runnables);
+
+ for (int i = 0; i < count - 1; i++) {
+ verify(storage, atMostOnce()).pollForVacuum(timestamps[i]);
+ }
+
+ verify(storage, times(1)).pollForVacuum(timestamps[count - 1]);
+ }
+
+ @ParameterizedTest
+ @EnumSource(UpdateType.class)
+ void testDifferentLwmWithPreviousVacuums(UpdateType updateType) throws
Throwable {
+ HybridTimestamp lwm1 = CLOCK.now();
+ HybridTimestamp lwm2 = CLOCK.now();
+ HybridTimestamp lwm3 = CLOCK.now();
+
+ createRunnable(updateType, lwm1).run();
+
+ runRace(createRunnable(updateType, lwm2), createRunnable(updateType,
lwm3));
+
+ verify(storage, times(1)).pollForVacuum(lwm1);
+ verify(storage, atMostOnce()).pollForVacuum(lwm2);
+ verify(storage, times(1)).pollForVacuum(lwm3);
+ }
+
+ private RunnableX createRunnable(UpdateType updateType, HybridTimestamp
lwm) {
+ if (updateType == UpdateType.UPDATE) {
+ //noinspection unchecked
+ return () -> storageUpdateHandler.handleUpdate(
+ UUID.randomUUID(),
+ UUID.randomUUID(),
+ TABLE_PARTITION_ID,
+ buffer(),
+ mock(Consumer.class),
+ lwm
+ );
+ } else {
+ //noinspection unchecked
+ return () -> storageUpdateHandler.handleUpdateAll(
+ UUID.randomUUID(),
+ Collections.emptyMap(),
+ TABLE_PARTITION_ID,
+ mock(Consumer.class),
+ lwm
+ );
+ }
+ }
+
+ private static ByteBuffer buffer() {
+ ByteBuffer buf = mock(ByteBuffer.class);
+ when(buf.order()).thenReturn(ByteBufferRow.ORDER);
+
+ return buf;
+ }
+
+ private enum UpdateType {
+ UPDATE,
+ UPDATE_ALL;
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
new file mode 100644
index 0000000000..62a3006d77
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests for cooperative GC (GC that is executed on write). */
+@ExtendWith(ConfigurationExtension.class)
+public class PartitionGcOnWriteTest extends BaseMvStoragesTest {
+ private static final int WRITES_COUNT = 10;
+ private static final int GC_BATCH_SIZE = 7;
+ private static final UUID TX_ID = UUID.randomUUID();
+ private static final int PARTITION_ID = 1;
+ private static final TablePartitionId TABLE_PARTITION_ID = new
TablePartitionId(UUID.randomUUID(), PARTITION_ID);
+
+ private TestMvPartitionStorage storage;
+ private StorageUpdateHandler storageUpdateHandler;
+
+ @BeforeEach
+ void setUp(@InjectConfiguration("mock.gcOnUpdateBatchSize=" +
GC_BATCH_SIZE) DataStorageConfiguration dsCfg) {
+ storage = new TestMvPartitionStorage(1);
+
+ storageUpdateHandler = new StorageUpdateHandler(1, new
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWriteWithGc.class)
+ void testNullLwm(AddWriteWithGc writer) {
+ RowId rowId = fillWithDataForGc();
+
+ writeWithGc(writer, null);
+
+ assertEquals(WRITES_COUNT, getRowVersions(rowId).size());
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWriteWithGc.class)
+ void testOlderLwm(AddWriteWithGc writer) {
+ HybridTimestamp older = clock.now();
+
+ HybridTimestamp newer = clock.now();
+
+ writeWithGc(writer, newer);
+
+ RowId rowId = fillWithDataForGc();
+
+ writeWithGc(writer, older);
+
+ assertEquals(WRITES_COUNT, getRowVersions(rowId).size());
+ }
+
+ @ParameterizedTest
+ @EnumSource(AddWriteWithGc.class)
+ void testNewerLwm(AddWriteWithGc writer) {
+ RowId rowId = fillWithDataForGc();
+
+ writeWithGc(writer, clock.now());
+
+ assertEquals(WRITES_COUNT - GC_BATCH_SIZE,
getRowVersions(rowId).size());
+ }
+
+ private List<ReadResult> getRowVersions(RowId rowId) {
+ try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) {
+ return readResults.stream().collect(toList());
+ }
+ }
+
+ private void writeWithGc(AddWriteWithGc writer, @Nullable HybridTimestamp
lwm) {
+ UUID rowUuid = UUID.randomUUID();
+
+ TestKey key = new TestKey(1337, "leet");
+ BinaryRow row = binaryRow(key, new TestValue(999, "bar"));
+
+ writer.addWrite(storageUpdateHandler, rowUuid, row, lwm);
+ }
+
+ private RowId fillWithDataForGc() {
+ UUID rowUuid = UUID.randomUUID();
+ var rowId = new RowId(PARTITION_ID, rowUuid);
+
+ TestKey key = new TestKey(1, "foo");
+
+ for (int i = 0; i < WRITES_COUNT; i++) {
+ BinaryRow row = binaryRow(key, new TestValue(i, "bar" + i));
+
+ addWrite(storageUpdateHandler, rowUuid, row);
+
+ commitWrite(rowId);
+ }
+
+ return rowId;
+ }
+
+ private static void addWrite(StorageUpdateHandler handler, UUID rowUuid,
@Nullable BinaryRow row) {
+ handler.handleUpdate(
+ TX_ID,
+ rowUuid,
+ TABLE_PARTITION_ID,
+ row == null ? null : row.byteBuffer(),
+ (unused) -> {}
+ );
+ }
+
+ private void commitWrite(RowId rowId) {
+ storage.runConsistently(() -> {
+ storage.commitWrite(rowId, clock.now());
+
+ return null;
+ });
+ }
+
+ /** Enum that encapsulates update API. */
+ enum AddWriteWithGc {
+ /** Uses update api. */
+ USE_UPDATE {
+ @Override
+ void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+ @Nullable HybridTimestamp lwm) {
+ handler.handleUpdate(
+ TX_ID,
+ rowUuid,
+ partitionId,
+ row == null ? null : row.byteBuffer(),
+ (unused) -> {},
+ lwm
+ );
+ }
+ },
+ /** Uses updateAll api. */
+ USE_UPDATE_ALL {
+ @Override
+ void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+ @Nullable HybridTimestamp lwm) {
+ handler.handleUpdateAll(
+ TX_ID,
+ singletonMap(rowUuid, row == null ? null :
row.byteBuffer()),
+ partitionId,
+ (unused) -> {},
+ lwm
+ );
+ }
+ };
+
+ void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable
BinaryRow row, @Nullable HybridTimestamp lwm) {
+ TablePartitionId tablePartitionId = new
TablePartitionId(UUID.randomUUID(), 1);
+
+ addWrite(handler, tablePartitionId, rowUuid, row, lwm);
+ }
+
+ abstract void addWrite(StorageUpdateHandler handler, TablePartitionId
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+ @Nullable HybridTimestamp lwm);
+ }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index c1c2e699c9..7861ae3e31 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -53,6 +53,8 @@ import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.TestHybridClock;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -71,6 +73,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -110,6 +113,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
public class PartitionCommandListenerTest {
/** Key count. */
private static final int KEY_COUNT = 100;
@@ -173,7 +177,7 @@ public class PartitionCommandListenerTest {
* Initializes a table listener before tests.
*/
@BeforeEach
- public void before() {
+ public void before(@InjectConfiguration DataStorageConfiguration dsCfg) {
NetworkAddress addr = new NetworkAddress("127.0.0.1", 5003);
ClusterService clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
@@ -188,7 +192,7 @@ public class PartitionCommandListenerTest {
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.id(), pkStorage);
- StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(0, partitionDataStorage, indexes);
+ StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(0, partitionDataStorage, indexes, dsCfg);
commandListener = new PartitionListener(
partitionDataStorage,
@@ -273,14 +277,12 @@ public class PartitionCommandListenerTest {
* the maximal last applied index among storages to all storages.
*/
@Test
- public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm() {
- ReplicaService replicaService = mock(ReplicaService.class, RETURNS_DEEP_STUBS);
-
+ public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm(@InjectConfiguration DataStorageConfiguration dsCfg) {
TestPartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartitionStorage);
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.id(), pkStorage);
- StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes);
+ StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes, dsCfg);
PartitionListener testCommandListener = new PartitionListener(
partitionDataStorage,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index dc97d41f36..cf880c2585 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -34,6 +34,8 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.distributed.TestPartitionDataStorage;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import org.apache.ignite.internal.schema.marshaller.MarshallerException;
import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
@@ -83,10 +86,12 @@ import org.hamcrest.CustomMatcher;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
/** There are tests for partition replica listener. */
+@ExtendWith(ConfigurationExtension.class)
public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest {
private static final int PART_ID = 0;
private static final UUID TABLE_ID = new UUID(0L, 0L);
@@ -108,7 +113,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
private static Function<BinaryRow, BinaryTuple> row2SortKeyConverter;
@BeforeAll
- private static void beforeAll() {
+ public static void beforeAll(@InjectConfiguration DataStorageConfiguration dsCfg) {
RaftGroupService mockRaftClient = mock(RaftGroupService.class);
when(mockRaftClient.refreshAndGetLeaderWithTerm())
@@ -184,7 +189,8 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
new StorageUpdateHandler(
PART_ID,
new TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE),
- () -> Map.of(pkStorage.get().id(), pkStorage.get())
+ () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+ dsCfg
),
peer -> true,
CompletableFuture.completedFuture(schemaManager)
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 88a061a482..12a8141194 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -54,6 +54,8 @@ import java.util.stream.IntStream;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -69,6 +71,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import org.apache.ignite.internal.schema.marshaller.MarshallerException;
import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
@@ -123,9 +126,11 @@ import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
/** There are tests for partition replica listener. */
+@ExtendWith(ConfigurationExtension.class)
public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Partition id. */
private static final int partId = 0;
@@ -230,7 +235,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static Function<PartitionCommand, CompletableFuture<?>> raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
@BeforeAll
- private static void beforeAll() {
+ public static void beforeAll(@InjectConfiguration DataStorageConfiguration dsCfg) {
when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock -> {
if (!localLeader) {
return completedFuture(new LeaderWithTerm(new Peer(anotherNode.name()), 1L));
@@ -331,7 +336,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
new StorageUpdateHandler(
partId,
partitionDataStorage,
- () -> Map.of(pkStorage.get().id(), pkStorage.get())
+ () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+ dsCfg
),
peer -> localNode.name().equals(peer.consistentId()),
completedFuture(schemaManager)
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 1edcf931a1..71cbce1d0f 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.naming.OperationNotSupportedException;
+import org.apache.ignite.configuration.ConfigurationValue;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
@@ -258,7 +260,13 @@ public class DummyInternalTableImpl extends InternalTableImpl {
PendingComparableValuesTracker<HybridTimestamp> safeTime = new PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(mvPartStorage);
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> Map.of(pkStorage.get().id(), pkStorage.get());
- StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(PART_ID, partitionDataStorage, indexes);
+
+ DataStorageConfiguration dsCfg = mock(DataStorageConfiguration.class);
+ ConfigurationValue<Integer> gcBatchSizeValue = mock(ConfigurationValue.class);
+ lenient().when(gcBatchSizeValue.value()).thenReturn(5);
+ lenient().when(dsCfg.gcOnUpdateBatchSize()).thenReturn(gcBatchSizeValue);
+
+ StorageUpdateHandler storageUpdateHandler = new StorageUpdateHandler(PART_ID, partitionDataStorage, indexes, dsCfg);
DummySchemaManagerImpl schemaManager = new DummySchemaManagerImpl(schema);