This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fa4922485 IGNITE-18895 Implement a background GC process (#1720)
9fa4922485 is described below

commit 9fa492248596658d9af5ddcbac0b8ec640472767
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Mar 2 18:18:48 2023 +0300

    IGNITE-18895 Implement a background GC process (#1720)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |  11 +
 .../configuration/TablesConfigurationSchema.java   |   4 +
 .../table/distributed/StorageUpdateHandler.java    |  13 +-
 .../internal/table/distributed/TableManager.java   | 127 ++++---
 .../table/distributed/gc/GcStorageHandler.java     |  47 +++
 .../ignite/internal/table/distributed/gc/MvGc.java | 257 +++++++++++++++
 .../table/distributed/raft/PartitionListener.java  |   1 +
 .../raft/snapshot/PartitionAccessImpl.java         |  48 +--
 .../SnapshotAwarePartitionDataStorage.java         |   2 -
 .../TableManagerDistributionZonesTest.java         |   6 +
 .../internal/table/distributed/gc/MvGcTest.java    | 364 +++++++++++++++++++++
 .../raft/snapshot/PartitionAccessImplTest.java     |  33 +-
 .../incoming/IncomingSnapshotCopierTest.java       |  29 +-
 .../SnapshotAwarePartitionDataStorageTest.java     |   2 +-
 .../distributed/TestPartitionDataStorage.java      |   1 -
 15 files changed, 854 insertions(+), 91 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index bed0f9beab..07e5e5ca37 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -415,4 +415,15 @@ public class ErrorGroups {
          */
         public static final int UNIT_CONTENT_READ_ERR = 
CODE_DEPLOYMENT_ERR_GROUP.registerErrorCode(3);
     }
+
+    /**
+     * Garbage collector error group.
+     */
+    public static class GarbageCollector {
+        /** Garbage collector error group. */
+        public static final ErrorGroup GC_ERR_GROUP = 
ErrorGroup.newGroup("GC", 14);
+
+        /** Garbage collector closed error. */
+        public static final int CLOSED_ERR = GC_ERR_GROUP.registerErrorCode(1);
+    }
 }
diff --git 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
index 14493433a0..98b1ea234c 100644
--- 
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
+++ 
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
@@ -49,4 +49,8 @@ public class TablesConfigurationSchema {
     @ExistingDataStorage
     @Value(hasDefault = true)
     public String defaultDataStorage = "aipersist";
+
+    /** Number of garbage collector threads. */
+    @Value(hasDefault = true)
+    public int gcThreads = Runtime.getRuntime().availableProcessors();
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 16e21d1825..b842038cae 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -396,7 +396,10 @@ public class StorageUpdateHandler {
         return true;
     }
 
-    private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
+    /**
+     * Adds a binary row to the indexes; if the row is a tombstone, the 
operation is skipped.
+     */
+    public void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
         if (binaryRow == null) { // skip removes
             return;
         }
@@ -405,4 +408,12 @@ public class StorageUpdateHandler {
             index.put(binaryRow, rowId);
         }
     }
+
+    /**
+     * Waits for indexes to be created.
+     */
+    // TODO: IGNITE-18619 Fix it, we should have already waited for the 
indexes to be created
+    public void waitIndexes() {
+        indexes.get();
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 62a30dece5..a6b4cf639d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -58,6 +58,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -122,6 +123,7 @@ import 
org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
 import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
 import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -309,6 +311,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     /** Meta storage listener for switch reduce assignments. */
     private final WatchListener assignmentsSwitchRebalanceListener;
 
+    private final MvGc mvGc;
+
     /**
      * Creates a new table manager.
      *
@@ -451,11 +455,14 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         stableAssignmentsRebalanceListener = 
createStableAssignmentsRebalanceListener();
 
         assignmentsSwitchRebalanceListener = 
createAssignmentsSwitchRebalanceListener();
+
+        mvGc = new MvGc(nodeName, tablesCfg);
     }
 
-    /** {@inheritDoc} */
     @Override
     public void start() {
+        mvGc.start();
+
         tablesCfg.tables().any().replicas().listen(this::onUpdateReplicas);
 
         // TODO: IGNITE-18694 - Recovery for the case when zones watch 
listener processed event but assignments were not updated.
@@ -742,12 +749,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 internalTbl, partId));
 
                 CompletableFuture<StorageUpdateHandler> 
storageUpdateHandlerFut = partitionDataStorageFut
-                        .thenApply(storage -> new StorageUpdateHandler(
-                                partId,
-                                storage,
-                                table.indexStorageAdapters(partId),
-                                tblCfg.dataStorage()
-                        ));
+                        .thenApply(storage -> {
+                            StorageUpdateHandler storageUpdateHandler = new 
StorageUpdateHandler(
+                                    partId,
+                                    storage,
+                                    table.indexStorageAdapters(partId),
+                                    tblCfg.dataStorage()
+                            );
+
+                            mvGc.addStorage(replicaGrpId, 
storageUpdateHandler);
+
+                            return storageUpdateHandler;
+                        });
 
                 CompletableFuture<Void> startGroupFut;
 
@@ -803,7 +816,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                 internalTbl.storage(),
                                                 internalTbl.txStateStorage(),
                                                 partitionKey(internalTbl, 
partId),
-                                                table
+                                                storageUpdateHandler
                                         );
 
                                         Peer serverPeer = 
newConfiguration.peer(localMemberName);
@@ -957,7 +970,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             MvTableStorage mvTableStorage,
             TxStateTableStorage txStateTableStorage,
             PartitionKey partitionKey,
-            TableImpl tableImpl
+            StorageUpdateHandler storageUpdateHandler
     ) {
         RaftGroupOptions raftGroupOptions;
 
@@ -977,7 +990,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         partitionKey,
                         mvTableStorage,
                         txStateTableStorage,
-                        () -> 
tableImpl.indexStorageAdapters(partitionKey.partitionId()).get().values()
+                        storageUpdateHandler,
+                        mvGc
                 ),
                 incomingSnapshotsExecutor
         ));
@@ -1027,7 +1041,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
             List<Runnable> stopping = new ArrayList<>();
 
-            AtomicReference<Exception> exception = new AtomicReference<>();
+            AtomicReference<Throwable> throwable = new AtomicReference<>();
 
             AtomicBoolean nodeStoppingEx = new AtomicBoolean();
 
@@ -1037,32 +1051,27 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 stopping.add(() -> {
                     try {
                         raftMgr.stopRaftNodes(replicationGroupId);
-                    } catch (Exception e) {
-                        if (!exception.compareAndSet(null, e)) {
-                            if (!(e instanceof NodeStoppingException) || 
!nodeStoppingEx.get()) {
-                                exception.get().addSuppressed(e);
-                            }
-                        }
-
-                        if (e instanceof NodeStoppingException) {
-                            nodeStoppingEx.set(true);
-                        }
+                    } catch (Throwable t) {
+                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
                     }
                 });
 
                 stopping.add(() -> {
                     try {
                         replicaMgr.stopReplica(replicationGroupId);
-                    } catch (Exception e) {
-                        if (!exception.compareAndSet(null, e)) {
-                            if (!(e instanceof NodeStoppingException) || 
!nodeStoppingEx.get()) {
-                                exception.get().addSuppressed(e);
-                            }
-                        }
+                    } catch (Throwable t) {
+                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
+                    }
+                });
 
-                        if (e instanceof NodeStoppingException) {
-                            nodeStoppingEx.set(true);
-                        }
+                CompletableFuture<Void> removeFromGcFuture = 
mvGc.removeStorage(replicationGroupId);
+
+                stopping.add(() -> {
+                    try {
+                        // Should be done fairly quickly.
+                        removeFromGcFuture.join();
+                    } catch (Throwable t) {
+                        handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
                     }
                 });
             }
@@ -1075,14 +1084,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                         table.internalTable().txStateStorage(),
                         table.internalTable()
                 );
-            } catch (Exception e) {
-                if (!exception.compareAndSet(null, e)) {
-                    exception.get().addSuppressed(e);
-                }
+            } catch (Throwable t) {
+                handleExceptionOnCleanUpTablesResources(t, throwable, 
nodeStoppingEx);
             }
 
-            if (exception.get() != null) {
-                LOG.info("Unable to stop table [name={}, tableId={}]", 
exception.get(), table.name(), table.tableId());
+            if (throwable.get() != null) {
+                LOG.error("Unable to stop table [name={}, tableId={}]", 
throwable.get(), table.name(), table.tableId());
             }
         }
     }
@@ -1246,12 +1253,16 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         try {
             int partitions = assignment.size();
 
+            CompletableFuture<?>[] removeStorageFromGcFutures = new 
CompletableFuture<?>[partitions];
+
             for (int p = 0; p < partitions; p++) {
                 TablePartitionId replicationGroupId = new 
TablePartitionId(tblId, p);
 
                 raftMgr.stopRaftNodes(replicationGroupId);
 
                 replicaMgr.stopReplica(replicationGroupId);
+
+                removeStorageFromGcFutures[p] = 
mvGc.removeStorage(replicationGroupId);
             }
 
             tablesByIdVv.update(causalityToken, (previousVal, e) -> 
inBusyLock(busyLock, () -> {
@@ -1273,10 +1284,11 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
             // TODO: IGNITE-18703 Destroy raft log and meta
 
-            CompletableFuture<Void> destroyTableStoragesFuture = allOf(
-                    table.internalTable().storage().destroy(),
-                    runAsync(() -> 
table.internalTable().txStateStorage().destroy(), ioExecutor)
-            );
+            CompletableFuture<Void> destroyTableStoragesFuture = 
allOf(removeStorageFromGcFutures)
+                    .thenCompose(unused -> allOf(
+                            table.internalTable().storage().destroy(),
+                            runAsync(() -> 
table.internalTable().txStateStorage().destroy(), ioExecutor))
+                    );
 
             CompletableFuture<?> dropSchemaRegistryFuture = 
schemaManager.dropRegistry(causalityToken, table.tableId())
                     .thenCompose(
@@ -2002,7 +2014,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 internalTable.storage(),
                                 internalTable.txStateStorage(),
                                 partitionKey(internalTable, partId),
-                                tbl
+                                storageUpdateHandler
                         );
 
                         RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -2182,13 +2194,14 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     }
 
     /**
-     * Creates or gets partition stores. If one of the storages has not 
completed the rebalance, then the storages are cleared.
+     * Creates partition stores. If one of the storages has not completed the 
rebalance, then the storages are cleared.
      *
      * @param table Table.
      * @param partitionId Partition ID.
      * @return Future of creating or getting partition stores.
      */
     // TODO: IGNITE-18619 Maybe we should wait here to create indexes, if you 
add now, then the tests start to hang
+    // TODO: IGNITE-18939 Create storages only once, then only get them
     private CompletableFuture<PartitionStorages> 
getOrCreatePartitionStorages(TableImpl table, int partitionId) {
         InternalTable internalTable = table.internalTable();
 
@@ -2260,11 +2273,33 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 // TODO: IGNITE-18703 Destroy raft log and meta
 
                 // Should be done fairly quickly.
-                allOf(
-                        internalTable.storage().destroyPartition(partitionId),
-                        runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
-                ).join();
+                mvGc.removeStorage(tablePartitionId)
+                        .thenCompose(unused -> allOf(
+                                
internalTable.storage().destroyPartition(partitionId),
+                                runAsync(() -> 
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+                        ))
+                        .join();
             }
         });
     }
+
+    private static void handleExceptionOnCleanUpTablesResources(
+            Throwable t,
+            AtomicReference<Throwable> throwable,
+            AtomicBoolean nodeStoppingEx
+    ) {
+        if (t instanceof CompletionException || t instanceof 
ExecutionException) {
+            t = t.getCause();
+        }
+
+        if (!throwable.compareAndSet(null, t)) {
+            if (!(t instanceof NodeStoppingException) || 
!nodeStoppingEx.get()) {
+                throwable.get().addSuppressed(t);
+            }
+        }
+
+        if (t instanceof NodeStoppingException) {
+            nodeStoppingEx.set(true);
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
new file mode 100644
index 0000000000..d8396e115b
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+
+/**
+ * Container for handling storage by the garbage collector.
+ */
+class GcStorageHandler {
+    /**
+     * Handler of multi-versioned partition storage and its indexes for 
garbage collection.
+     *
+     * @see StorageUpdateHandler#vacuum(HybridTimestamp)
+     */
+    final StorageUpdateHandler storageUpdateHandler;
+
+    /**
+     * Reference to the future of garbage collection batch for multi-versioned 
partition storage and its indexes.
+     *
+     * <p>A new future must be inserted before the garbage collection batch 
begins; after the garbage collection batch ends, the
+     * inserted future must be completed and removed.
+     */
+    final AtomicReference<CompletableFuture<Void>> gcInProgressFuture = new 
AtomicReference<>();
+
+    GcStorageHandler(StorageUpdateHandler storageUpdateHandler) {
+        this.storageUpdateHandler = storageUpdateHandler;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
new file mode 100644
index 0000000000..91b7867dec
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ErrorGroups.GarbageCollector;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Background garbage collector for multi-versioned storages and their 
indexes.
+ *
+ * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
+ */
+public class MvGc implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(MvGc.class);
+
+    /** GC batch size for the storage. */
+    static final int GC_BATCH_SIZE = 5;
+
+    /** Node name. */
+    private final String nodeName;
+
+    /** Tables configuration. */
+    private final TablesConfiguration tablesConfig;
+
+    /** Garbage collection thread pool. */
+    private volatile ExecutorService executor;
+
+    /** Prevents double closing. */
+    private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+    /** Busy lock to close synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Low watermark. */
+    private final AtomicReference<HybridTimestamp> lowWatermarkReference = new 
AtomicReference<>();
+
+    /** Storage handler by table partition ID for which garbage will be 
collected. */
+    private final ConcurrentMap<TablePartitionId, GcStorageHandler> 
storageHandlerByPartitionId = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param tablesConfig Tables configuration.
+     */
+    public MvGc(String nodeName, TablesConfiguration tablesConfig) {
+        this.nodeName = nodeName;
+        this.tablesConfig = tablesConfig;
+    }
+
+    /**
+     * Starts the garbage collector.
+     */
+    public void start() {
+        int threadCount = tablesConfig.gcThreads().value();
+
+        executor = new ThreadPoolExecutor(
+                threadCount,
+                threadCount,
+                30,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new NamedThreadFactory(nodeName, LOG)
+        );
+    }
+
+    /**
+     * Adds storage for background garbage collection when updating a low 
watermark.
+     *
+     * @param tablePartitionId Table partition ID.
+     * @param storageUpdateHandler Storage update handler.
+     * @throws IgniteInternalException with {@link 
GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
+     */
+    public void addStorage(TablePartitionId tablePartitionId, 
StorageUpdateHandler storageUpdateHandler) {
+        inBusyLock(() -> {
+            GcStorageHandler previous = 
storageHandlerByPartitionId.putIfAbsent(
+                    tablePartitionId,
+                    new GcStorageHandler(storageUpdateHandler)
+            );
+
+            // TODO: IGNITE-18939 Should be called once, you need to check 
that previous == null
+            if (previous == null && lowWatermarkReference.get() != null) {
+                scheduleGcForStorage(tablePartitionId);
+            }
+        });
+    }
+
+    /**
+     * Removes storage for background garbage collection and completes the 
garbage collection for it.
+     *
+     * <p>Should be called before rebalancing/closing/destroying the storage.
+     *
+     * @param tablePartitionId Table partition ID.
+     * @return Storage garbage collection completion future.
+     * @throws IgniteInternalException with {@link 
GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
+     */
+    public CompletableFuture<Void> removeStorage(TablePartitionId 
tablePartitionId) {
+        return inBusyLock(() -> {
+            GcStorageHandler removed = 
storageHandlerByPartitionId.remove(tablePartitionId);
+
+            if (removed == null) {
+                return completedFuture(null);
+            }
+
+            CompletableFuture<Void> gcInProgressFuture = 
removed.gcInProgressFuture.get();
+
+            return gcInProgressFuture == null ? completedFuture(null) : 
gcInProgressFuture;
+        });
+    }
+
+    /**
+     * Updates the low watermark only if the new value is greater than the 
current low watermark.
+     *
+     * <p>If the update is successful, it will schedule a new garbage 
collection for all storages.
+     *
+     * @param newLwm New low watermark.
+     * @throws IgniteInternalException with {@link 
GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
+     */
+    public void updateLowWatermark(HybridTimestamp newLwm) {
+        inBusyLock(() -> {
+            HybridTimestamp updatedLwm = 
lowWatermarkReference.updateAndGet(currentLwm -> {
+                if (currentLwm == null) {
+                    return newLwm;
+                }
+
+                // Update only if the new one is greater than the current one.
+                return newLwm.compareTo(currentLwm) > 0 ? newLwm : currentLwm;
+            });
+
+            // If the new watermark is smaller than the current one or has 
been updated in parallel, then we do nothing.
+            if (updatedLwm != newLwm) {
+                return;
+            }
+
+            executor.submit(() -> inBusyLock(this::initNewGcBusy));
+        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closeGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
+        if (executor != null) {
+            shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+        }
+    }
+
+    private void initNewGcBusy() {
+        
storageHandlerByPartitionId.keySet().forEach(this::scheduleGcForStorage);
+    }
+
+    private void scheduleGcForStorage(TablePartitionId tablePartitionId) {
+        executor.submit(() -> inBusyLock(() -> {
+            GcStorageHandler storageHandler = 
storageHandlerByPartitionId.get(tablePartitionId);
+
+            if (storageHandler == null) {
+                // Storage has been removed from garbage collection.
+                return;
+            }
+
+            CompletableFuture<Void> future = new CompletableFuture<>();
+
+            if (!storageHandler.gcInProgressFuture.compareAndSet(null, 
future)) {
+                // In parallel, another task has already begun collecting 
garbage.
+                return;
+            }
+
+            try {
+                for (int i = 0; i < GC_BATCH_SIZE; i++) {
+                    HybridTimestamp lowWatermark = lowWatermarkReference.get();
+
+                    assert lowWatermark != null : tablePartitionId;
+
+                    // If storage has been deleted or there is no garbage, 
then for now we will stop collecting garbage for this storage.
+                    if 
(!storageHandlerByPartitionId.containsKey(tablePartitionId)
+                            || 
!storageHandler.storageUpdateHandler.vacuum(lowWatermark)) {
+                        return;
+                    }
+                }
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+
+                return;
+            } finally {
+                if (!future.isCompletedExceptionally()) {
+                    future.complete(null);
+                }
+
+                storageHandler.gcInProgressFuture.set(null);
+            }
+
+            scheduleGcForStorage(tablePartitionId);
+        }));
+    }
+
+    private <T> T inBusyLock(Supplier<T> supplier) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(GarbageCollector.CLOSED_ERR);
+        }
+
+        try {
+            return supplier.get();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private void inBusyLock(Runnable runnable) {
+        inBusyLock(() -> {
+            runnable.run();
+
+            return null;
+        });
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index cfa716db62..6106693e96 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -427,6 +427,7 @@ public class PartitionListener implements RaftGroupListener 
{
 
     @Override
     public void onShutdown() {
+        storage.close();
     }
 
     /**
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index bf97b1fb23..4d5b09294e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -17,19 +17,19 @@
 
 package org.apache.ignite.internal.table.distributed.raft.snapshot;
 
-import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -48,28 +48,33 @@ public class PartitionAccessImpl implements PartitionAccess 
{
 
     private final TxStateTableStorage txStateTableStorage;
 
-    private final Supplier<Collection<TableSchemaAwareIndexStorage>> indexes;
-
     private final RaftGroupConfigurationConverter 
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
 
+    private final StorageUpdateHandler storageUpdateHandler;
+
+    private final MvGc mvGc;
+
     /**
      * Constructor.
      *
      * @param partitionKey Partition key.
      * @param mvTableStorage Multi version table storage.
      * @param txStateTableStorage Table transaction state storage.
-     * @param indexes Index storages supplier.
+     * @param storageUpdateHandler Storage update handler.
+     * @param mvGc Garbage collector for multi-versioned storages and their 
indexes in the background.
      */
     public PartitionAccessImpl(
             PartitionKey partitionKey,
             MvTableStorage mvTableStorage,
             TxStateTableStorage txStateTableStorage,
-            Supplier<Collection<TableSchemaAwareIndexStorage>> indexes
+            StorageUpdateHandler storageUpdateHandler,
+            MvGc mvGc
     ) {
         this.partitionKey = partitionKey;
         this.mvTableStorage = mvTableStorage;
         this.txStateTableStorage = txStateTableStorage;
-        this.indexes = indexes;
+        this.storageUpdateHandler = storageUpdateHandler;
+        this.mvGc = mvGc;
     }
 
     @Override
@@ -119,7 +124,7 @@ public class PartitionAccessImpl implements PartitionAccess 
{
         mvPartitionStorage.runConsistently(() -> {
             mvPartitionStorage.addWrite(rowId, row, txId, commitTableId, 
commitPartitionId);
 
-            addToIndexes(row, rowId);
+            storageUpdateHandler.addToIndexes(row, rowId);
 
             return null;
         });
@@ -132,7 +137,7 @@ public class PartitionAccessImpl implements PartitionAccess 
{
         mvPartitionStorage.runConsistently(() -> {
             mvPartitionStorage.addWriteCommitted(rowId, row, commitTimestamp);
 
-            addToIndexes(row, rowId);
+            storageUpdateHandler.addToIndexes(row, rowId);
 
             return null;
         });
@@ -173,14 +178,15 @@ public class PartitionAccessImpl implements 
PartitionAccess {
     @Override
     public CompletableFuture<Void> startRebalance() {
         // TODO: IGNITE-18619 Fix it, we should have already waited for the 
indexes to be created
-        indexes.get();
+        storageUpdateHandler.waitIndexes();
 
         TxStateStorage txStateStorage = getTxStateStorage(partitionId());
 
-        return CompletableFuture.allOf(
-                mvTableStorage.startRebalancePartition(partitionId()),
-                txStateStorage.startRebalance()
-        );
+        return mvGc.removeStorage(toTablePartitionId(partitionKey))
+                .thenCompose(unused -> CompletableFuture.allOf(
+                        mvTableStorage.startRebalancePartition(partitionId()),
+                        txStateStorage.startRebalance()
+                ));
     }
 
     @Override
@@ -190,7 +196,7 @@ public class PartitionAccessImpl implements PartitionAccess 
{
         return CompletableFuture.allOf(
                 mvTableStorage.abortRebalancePartition(partitionId()),
                 txStateStorage.abortRebalance()
-        );
+        ).thenAccept(unused -> 
mvGc.addStorage(toTablePartitionId(partitionKey), storageUpdateHandler));
     }
 
     @Override
@@ -202,7 +208,7 @@ public class PartitionAccessImpl implements PartitionAccess 
{
         return CompletableFuture.allOf(
                 mvTableStorage.finishRebalancePartition(partitionId(), 
lastAppliedIndex, lastAppliedTerm, configBytes),
                 txStateStorage.finishRebalance(lastAppliedIndex, 
lastAppliedTerm)
-        );
+        ).thenAccept(unused -> 
mvGc.addStorage(toTablePartitionId(partitionKey), storageUpdateHandler));
     }
 
     private MvPartitionStorage getMvPartitionStorage(int partitionId) {
@@ -221,11 +227,7 @@ public class PartitionAccessImpl implements 
PartitionAccess {
         return txStateStorage;
     }
 
-    private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
-        if (binaryRow == null) {
-            return;
-        }
-
-        indexes.get().forEach(index -> index.put(binaryRow, rowId));
+    private static TablePartitionId toTablePartitionId(PartitionKey 
partitionKey) {
+        return new TablePartitionId(partitionKey.tableId(), 
partitionKey.partitionId());
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index f6458d5579..28161e3c0a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -175,8 +175,6 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
     @Override
     public void close() {
         cleanupSnapshots();
-
-        partitionStorage.close();
     }
 
     private void cleanupSnapshots() {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index 64e3fed14a..9294016373 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -481,5 +481,11 @@ public class TableManagerDistributionZonesTest extends 
IgniteAbstractTest {
         
when(tableCfg.assignments()).thenReturn(mock(ConfigurationValue.class));
 
         when(tablesConfiguration.tables()).thenReturn(tables);
+
+        ConfigurationValue<Integer> gcThreads = mock(ConfigurationValue.class);
+
+        when(gcThreads.value()).thenReturn(1);
+
+        when(tablesConfiguration.gcThreads()).thenReturn(gcThreads);
     }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
new file mode 100644
index 0000000000..070d22f1b8
--- /dev/null
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.ErrorGroups.GarbageCollector;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.function.Executable;
+
+/**
+ * For testing {@link MvGc}.
+ *
+ * <p>Each test creates a fresh {@link MvGc} configured with a single GC thread and closes it afterwards. Storage update handlers are
+ * Mockito mocks whose {@link StorageUpdateHandler#vacuum} stubs signal invocations through futures or latches.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class MvGcTest {
+    private static final int PARTITION_ID = 0;
+
+    private MvGc gc;
+
+    @BeforeEach
+    void setUp(
+            @InjectConfiguration("mock.gcThreads = 1")
+            TablesConfiguration tablesConfig
+    ) {
+        gc = new MvGc("test", tablesConfig);
+
+        gc.start();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        closeAllManually(gc);
+    }
+
+    @Test
+    void testAddStorageWithoutLowWatermark() {
+        CompletableFuture<Void> invokeVacuumMethodFuture = new CompletableFuture<>();
+
+        gc.addStorage(createTablePartitionId(), createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, null));
+
+        // We expect that StorageUpdateHandler#vacuum will not be called.
+        assertThat(invokeVacuumMethodFuture, willTimeoutFast());
+    }
+
+    @Test
+    void testAddStorageWithLowWatermark() {
+        HybridTimestamp lowWatermark = new HybridTimestamp(1, 1);
+
+        gc.updateLowWatermark(lowWatermark);
+
+        CompletableFuture<Void> invokeVacuumMethodFuture = new CompletableFuture<>();
+
+        gc.addStorage(createTablePartitionId(), createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, lowWatermark));
+
+        // We expect StorageUpdateHandler#vacuum to be called with the set low watermark.
+        assertThat(invokeVacuumMethodFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testStartVacuumOnSuccessfulUpdateLowWatermark() {
+        CompletableFuture<Void> invokeVacuumMethodFuture0 = new CompletableFuture<>();
+        CompletableFuture<Void> invokeVacuumMethodFuture1 = new CompletableFuture<>();
+
+        HybridTimestamp lowWatermark0 = new HybridTimestamp(1, 1);
+
+        StorageUpdateHandler storageUpdateHandler0 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture0, lowWatermark0);
+        StorageUpdateHandler storageUpdateHandler1 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture1, lowWatermark0);
+
+        gc.addStorage(createTablePartitionId(), storageUpdateHandler0);
+        gc.addStorage(createTablePartitionId(), storageUpdateHandler1);
+
+        gc.updateLowWatermark(lowWatermark0);
+
+        // We expect StorageUpdateHandler#vacuum to be called with the set lowWatermark0.
+        assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
+        assertThat(invokeVacuumMethodFuture1, willCompleteSuccessfully());
+
+        // What happens if we increase the low watermark?
+        CompletableFuture<Void> invokeVacuumMethodFuture2 = new CompletableFuture<>();
+        CompletableFuture<Void> invokeVacuumMethodFuture3 = new CompletableFuture<>();
+
+        HybridTimestamp lowWatermark1 = new HybridTimestamp(2, 2);
+
+        completeFutureOnVacuum(storageUpdateHandler0, invokeVacuumMethodFuture2, lowWatermark1);
+        completeFutureOnVacuum(storageUpdateHandler1, invokeVacuumMethodFuture3, lowWatermark1);
+
+        gc.updateLowWatermark(lowWatermark1);
+
+        // We expect StorageUpdateHandler#vacuum to be called with the set lowWatermark1.
+        assertThat(invokeVacuumMethodFuture2, willCompleteSuccessfully());
+        assertThat(invokeVacuumMethodFuture3, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testStartVacuumOnFailUpdateLowWatermark() {
+        HybridTimestamp firstLowWatermark = new HybridTimestamp(2, 2);
+
+        CompletableFuture<Void> invokeVacuumMethodFuture0 = new CompletableFuture<>();
+        CompletableFuture<Void> invokeVacuumMethodFuture1 = new CompletableFuture<>();
+
+        StorageUpdateHandler storageUpdateHandler0 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture0, firstLowWatermark);
+        StorageUpdateHandler storageUpdateHandler1 = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture1, firstLowWatermark);
+
+        gc.addStorage(createTablePartitionId(), storageUpdateHandler0);
+        gc.addStorage(createTablePartitionId(), storageUpdateHandler1);
+
+        gc.updateLowWatermark(firstLowWatermark);
+
+        // We expect StorageUpdateHandler#vacuum to be called with the set firstLowWatermark.
+        assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
+        assertThat(invokeVacuumMethodFuture1, willCompleteSuccessfully());
+
+        // What happens if we try to set the same low watermark?
+        HybridTimestamp sameLowWatermark = new HybridTimestamp(2, 2);
+
+        CompletableFuture<Void> invokeVacuumMethodFutureForSame0 = new CompletableFuture<>();
+        CompletableFuture<Void> invokeVacuumMethodFutureForSame1 = new CompletableFuture<>();
+
+        completeFutureOnVacuum(storageUpdateHandler0, invokeVacuumMethodFutureForSame0, sameLowWatermark);
+        completeFutureOnVacuum(storageUpdateHandler1, invokeVacuumMethodFutureForSame1, sameLowWatermark);
+
+        gc.updateLowWatermark(sameLowWatermark);
+
+        // We expect that StorageUpdateHandler#vacuum will not be called.
+        assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
+        assertThat(invokeVacuumMethodFutureForSame1, willTimeoutFast());
+
+        // What happens if we try to set a lower watermark?
+        HybridTimestamp lowerLowWatermark = new HybridTimestamp(1, 1);
+
+        CompletableFuture<Void> invokeVacuumMethodFutureForLower0 = new CompletableFuture<>();
+        CompletableFuture<Void> invokeVacuumMethodFutureForLower1 = new CompletableFuture<>();
+
+        completeFutureOnVacuum(storageUpdateHandler0, invokeVacuumMethodFutureForLower0, lowerLowWatermark);
+        completeFutureOnVacuum(storageUpdateHandler1, invokeVacuumMethodFutureForLower1, lowerLowWatermark);
+
+        gc.updateLowWatermark(lowerLowWatermark);
+
+        // We expect that StorageUpdateHandler#vacuum will not be called.
+        assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
+        assertThat(invokeVacuumMethodFutureForSame1, willTimeoutFast());
+        assertThat(invokeVacuumMethodFutureForLower0, willTimeoutFast());
+        assertThat(invokeVacuumMethodFutureForLower1, willTimeoutFast());
+    }
+
+    @Test
+    void testCountInvokeVacuum() throws Exception {
+        CountDownLatch latch = new CountDownLatch(MvGc.GC_BATCH_SIZE + 2);
+
+        StorageUpdateHandler storageUpdateHandler = createWithCountDownOnVacuum(latch);
+
+        gc.addStorage(createTablePartitionId(), storageUpdateHandler);
+
+        gc.updateLowWatermark(new HybridTimestamp(2, 2));
+
+        // CountDownLatch#await returns false on timeout, so its result must be asserted, otherwise the test can never fail.
+        assertTrue(latch.await(1, TimeUnit.SECONDS), "StorageUpdateHandler#vacuum was not invoked the expected number of times");
+    }
+
+    @Test
+    void testRemoveStorageNotExist() {
+        assertThat(gc.removeStorage(createTablePartitionId()), willCompleteSuccessfully());
+    }
+
+    @Test
+    void testRemoveStorageForCompletedGc() {
+        CompletableFuture<Void> invokeVacuumMethodFuture = new CompletableFuture<>();
+
+        TablePartitionId tablePartitionId = createTablePartitionId();
+
+        gc.addStorage(tablePartitionId, createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, null));
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        assertThat(invokeVacuumMethodFuture, willCompleteSuccessfully());
+        assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
+
+        // What happens if we delete it again?
+        assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
+    }
+
+    @Test
+    void testRemoveStorageInMiddleGc() {
+        CompletableFuture<Void> startInvokeVacuumMethodFuture = new CompletableFuture<>();
+        CompletableFuture<Void> finishInvokeVacuumMethodFuture = new CompletableFuture<>();
+
+        TablePartitionId tablePartitionId = createTablePartitionId();
+
+        gc.addStorage(tablePartitionId, createWithWaitFinishVacuum(startInvokeVacuumMethodFuture, finishInvokeVacuumMethodFuture));
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
+
+        CompletableFuture<Void> removeStorageFuture = gc.removeStorage(tablePartitionId);
+
+        // Vacuum is still blocked on finishInvokeVacuumMethodFuture, so the removal must not complete yet.
+        assertThat(removeStorageFuture, willTimeoutFast());
+
+        finishInvokeVacuumMethodFuture.complete(null);
+
+        assertThat(removeStorageFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void testRemoveStorageWithError() {
+        CompletableFuture<Void> startInvokeVacuumMethodFuture = new CompletableFuture<>();
+        CompletableFuture<Void> finishInvokeVacuumMethodFuture = new CompletableFuture<>();
+
+        TablePartitionId tablePartitionId = createTablePartitionId();
+
+        gc.addStorage(tablePartitionId, createWithWaitFinishVacuum(startInvokeVacuumMethodFuture, finishInvokeVacuumMethodFuture));
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
+
+        CompletableFuture<Void> removeStorageFuture = gc.removeStorage(tablePartitionId);
+
+        // Vacuum is still blocked on finishInvokeVacuumMethodFuture, so the removal must not complete yet.
+        assertThat(removeStorageFuture, willTimeoutFast());
+
+        finishInvokeVacuumMethodFuture.completeExceptionally(new RuntimeException("form test"));
+
+        assertThat(removeStorageFuture, willFailFast(RuntimeException.class));
+    }
+
+    @Test
+    void testRemoveStorage() {
+        CompletableFuture<Void> invokeVacuumMethodFuture0 = new CompletableFuture<>();
+
+        TablePartitionId tablePartitionId = createTablePartitionId();
+
+        StorageUpdateHandler storageUpdateHandler = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture0, null);
+
+        gc.addStorage(tablePartitionId, storageUpdateHandler);
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
+        assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
+
+        // What happens if we update the low watermark?
+        CompletableFuture<Void> invokeVacuumMethodFuture1 = new CompletableFuture<>();
+
+        completeFutureOnVacuum(storageUpdateHandler, invokeVacuumMethodFuture1, null);
+
+        // Actually raise the low watermark; without this call the assertion below would pass trivially.
+        gc.updateLowWatermark(new HybridTimestamp(2, 2));
+
+        // The storage has been removed, so we expect that StorageUpdateHandler#vacuum will not be called.
+        assertThat(invokeVacuumMethodFuture1, willTimeoutFast());
+    }
+
+    @Test
+    void testClose() throws Exception {
+        gc.close();
+
+        assertThrowsClosed(() -> gc.addStorage(createTablePartitionId(), mock(StorageUpdateHandler.class)));
+        assertThrowsClosed(() -> gc.removeStorage(createTablePartitionId()));
+        assertThrowsClosed(() -> gc.updateLowWatermark(new HybridTimestamp(1, 1)));
+
+        // Closing again must be a no-op.
+        assertDoesNotThrow(gc::close);
+    }
+
+    /**
+     * Creates a table partition ID with a random table ID and the fixed {@link #PARTITION_ID}.
+     */
+    private TablePartitionId createTablePartitionId() {
+        return new TablePartitionId(UUID.randomUUID(), PARTITION_ID);
+    }
+
+    /**
+     * Creates a mock handler whose {@link StorageUpdateHandler#vacuum} is stubbed by
+     * {@link #completeFutureOnVacuum(StorageUpdateHandler, CompletableFuture, HybridTimestamp)}.
+     *
+     * @param future Future to complete when vacuum is invoked.
+     * @param exp Expected low watermark passed to vacuum, {@code null} to skip the check.
+     */
+    private StorageUpdateHandler createWithCompleteFutureOnVacuum(CompletableFuture<Void> future, @Nullable HybridTimestamp exp) {
+        StorageUpdateHandler storageUpdateHandler = mock(StorageUpdateHandler.class);
+
+        completeFutureOnVacuum(storageUpdateHandler, future, exp);
+
+        return storageUpdateHandler;
+    }
+
+    /**
+     * Stubs {@link StorageUpdateHandler#vacuum} to complete the future on invocation and return {@code false}.
+     *
+     * <p>If {@code exp} is not {@code null}, the vacuum argument is compared with it: the future completes normally on a match and
+     * exceptionally with the assertion error otherwise.
+     *
+     * @param storageUpdateHandler Mock to stub.
+     * @param future Future to complete when vacuum is invoked.
+     * @param exp Expected low watermark passed to vacuum, {@code null} to skip the check.
+     */
+    private void completeFutureOnVacuum(
+            StorageUpdateHandler storageUpdateHandler,
+            CompletableFuture<Void> future,
+            @Nullable HybridTimestamp exp
+    ) {
+        when(storageUpdateHandler.vacuum(any(HybridTimestamp.class))).then(invocation -> {
+            if (exp != null) {
+                try {
+                    assertEquals(exp, invocation.getArgument(0));
+
+                    future.complete(null);
+                } catch (Throwable t) {
+                    future.completeExceptionally(t);
+                }
+            } else {
+                future.complete(null);
+            }
+
+            return false;
+        });
+    }
+
+    /**
+     * Creates a mock handler whose {@link StorageUpdateHandler#vacuum} counts the latch down on every invocation and returns
+     * {@code true} while the latch count is still positive.
+     *
+     * @param latch Latch to count down on every vacuum invocation.
+     */
+    private StorageUpdateHandler createWithCountDownOnVacuum(CountDownLatch latch) {
+        StorageUpdateHandler storageUpdateHandler = mock(StorageUpdateHandler.class);
+
+        when(storageUpdateHandler.vacuum(any(HybridTimestamp.class))).then(invocation -> {
+            latch.countDown();
+
+            return latch.getCount() > 0;
+        });
+
+        return storageUpdateHandler;
+    }
+
+    /**
+     * Creates a mock handler whose {@link StorageUpdateHandler#vacuum} completes {@code startFuture}, then waits for
+     * {@code finishFuture} to complete successfully (failing the invocation otherwise) and returns {@code false}.
+     *
+     * @param startFuture Future completed as soon as vacuum is invoked.
+     * @param finishFuture Future the vacuum invocation waits for before returning.
+     */
+    private StorageUpdateHandler createWithWaitFinishVacuum(CompletableFuture<Void> startFuture, CompletableFuture<Void> finishFuture) {
+        StorageUpdateHandler storageUpdateHandler = mock(StorageUpdateHandler.class);
+
+        when(storageUpdateHandler.vacuum(any(HybridTimestamp.class))).then(invocation -> {
+            startFuture.complete(null);
+
+            assertThat(finishFuture, willCompleteSuccessfully());
+
+            return false;
+        });
+
+        return storageUpdateHandler;
+    }
+
+    /**
+     * Asserts that the executable throws {@link IgniteInternalException} with the {@link GarbageCollector#CLOSED_ERR} code.
+     */
+    private static void assertThrowsClosed(Executable executable) {
+        IgniteInternalException exception = assertThrows(IgniteInternalException.class, executable);
+
+        assertEquals(GarbageCollector.CLOSED_ERR, exception.code());
+    }
+}
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
index 84e328c9d3..b932e672bc 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
@@ -22,12 +22,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -39,7 +37,8 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
-import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import 
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
 import org.junit.jupiter.api.Test;
@@ -67,7 +66,8 @@ public class PartitionAccessImplTest {
                 new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
                 mvTableStorage,
                 txStateTableStorage,
-                List::of
+                mock(StorageUpdateHandler.class),
+                mock(MvGc.class)
         );
 
         assertEquals(0, partitionAccess.minLastAppliedIndex());
@@ -107,7 +107,8 @@ public class PartitionAccessImplTest {
                 new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
                 mvTableStorage,
                 txStateTableStorage,
-                List::of
+                mock(StorageUpdateHandler.class),
+                mock(MvGc.class)
         );
 
         assertEquals(0, partitionAccess.minLastAppliedTerm());
@@ -141,13 +142,14 @@ public class PartitionAccessImplTest {
 
         MvPartitionStorage mvPartitionStorage = 
createMvPartition(mvTableStorage, TEST_PARTITION_ID);
 
-        TableSchemaAwareIndexStorage indexStorage = 
mock(TableSchemaAwareIndexStorage.class);
+        StorageUpdateHandler storageUpdateHandler = 
mock(StorageUpdateHandler.class);
 
         PartitionAccess partitionAccess = new PartitionAccessImpl(
                 new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
                 mvTableStorage,
                 new TestTxStateTableStorage(),
-                () -> List.of(indexStorage)
+                storageUpdateHandler,
+                mock(MvGc.class)
         );
 
         RowId rowId = new RowId(TEST_PARTITION_ID);
@@ -159,18 +161,18 @@ public class PartitionAccessImplTest {
 
         verify(mvPartitionStorage, times(1)).addWrite(eq(rowId), 
eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
 
-        verify(indexStorage, times(1)).put(eq(binaryRow), eq(rowId));
+        verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow), 
eq(rowId));
 
         // Let's check with a null binaryRow.
         binaryRow = null;
 
-        reset(mvPartitionStorage, indexStorage);
+        reset(mvPartitionStorage, storageUpdateHandler);
 
         partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId, 
TEST_PARTITION_ID);
 
         verify(mvPartitionStorage, times(1)).addWrite(eq(rowId), 
eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
 
-        verify(indexStorage, never()).put(eq(binaryRow), eq(rowId));
+        verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow), 
eq(rowId));
     }
 
     @Test
@@ -179,13 +181,14 @@ public class PartitionAccessImplTest {
 
         MvPartitionStorage mvPartitionStorage = 
createMvPartition(mvTableStorage, TEST_PARTITION_ID);
 
-        TableSchemaAwareIndexStorage indexStorage = 
mock(TableSchemaAwareIndexStorage.class);
+        StorageUpdateHandler storageUpdateHandler = 
mock(StorageUpdateHandler.class);
 
         PartitionAccess partitionAccess = new PartitionAccessImpl(
                 new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
                 mvTableStorage,
                 new TestTxStateTableStorage(),
-                () -> List.of(indexStorage)
+                storageUpdateHandler,
+                mock(MvGc.class)
         );
 
         RowId rowId = new RowId(TEST_PARTITION_ID);
@@ -195,18 +198,18 @@ public class PartitionAccessImplTest {
 
         verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId), 
eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
 
-        verify(indexStorage, times(1)).put(eq(binaryRow), eq(rowId));
+        verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow), 
eq(rowId));
 
         // Let's check with a null binaryRow.
         binaryRow = null;
 
-        reset(mvPartitionStorage, indexStorage);
+        reset(mvPartitionStorage, storageUpdateHandler);
 
         partitionAccess.addWriteCommitted(rowId, binaryRow, 
HybridTimestamp.MAX_VALUE);
 
         verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId), 
eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
 
-        verify(indexStorage, never()).put(eq(binaryRow), eq(rowId));
+        verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow), 
eq(rowId));
     }
 
     private static MvPartitionStorage createMvPartition(MvTableStorage 
tableStorage, int partitionId) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 61fed2d6d0..8e66e4bcdd 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -73,7 +73,9 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
 import 
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
@@ -102,6 +104,7 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -135,8 +138,19 @@ public class IncomingSnapshotCopierTest {
 
     private final UUID snapshotId = UUID.randomUUID();
 
+    private final UUID tableId = UUID.randomUUID();
+
     private final RaftGroupConfigurationConverter 
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
 
+    private MvGc mvGc;
+
+    @BeforeEach
+    void setUp() {
+        mvGc = mock(MvGc.class);
+
+        when(mvGc.removeStorage(any(TablePartitionId.class))).then(invocation 
-> completedFuture(null));
+    }
+
     @AfterEach
     void tearDown() {
         shutdownAndAwaitTermination(executorService, 1, TimeUnit.SECONDS);
@@ -182,6 +196,11 @@ public class IncomingSnapshotCopierTest {
 
         assertEquals(Status.OK().getCode(), snapshotCopier.getCode());
 
+        TablePartitionId tablePartitionId = new TablePartitionId(tableId, TEST_PARTITION);
+
+        verify(mvGc, times(1)).removeStorage(eq(tablePartitionId));
+        verify(mvGc, times(1)).addStorage(eq(tablePartitionId), any(StorageUpdateHandler.class));
+
         MvPartitionStorage incomingMvPartitionStorage = incomingMvTableStorage.getMvPartition(TEST_PARTITION);
         TxStateStorage incomingTxStatePartitionStorage = incomingTxStateTableStorage.getTxStateStorage(TEST_PARTITION);
 
@@ -272,10 +291,11 @@ public class IncomingSnapshotCopierTest {
                 SnapshotUri.toStringUri(snapshotId, NODE_NAME),
                 mock(RaftOptions.class),
                 spy(new PartitionAccessImpl(
-                        new PartitionKey(UUID.randomUUID(), TEST_PARTITION),
+                        new PartitionKey(tableId, TEST_PARTITION),
                         incomingTableStorage,
                         incomingTxStateTableStorage,
-                        List::of
+                        mock(StorageUpdateHandler.class),
+                        mvGc
                 )),
                 mock(SnapshotMeta.class),
                 executorService
@@ -568,6 +588,11 @@ public class IncomingSnapshotCopierTest {
         assertThat(runAsync(snapshotCopier::join), willFailFast(IllegalStateException.class));
 
         verify(partitionSnapshotStorage.partition()).abortRebalance();
+
+        TablePartitionId tablePartitionId = new TablePartitionId(tableId, TEST_PARTITION);
+
+        verify(mvGc, times(1)).removeStorage(eq(tablePartitionId));
+        verify(mvGc, times(1)).addStorage(eq(tablePartitionId), any(StorageUpdateHandler.class));
     }
 
     private TableConfiguration getTableConfig() {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 26bdbd396c..b91b0580e6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -178,7 +178,7 @@ class SnapshotAwarePartitionDataStorageTest {
     void delegatesClose() {
         testedStorage.close();
 
-        verify(partitionStorage).close();
+        verify(partitionStorage, never()).close();
     }
 
     @Test
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 022d9c863b..6ae7e89a24 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -124,6 +124,5 @@ public class TestPartitionDataStorage implements PartitionDataStorage {
 
     @Override
     public void close() {
-        partitionStorage.close();
     }
 }

Reply via email to