This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new bbb715d6a4 IGNITE-18033 Implement cooperative GC of MV data during 
RAFT commands execution (#1716)
bbb715d6a4 is described below

commit bbb715d6a40d656b82b613decc44ab8f9d0db840
Author: Semyon Danilov <[email protected]>
AuthorDate: Tue Feb 28 20:03:29 2023 +0400

    IGNITE-18033 Implement cooperative GC of MV data during RAFT commands 
execution (#1716)
---
 .../storage/DataStorageConfigurationSchema.java    |   5 +
 .../ignite/distributed/ItTablePersistenceTest.java |   3 +-
 .../distributed/ItTxDistributedTestSingleNode.java |   6 +-
 .../table/distributed/StorageUpdateHandler.java    | 143 +++++++++++++--
 .../internal/table/distributed/TableManager.java   |  15 +-
 .../internal/table/distributed/IndexBaseTest.java  |  10 +-
 .../PartitionGcOnWriteConcurrentTest.java          | 160 +++++++++++++++++
 .../table/distributed/PartitionGcOnWriteTest.java  | 191 +++++++++++++++++++++
 .../raft/PartitionCommandListenerTest.java         |  14 +-
 .../PartitionReplicaListenerIndexLockingTest.java  |  10 +-
 .../replication/PartitionReplicaListenerTest.java  |  10 +-
 .../table/impl/DummyInternalTableImpl.java         |  10 +-
 12 files changed, 543 insertions(+), 34 deletions(-)

diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
index e32e6b65aa..eb83d38467 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/storage/DataStorageConfigurationSchema.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.schema.configuration.storage;
 
 import org.apache.ignite.configuration.annotation.PolymorphicConfig;
 import org.apache.ignite.configuration.annotation.PolymorphicId;
+import org.apache.ignite.configuration.annotation.Value;
 
 /**
  * Configuration schema for data storage.
@@ -28,4 +29,8 @@ public class DataStorageConfigurationSchema {
     /** Name of data storage. */
     @PolymorphicId(hasDefault = true)
     public String name = 
UnknownDataStorageConfigurationSchema.UNKNOWN_DATA_STORAGE;
+
+    /** The number of entries in the storage to be garbage collected during a 
storage update operation. */
+    @Value(hasDefault = true)
+    public int gcOnUpdateBatchSize = 5;
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index e4bc29511f..36fcabbeb1 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -372,7 +372,8 @@ public class ItTablePersistenceTest extends 
ItAbstractListenerSnapshotTest<Parti
 
                     PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartitionStorage);
 
-                    StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(0, partitionDataStorage, Map::of);
+                    StorageUpdateHandler storageUpdateHandler =
+                            new StorageUpdateHandler(0, partitionDataStorage, 
Map::of, tableCfg.dataStorage());
 
                     PartitionListener listener = new PartitionListener(
                             partitionDataStorage,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 2e0d70a14c..80755377d8 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
@@ -123,6 +124,9 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
     @InjectConfiguration
     private static RaftConfiguration raftConfiguration;
 
+    @InjectConfiguration
+    private static DataStorageConfiguration dsCfg;
+
     private static final IgniteLogger LOG = 
Loggers.forClass(ItTxDistributedTestSingleNode.class);
 
     public static final int NODE_PORT_BASE = 20_000;
@@ -422,7 +426,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
 
                 PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(testMpPartStorage);
                 Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () 
-> Map.of(pkStorage.get().id(), pkStorage.get());
-                StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(partId, partitionDataStorage, indexes);
+                StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(partId, partitionDataStorage, indexes, dsCfg);
 
                 CompletableFuture<Void> partitionReadyFuture = 
raftServers.get(assignment).startRaftGroupNode(
                         new RaftNodeId(grpId, configuration.peer(assignment)),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index bad1f2f50a..16e21d1825 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -26,14 +26,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.ByteBufferRow;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.storage.BinaryRowAndRowId;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -53,21 +56,34 @@ public class StorageUpdateHandler {
 
     private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
 
+    /** Last recorded GC low watermark. */
+    private final AtomicReference<HybridTimestamp> lastRecordedLwm = new 
AtomicReference<>();
+
+    /** Data storage configuration. */
+    private final DataStorageConfiguration dsCfg;
+
     /**
      * The constructor.
      *
      * @param partitionId Partition id.
      * @param storage Partition data storage.
      * @param indexes Indexes supplier.
+     * @param dsCfg Data storage configuration.
      */
-    public StorageUpdateHandler(int partitionId, PartitionDataStorage storage, 
Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes) {
+    public StorageUpdateHandler(
+            int partitionId,
+            PartitionDataStorage storage,
+            Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes,
+            DataStorageConfiguration dsCfg
+    ) {
         this.partitionId = partitionId;
         this.storage = storage;
         this.indexes = indexes;
+        this.dsCfg = dsCfg;
     }
 
     /**
-     * Handle single update.
+     * Handles single update.
      *
      * @param txId Transaction id.
      * @param rowUuid Row UUID.
@@ -81,8 +97,32 @@ public class StorageUpdateHandler {
             TablePartitionId commitPartitionId,
             @Nullable ByteBuffer rowBuffer,
             @Nullable Consumer<RowId> onReplication
+    ) {
+        handleUpdate(txId, rowUuid, commitPartitionId, rowBuffer, 
onReplication, null);
+    }
+
+    /**
+     * Handles single update.
+     *
+     * @param txId Transaction id.
+     * @param rowUuid Row UUID.
+     * @param commitPartitionId Commit partition id.
+     * @param rowBuffer Row buffer.
+     * @param onReplication Callback on replication.
+     * @param lowWatermark GC low watermark.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-18909 Pass low 
watermark.
+    public void handleUpdate(
+            UUID txId,
+            UUID rowUuid,
+            TablePartitionId commitPartitionId,
+            @Nullable ByteBuffer rowBuffer,
+            @Nullable Consumer<RowId> onReplication,
+            @Nullable HybridTimestamp lowWatermark
     ) {
         storage.runConsistently(() -> {
+            executeBatchGc(lowWatermark);
+
             BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) : 
null;
             RowId rowId = new RowId(partitionId, rowUuid);
             UUID commitTblId = commitPartitionId.tableId();
@@ -106,7 +146,7 @@ public class StorageUpdateHandler {
     }
 
     /**
-     * Handle multiple updates.
+     * Handles multiple updates.
      *
      * @param txId Transaction id.
      * @param rowsToUpdate Collection of rows to update.
@@ -118,8 +158,30 @@ public class StorageUpdateHandler {
             Map<UUID, ByteBuffer> rowsToUpdate,
             TablePartitionId commitPartitionId,
             @Nullable Consumer<Collection<RowId>> onReplication
+    ) {
+        handleUpdateAll(txId, rowsToUpdate, commitPartitionId, onReplication, 
null);
+    }
+
+    /**
+     * Handles multiple updates.
+     *
+     * @param txId Transaction id.
+     * @param rowsToUpdate Collection of rows to update.
+     * @param commitPartitionId Commit partition id.
+     * @param onReplication On replication callback.
+     * @param lowWatermark GC low watermark.
+     */
+    // TODO: https://issues.apache.org/jira/browse/IGNITE-18909 Pass low 
watermark.
+    public void handleUpdateAll(
+            UUID txId,
+            Map<UUID, ByteBuffer> rowsToUpdate,
+            TablePartitionId commitPartitionId,
+            @Nullable Consumer<Collection<RowId>> onReplication,
+            @Nullable HybridTimestamp lowWatermark
     ) {
         storage.runConsistently(() -> {
+            executeBatchGc(lowWatermark);
+
             UUID commitTblId = commitPartitionId.tableId();
             int commitPartId = commitPartitionId.partitionId();
 
@@ -150,6 +212,28 @@ public class StorageUpdateHandler {
         });
     }
 
+    private void executeBatchGc(@Nullable HybridTimestamp newLwm) {
+        if (newLwm == null) {
+            return;
+        }
+
+        @Nullable HybridTimestamp oldLwm;
+        do {
+            oldLwm = lastRecordedLwm.get();
+
+            if (oldLwm != null && newLwm.compareTo(oldLwm) <= 0) {
+                break;
+            }
+        } while (!lastRecordedLwm.compareAndSet(oldLwm, newLwm));
+
+        if (oldLwm == null || newLwm.compareTo(oldLwm) > 0) {
+            // Iff the lwm we have is the new lwm.
+            // Otherwise our newLwm is either smaller than last recorded lwm 
or last recorded lwm has changed
+            // concurrently and it became greater. If that's the case, another 
thread will perform the GC.
+            vacuumBatch(newLwm, dsCfg.gcOnUpdateBatchSize().value());
+        }
+    }
+
     /**
      * Tries to remove a previous write from index.
      *
@@ -263,26 +347,53 @@ public class StorageUpdateHandler {
      * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
      */
     public boolean vacuum(HybridTimestamp lowWatermark) {
-        return storage.runConsistently(() -> {
-            BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
+        return storage.runConsistently(() -> internalVacuum(lowWatermark));
+    }
 
-            if (vacuumed == null) {
-                // Nothing was garbage collected.
-                return false;
+    /**
+     * Tries removing {@code count} oldest stale entries and their indexes.
+     * If there are fewer entries that can be removed, then exits early.
+     *
+     * @param lowWatermark Low watermark for the vacuum.
+     * @param count Count of entries to GC.
+     */
+    private void vacuumBatch(HybridTimestamp lowWatermark, int count) {
+        storage.runConsistently(() -> {
+            for (int i = 0; i < count; i++) {
+                if (!internalVacuum(lowWatermark)) {
+                    break;
+                }
             }
 
-            BinaryRow binaryRow = vacuumed.binaryRow();
+            return null;
+        });
+    }
 
-            assert binaryRow != null;
+    /**
+     * Executes garbage collection. Must be called inside a {@link 
MvPartitionStorage#runConsistently(WriteClosure)} closure.
+     *
+     * @param lowWatermark Low watermark for the vacuum.
+     * @return {@code true} if an entry was garbage collected, {@code false} 
if there was nothing to collect.
+     */
+    private boolean internalVacuum(HybridTimestamp lowWatermark) {
+        BinaryRowAndRowId vacuumed = storage.pollForVacuum(lowWatermark);
 
-            RowId rowId = vacuumed.rowId();
+        if (vacuumed == null) {
+            // Nothing was garbage collected.
+            return false;
+        }
 
-            try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
-                tryRemoveFromIndexes(binaryRow, rowId, cursor);
-            }
+        BinaryRow binaryRow = vacuumed.binaryRow();
 
-            return true;
-        });
+        assert binaryRow != null;
+
+        RowId rowId = vacuumed.rowId();
+
+        try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+            tryRemoveFromIndexes(binaryRow, rowId, cursor);
+        }
+
+        return true;
     }
 
     private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b038f64cd7..62a30dece5 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -742,7 +742,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 internalTbl, partId));
 
                 CompletableFuture<StorageUpdateHandler> 
storageUpdateHandlerFut = partitionDataStorageFut
-                        .thenApply(storage -> new StorageUpdateHandler(partId, 
storage, table.indexStorageAdapters(partId)));
+                        .thenApply(storage -> new StorageUpdateHandler(
+                                partId,
+                                storage,
+                                table.indexStorageAdapters(partId),
+                                tblCfg.dataStorage()
+                        ));
 
                 CompletableFuture<Void> startGroupFut;
 
@@ -1986,8 +1991,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         TxStateStorage txStatePartitionStorage = 
partitionStorages.getTxStateStorage();
 
                         PartitionDataStorage partitionDataStorage = 
partitionDataStorage(mvPartitionStorage, internalTable, partId);
-                        StorageUpdateHandler storageUpdateHandler =
-                                new StorageUpdateHandler(partId, 
partitionDataStorage, tbl.indexStorageAdapters(partId));
+                        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(
+                                partId,
+                                partitionDataStorage,
+                                tbl.indexStorageAdapters(partId),
+                                tblCfg.dataStorage()
+                        );
 
                         RaftGroupOptions groupOptions = 
groupOptionsForPartition(
                                 internalTable.storage(),
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index cf1e032f9d..17c1009b91 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -24,12 +24,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.NativeTypes;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.storage.BaseMvStoragesTest;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
@@ -44,11 +47,13 @@ import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Base test for indexes. Sets up a table with (int, string) key and (int, 
string) value and
  * three indexes: primary key, hash index over value columns and sorted index 
over value columns.
  */
+@ExtendWith(ConfigurationExtension.class)
 public abstract class IndexBaseTest extends BaseMvStoragesTest {
     private static final BinaryTupleSchema TUPLE_SCHEMA = 
BinaryTupleSchema.createRowSchema(schemaDescriptor);
 
@@ -74,7 +79,7 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
     StorageUpdateHandler storageUpdateHandler;
 
     @BeforeEach
-    void setUp() {
+    void setUp(@InjectConfiguration DataStorageConfiguration dsCfg) {
         UUID pkIndexId = UUID.randomUUID();
         UUID sortedIndexId = UUID.randomUUID();
         UUID hashIndexId = UUID.randomUUID();
@@ -116,7 +121,8 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
                         pkIndexId, pkStorage,
                         sortedIndexId, sortedIndexStorage,
                         hashIndexId, hashIndexStorage
-                )
+                ),
+                dsCfg
         );
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
new file mode 100644
index 0000000000..2c3997498a
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteConcurrentTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.IgniteTestUtils.RunnableX;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests for concurrent cooperative GC (GC that is executed on write). */
+@ExtendWith(ConfigurationExtension.class)
+public class PartitionGcOnWriteConcurrentTest {
+    private static final int PARTITION_ID = 1;
+    private static final TablePartitionId TABLE_PARTITION_ID = new 
TablePartitionId(UUID.randomUUID(), PARTITION_ID);
+    private static final HybridClock CLOCK = new HybridClockImpl();
+
+    private MvPartitionStorage storage;
+    private StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp(@InjectConfiguration DataStorageConfiguration dsCfg) {
+        storage = mock(MvPartitionStorage.class);
+        doAnswer(invocation -> {
+            WriteClosure<?> cls = invocation.getArgument(0);
+
+            return cls.execute();
+        }).when(storage).runConsistently(any());
+
+        when(storage.pollForVacuum(any())).thenReturn(null);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+    }
+
+    @ParameterizedTest
+    @EnumSource(UpdateType.class)
+    void testSameLwm(UpdateType updateType) {
+        HybridTimestamp lwm = CLOCK.now();
+
+        runRace(createRunnable(updateType, lwm), createRunnable(updateType, 
lwm));
+
+        verify(storage, times(1)).pollForVacuum(lwm);
+    }
+
+    @ParameterizedTest
+    @EnumSource(UpdateType.class)
+    void testDifferentLwm(UpdateType updateType) {
+        int count = 10;
+
+        HybridTimestamp[] timestamps = new HybridTimestamp[count];
+
+        RunnableX[] runnables = new RunnableX[count];
+
+        for (int i = 0; i < count; i++) {
+            HybridTimestamp ts = CLOCK.now();
+
+            timestamps[i] = ts;
+
+            runnables[i] = createRunnable(updateType, ts);
+        }
+
+        runRace(runnables);
+
+        for (int i = 0; i < count - 1; i++) {
+            verify(storage, atMostOnce()).pollForVacuum(timestamps[i]);
+        }
+
+        verify(storage, times(1)).pollForVacuum(timestamps[count - 1]);
+    }
+
+    @ParameterizedTest
+    @EnumSource(UpdateType.class)
+    void testDifferentLwmWithPreviousVacuums(UpdateType updateType) throws 
Throwable {
+        HybridTimestamp lwm1 = CLOCK.now();
+        HybridTimestamp lwm2 = CLOCK.now();
+        HybridTimestamp lwm3 = CLOCK.now();
+
+        createRunnable(updateType, lwm1).run();
+
+        runRace(createRunnable(updateType, lwm2), createRunnable(updateType, 
lwm3));
+
+        verify(storage, times(1)).pollForVacuum(lwm1);
+        verify(storage, atMostOnce()).pollForVacuum(lwm2);
+        verify(storage, times(1)).pollForVacuum(lwm3);
+    }
+
+    private RunnableX createRunnable(UpdateType updateType, HybridTimestamp 
lwm) {
+        if (updateType == UpdateType.UPDATE) {
+            //noinspection unchecked
+            return () -> storageUpdateHandler.handleUpdate(
+                    UUID.randomUUID(),
+                    UUID.randomUUID(),
+                    TABLE_PARTITION_ID,
+                    buffer(),
+                    mock(Consumer.class),
+                    lwm
+            );
+        } else {
+            //noinspection unchecked
+            return () -> storageUpdateHandler.handleUpdateAll(
+                    UUID.randomUUID(),
+                    Collections.emptyMap(),
+                    TABLE_PARTITION_ID,
+                    mock(Consumer.class),
+                    lwm
+            );
+        }
+    }
+
+    private static ByteBuffer buffer() {
+        ByteBuffer buf = mock(ByteBuffer.class);
+        when(buf.order()).thenReturn(ByteBufferRow.ORDER);
+
+        return buf;
+    }
+
+    private enum UpdateType {
+        UPDATE,
+        UPDATE_ALL;
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
new file mode 100644
index 0000000000..62a3006d77
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/PartitionGcOnWriteTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.distributed.TestPartitionDataStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryRow;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
+import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/** Tests for cooperative GC (GC that is executed on write). */
+@ExtendWith(ConfigurationExtension.class)
+public class PartitionGcOnWriteTest extends BaseMvStoragesTest {
+    private static final int WRITES_COUNT = 10;
+    private static final int GC_BATCH_SIZE = 7;
+    private static final UUID TX_ID = UUID.randomUUID();
+    private static final int PARTITION_ID = 1;
+    private static final TablePartitionId TABLE_PARTITION_ID = new 
TablePartitionId(UUID.randomUUID(), PARTITION_ID);
+
+    private TestMvPartitionStorage storage;
+    private StorageUpdateHandler storageUpdateHandler;
+
+    @BeforeEach
+    void setUp(@InjectConfiguration("mock.gcOnUpdateBatchSize=" + 
GC_BATCH_SIZE) DataStorageConfiguration dsCfg) {
+        storage = new TestMvPartitionStorage(1);
+
+        storageUpdateHandler = new StorageUpdateHandler(1, new 
TestPartitionDataStorage(storage), Collections::emptyMap, dsCfg);
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWriteWithGc.class)
+    void testNullLwm(AddWriteWithGc writer) {
+        RowId rowId = fillWithDataForGc();
+
+        writeWithGc(writer, null);
+
+        assertEquals(WRITES_COUNT, getRowVersions(rowId).size());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWriteWithGc.class)
+    void testOlderLwm(AddWriteWithGc writer) {
+        HybridTimestamp older = clock.now();
+
+        HybridTimestamp newer = clock.now();
+
+        writeWithGc(writer, newer);
+
+        RowId rowId = fillWithDataForGc();
+
+        writeWithGc(writer, older);
+
+        assertEquals(WRITES_COUNT, getRowVersions(rowId).size());
+    }
+
+    @ParameterizedTest
+    @EnumSource(AddWriteWithGc.class)
+    void testNewerLwm(AddWriteWithGc writer) {
+        RowId rowId = fillWithDataForGc();
+
+        writeWithGc(writer, clock.now());
+
+        assertEquals(WRITES_COUNT - GC_BATCH_SIZE, 
getRowVersions(rowId).size());
+    }
+
+    private List<ReadResult> getRowVersions(RowId rowId) {
+        try (Cursor<ReadResult> readResults = storage.scanVersions(rowId)) {
+            return readResults.stream().collect(toList());
+        }
+    }
+
+    private void writeWithGc(AddWriteWithGc writer, @Nullable HybridTimestamp 
lwm) {
+        UUID rowUuid = UUID.randomUUID();
+
+        TestKey key = new TestKey(1337, "leet");
+        BinaryRow row = binaryRow(key, new TestValue(999, "bar"));
+
+        writer.addWrite(storageUpdateHandler, rowUuid, row, lwm);
+    }
+
+    private RowId fillWithDataForGc() {
+        UUID rowUuid = UUID.randomUUID();
+        var rowId = new RowId(PARTITION_ID, rowUuid);
+
+        TestKey key = new TestKey(1, "foo");
+
+        for (int i = 0; i < WRITES_COUNT; i++) {
+            BinaryRow row = binaryRow(key, new TestValue(i, "bar" + i));
+
+            addWrite(storageUpdateHandler, rowUuid, row);
+
+            commitWrite(rowId);
+        }
+
+        return rowId;
+    }
+
+    private static void addWrite(StorageUpdateHandler handler, UUID rowUuid, 
@Nullable BinaryRow row) { // Forwards a single write to handler.handleUpdate() without passing any lwm.
+        handler.handleUpdate(
+                TX_ID,
+                rowUuid,
+                TABLE_PARTITION_ID,
+                row == null ? null : row.byteBuffer(), // A null row is forwarded as a null buffer.
+                (unused) -> {} // No-op callback.
+        );
+    }
+
+    private void commitWrite(RowId rowId) { // Commits the pending write of rowId with a timestamp read from the clock.
+        storage.runConsistently(() -> { // The commit is issued from inside the runConsistently() closure.
+            storage.commitWrite(rowId, clock.now());
+
+            return null; // The closure has to return a value; nothing meaningful is produced here.
+        });
+    }
+
+    /** Enum that encapsulates the update API: each constant adds a write through a different StorageUpdateHandler method, passing the lwm along. */
+    enum AddWriteWithGc {
+        /** Uses the update api ({@code handleUpdate} with an lwm argument). */
+        USE_UPDATE {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+                    @Nullable HybridTimestamp lwm) {
+                handler.handleUpdate(
+                        TX_ID,
+                        rowUuid,
+                        partitionId,
+                        row == null ? null : row.byteBuffer(), // A null row is forwarded as a null buffer.
+                        (unused) -> {}, // No-op callback.
+                        lwm
+                );
+            }
+        },
+        /** Uses the updateAll api ({@code handleUpdateAll} with a single-entry map and an lwm argument). */
+        USE_UPDATE_ALL {
+            @Override
+            void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+                    @Nullable HybridTimestamp lwm) {
+                handler.handleUpdateAll(
+                        TX_ID,
+                        singletonMap(rowUuid, row == null ? null : 
row.byteBuffer()), // The single row is wrapped into a one-entry map.
+                        partitionId,
+                        (unused) -> {}, // No-op callback.
+                        lwm
+                );
+            }
+        };
+
+        void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row, @Nullable HybridTimestamp lwm) { // Convenience overload: supplies a partition id and delegates to the abstract variant.
+            TablePartitionId tablePartitionId = new 
TablePartitionId(UUID.randomUUID(), 1); // Built from a new random UUID and the literal 1 on every call.
+
+            addWrite(handler, tablePartitionId, rowUuid, row, lwm);
+        }
+
+        abstract void addWrite(StorageUpdateHandler handler, TablePartitionId 
partitionId, UUID rowUuid, @Nullable BinaryRow row,
+                @Nullable HybridTimestamp lwm); // Implemented per constant with the concrete handler call.
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index c1c2e699c9..7861ae3e31 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -53,6 +53,8 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.TestHybridClock;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -71,6 +73,7 @@ import org.apache.ignite.internal.schema.BinaryRowConverter;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -110,6 +113,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
  */
 @ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(MockitoExtension.class)
+@ExtendWith(ConfigurationExtension.class)
 public class PartitionCommandListenerTest {
     /** Key count. */
     private static final int KEY_COUNT = 100;
@@ -173,7 +177,7 @@ public class PartitionCommandListenerTest {
      * Initializes a table listener before tests.
      */
     @BeforeEach
-    public void before() {
+    public void before(@InjectConfiguration DataStorageConfiguration dsCfg) {
         NetworkAddress addr = new NetworkAddress("127.0.0.1", 5003);
 
         ClusterService clusterService = mock(ClusterService.class, 
RETURNS_DEEP_STUBS);
@@ -188,7 +192,7 @@ public class PartitionCommandListenerTest {
 
         Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> 
Map.of(pkStorage.id(), pkStorage);
 
-        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(0, partitionDataStorage, indexes);
+        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(0, partitionDataStorage, indexes, dsCfg);
 
         commandListener = new PartitionListener(
                 partitionDataStorage,
@@ -273,14 +277,12 @@ public class PartitionCommandListenerTest {
      * the maximal last applied index among storages to all storages.
      */
     @Test
-    public void testOnSnapshotSavePropagateLastAppliedIndexAndTerm() {
-        ReplicaService replicaService = mock(ReplicaService.class, 
RETURNS_DEEP_STUBS);
-
+    public void 
testOnSnapshotSavePropagateLastAppliedIndexAndTerm(@InjectConfiguration 
DataStorageConfiguration dsCfg) {
         TestPartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartitionStorage);
 
         Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> 
Map.of(pkStorage.id(), pkStorage);
 
-        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes);
+        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(PARTITION_ID, partitionDataStorage, indexes, dsCfg);
 
         PartitionListener testCommandListener = new PartitionListener(
                 partitionDataStorage,
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index dc97d41f36..cf880c2585 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -34,6 +34,8 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -46,6 +48,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import org.apache.ignite.internal.schema.marshaller.MarshallerException;
 import 
org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
@@ -83,10 +86,12 @@ import org.hamcrest.CustomMatcher;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
 /** There are tests for partition replica listener. */
+@ExtendWith(ConfigurationExtension.class)
 public class PartitionReplicaListenerIndexLockingTest extends 
IgniteAbstractTest {
     private static final int PART_ID = 0;
     private static final UUID TABLE_ID = new UUID(0L, 0L);
@@ -108,7 +113,7 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
     private static Function<BinaryRow, BinaryTuple> row2SortKeyConverter;
 
     @BeforeAll
-    private static void beforeAll() {
+    public static void beforeAll(@InjectConfiguration DataStorageConfiguration 
dsCfg) {
         RaftGroupService mockRaftClient = mock(RaftGroupService.class);
 
         when(mockRaftClient.refreshAndGetLeaderWithTerm())
@@ -184,7 +189,8 @@ public class PartitionReplicaListenerIndexLockingTest 
extends IgniteAbstractTest
                 new StorageUpdateHandler(
                         PART_ID,
                         new 
TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE),
-                        () -> Map.of(pkStorage.get().id(), pkStorage.get())
+                        () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+                        dsCfg
                 ),
                 peer -> true,
                 CompletableFuture.completedFuture(schemaManager)
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 88a061a482..12a8141194 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -54,6 +54,8 @@ import java.util.stream.IntStream;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTuplePrefixBuilder;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -69,6 +71,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import org.apache.ignite.internal.schema.marshaller.MarshallerException;
 import org.apache.ignite.internal.schema.marshaller.MarshallerFactory;
@@ -123,9 +126,11 @@ import org.apache.ignite.tx.TransactionException;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 
 /** There are tests for partition replica listener. */
+@ExtendWith(ConfigurationExtension.class)
 public class PartitionReplicaListenerTest extends IgniteAbstractTest {
     /** Partition id. */
     private static final int partId = 0;
@@ -230,7 +235,7 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
     private static Function<PartitionCommand, CompletableFuture<?>> 
raftClientFutureClosure = DEFAULT_MOCK_RAFT_FUTURE_CLOSURE;
 
     @BeforeAll
-    private static void beforeAll() {
+    public static void beforeAll(@InjectConfiguration DataStorageConfiguration 
dsCfg) {
         
when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock 
-> {
             if (!localLeader) {
                 return completedFuture(new LeaderWithTerm(new 
Peer(anotherNode.name()), 1L));
@@ -331,7 +336,8 @@ public class PartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 new StorageUpdateHandler(
                         partId,
                         partitionDataStorage,
-                        () -> Map.of(pkStorage.get().id(), pkStorage.get())
+                        () -> Map.of(pkStorage.get().id(), pkStorage.get()),
+                        dsCfg
                 ),
                 peer -> localNode.name().equals(peer.consistentId()),
                 completedFuture(schemaManager)
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 1edcf931a1..71cbce1d0f 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.naming.OperationNotSupportedException;
+import org.apache.ignite.configuration.ConfigurationValue;
 import org.apache.ignite.distributed.TestPartitionDataStorage;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
+import 
org.apache.ignite.internal.schema.configuration.storage.DataStorageConfiguration;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
@@ -258,7 +260,13 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
         PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
         PartitionDataStorage partitionDataStorage = new 
TestPartitionDataStorage(mvPartStorage);
         Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes = () -> 
Map.of(pkStorage.get().id(), pkStorage.get());
-        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(PART_ID, partitionDataStorage, indexes);
+
+        DataStorageConfiguration dsCfg = mock(DataStorageConfiguration.class);
+        ConfigurationValue<Integer> gcBatchSizeValue = 
mock(ConfigurationValue.class);
+        lenient().when(gcBatchSizeValue.value()).thenReturn(5);
+        
lenient().when(dsCfg.gcOnUpdateBatchSize()).thenReturn(gcBatchSizeValue);
+
+        StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(PART_ID, partitionDataStorage, indexes, dsCfg);
 
         DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schema);
 


Reply via email to