This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9fa4922485 IGNITE-18895 Implement a background GC process (#1720)
9fa4922485 is described below
commit 9fa492248596658d9af5ddcbac0b8ec640472767
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Mar 2 18:18:48 2023 +0300
IGNITE-18895 Implement a background GC process (#1720)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 11 +
.../configuration/TablesConfigurationSchema.java | 4 +
.../table/distributed/StorageUpdateHandler.java | 13 +-
.../internal/table/distributed/TableManager.java | 127 ++++---
.../table/distributed/gc/GcStorageHandler.java | 47 +++
.../ignite/internal/table/distributed/gc/MvGc.java | 257 +++++++++++++++
.../table/distributed/raft/PartitionListener.java | 1 +
.../raft/snapshot/PartitionAccessImpl.java | 48 +--
.../SnapshotAwarePartitionDataStorage.java | 2 -
.../TableManagerDistributionZonesTest.java | 6 +
.../internal/table/distributed/gc/MvGcTest.java | 364 +++++++++++++++++++++
.../raft/snapshot/PartitionAccessImplTest.java | 33 +-
.../incoming/IncomingSnapshotCopierTest.java | 29 +-
.../SnapshotAwarePartitionDataStorageTest.java | 2 +-
.../distributed/TestPartitionDataStorage.java | 1 -
15 files changed, 854 insertions(+), 91 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index bed0f9beab..07e5e5ca37 100755
--- a/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -415,4 +415,15 @@ public class ErrorGroups {
*/
public static final int UNIT_CONTENT_READ_ERR =
CODE_DEPLOYMENT_ERR_GROUP.registerErrorCode(3);
}
+
+ /**
+ * Error group of the garbage collector: declares the group itself and the error codes registered in it.
+ */
+ public static class GarbageCollector {
+ /** Garbage collector error group, obtained from {@code ErrorGroup.newGroup("GC", 14)}. */
+ public static final ErrorGroup GC_ERR_GROUP =
ErrorGroup.newGroup("GC", 14);
+
+ /** Error code registered as {@code 1} in {@link #GC_ERR_GROUP}: the garbage collector is closed. */
+ public static final int CLOSED_ERR = GC_ERR_GROUP.registerErrorCode(1);
+ }
}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
index 14493433a0..98b1ea234c 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
@@ -49,4 +49,8 @@ public class TablesConfigurationSchema {
@ExistingDataStorage
@Value(hasDefault = true)
public String defaultDataStorage = "aipersist";
+
+ /** Number of garbage collector threads. */
+ @Value(hasDefault = true)
+ public int gcThreads = Runtime.getRuntime().availableProcessors();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 16e21d1825..b842038cae 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -396,7 +396,10 @@ public class StorageUpdateHandler {
return true;
}
- private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
+ /**
+ * Adds a binary row to the indexes; if the row is a tombstone ({@code null}), the operation is skipped.
+ */
+ public void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
if (binaryRow == null) { // skip removes
return;
}
@@ -405,4 +408,12 @@ public class StorageUpdateHandler {
index.put(binaryRow, rowId);
}
}
+
+ /**
+ * Waits for indexes to be created.
+ *
+ * <p>Implemented as a bare {@code indexes.get()} call whose result is discarded. NOTE(review): presumably that call blocks until the
+ * index storages are available - the declaration of {@code indexes} is outside this hunk, confirm there.
+ */
+ // TODO: IGNITE-18619 Fix it, we should have already waited for the
indexes to be created
+ public void waitIndexes() {
+ indexes.get();
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 62a30dece5..a6b4cf639d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -58,6 +58,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -122,6 +123,7 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.table.distributed.message.HasDataRequest;
import org.apache.ignite.internal.table.distributed.message.HasDataResponse;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
@@ -309,6 +311,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/** Meta storage listener for switch reduce assignments. */
private final WatchListener assignmentsSwitchRebalanceListener;
+ private final MvGc mvGc;
+
/**
* Creates a new table manager.
*
@@ -451,11 +455,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
stableAssignmentsRebalanceListener =
createStableAssignmentsRebalanceListener();
assignmentsSwitchRebalanceListener =
createAssignmentsSwitchRebalanceListener();
+
+ mvGc = new MvGc(nodeName, tablesCfg);
}
- /** {@inheritDoc} */
@Override
public void start() {
+ mvGc.start();
+
tablesCfg.tables().any().replicas().listen(this::onUpdateReplicas);
// TODO: IGNITE-18694 - Recovery for the case when zones watch
listener processed event but assignments were not updated.
@@ -742,12 +749,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTbl, partId));
CompletableFuture<StorageUpdateHandler>
storageUpdateHandlerFut = partitionDataStorageFut
- .thenApply(storage -> new StorageUpdateHandler(
- partId,
- storage,
- table.indexStorageAdapters(partId),
- tblCfg.dataStorage()
- ));
+ .thenApply(storage -> {
+ StorageUpdateHandler storageUpdateHandler = new
StorageUpdateHandler(
+ partId,
+ storage,
+ table.indexStorageAdapters(partId),
+ tblCfg.dataStorage()
+ );
+
+ mvGc.addStorage(replicaGrpId,
storageUpdateHandler);
+
+ return storageUpdateHandler;
+ });
CompletableFuture<Void> startGroupFut;
@@ -803,7 +816,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTbl.storage(),
internalTbl.txStateStorage(),
partitionKey(internalTbl,
partId),
- table
+ storageUpdateHandler
);
Peer serverPeer =
newConfiguration.peer(localMemberName);
@@ -957,7 +970,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
PartitionKey partitionKey,
- TableImpl tableImpl
+ StorageUpdateHandler storageUpdateHandler
) {
RaftGroupOptions raftGroupOptions;
@@ -977,7 +990,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
partitionKey,
mvTableStorage,
txStateTableStorage,
- () ->
tableImpl.indexStorageAdapters(partitionKey.partitionId()).get().values()
+ storageUpdateHandler,
+ mvGc
),
incomingSnapshotsExecutor
));
@@ -1027,7 +1041,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
List<Runnable> stopping = new ArrayList<>();
- AtomicReference<Exception> exception = new AtomicReference<>();
+ AtomicReference<Throwable> throwable = new AtomicReference<>();
AtomicBoolean nodeStoppingEx = new AtomicBoolean();
@@ -1037,32 +1051,27 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
stopping.add(() -> {
try {
raftMgr.stopRaftNodes(replicationGroupId);
- } catch (Exception e) {
- if (!exception.compareAndSet(null, e)) {
- if (!(e instanceof NodeStoppingException) ||
!nodeStoppingEx.get()) {
- exception.get().addSuppressed(e);
- }
- }
-
- if (e instanceof NodeStoppingException) {
- nodeStoppingEx.set(true);
- }
+ } catch (Throwable t) {
+ handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
}
});
stopping.add(() -> {
try {
replicaMgr.stopReplica(replicationGroupId);
- } catch (Exception e) {
- if (!exception.compareAndSet(null, e)) {
- if (!(e instanceof NodeStoppingException) ||
!nodeStoppingEx.get()) {
- exception.get().addSuppressed(e);
- }
- }
+ } catch (Throwable t) {
+ handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
+ }
+ });
- if (e instanceof NodeStoppingException) {
- nodeStoppingEx.set(true);
- }
+ CompletableFuture<Void> removeFromGcFuture =
mvGc.removeStorage(replicationGroupId);
+
+ stopping.add(() -> {
+ try {
+ // Should be done fairly quickly.
+ removeFromGcFuture.join();
+ } catch (Throwable t) {
+ handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
}
});
}
@@ -1075,14 +1084,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
table.internalTable().txStateStorage(),
table.internalTable()
);
- } catch (Exception e) {
- if (!exception.compareAndSet(null, e)) {
- exception.get().addSuppressed(e);
- }
+ } catch (Throwable t) {
+ handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
}
- if (exception.get() != null) {
- LOG.info("Unable to stop table [name={}, tableId={}]",
exception.get(), table.name(), table.tableId());
+ if (throwable.get() != null) {
+ LOG.error("Unable to stop table [name={}, tableId={}]",
throwable.get(), table.name(), table.tableId());
}
}
}
@@ -1246,12 +1253,16 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
try {
int partitions = assignment.size();
+ CompletableFuture<?>[] removeStorageFromGcFutures = new
CompletableFuture<?>[partitions];
+
for (int p = 0; p < partitions; p++) {
TablePartitionId replicationGroupId = new
TablePartitionId(tblId, p);
raftMgr.stopRaftNodes(replicationGroupId);
replicaMgr.stopReplica(replicationGroupId);
+
+ removeStorageFromGcFutures[p] =
mvGc.removeStorage(replicationGroupId);
}
tablesByIdVv.update(causalityToken, (previousVal, e) ->
inBusyLock(busyLock, () -> {
@@ -1273,10 +1284,11 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
// TODO: IGNITE-18703 Destroy raft log and meta
- CompletableFuture<Void> destroyTableStoragesFuture = allOf(
- table.internalTable().storage().destroy(),
- runAsync(() ->
table.internalTable().txStateStorage().destroy(), ioExecutor)
- );
+ CompletableFuture<Void> destroyTableStoragesFuture =
allOf(removeStorageFromGcFutures)
+ .thenCompose(unused -> allOf(
+ table.internalTable().storage().destroy(),
+ runAsync(() ->
table.internalTable().txStateStorage().destroy(), ioExecutor))
+ );
CompletableFuture<?> dropSchemaRegistryFuture =
schemaManager.dropRegistry(causalityToken, table.tableId())
.thenCompose(
@@ -2002,7 +2014,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTable.storage(),
internalTable.txStateStorage(),
partitionKey(internalTable, partId),
- tbl
+ storageUpdateHandler
);
RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -2182,13 +2194,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
/**
- * Creates or gets partition stores. If one of the storages has not
completed the rebalance, then the storages are cleared.
+ * Creates partition stores. If one of the storages has not completed the
rebalance, then the storages are cleared.
*
* @param table Table.
* @param partitionId Partition ID.
* @return Future of creating or getting partition stores.
*/
// TODO: IGNITE-18619 Maybe we should wait here to create indexes, if you
add now, then the tests start to hang
+ // TODO: IGNITE-18939 Create storages only once, then only get them
private CompletableFuture<PartitionStorages>
getOrCreatePartitionStorages(TableImpl table, int partitionId) {
InternalTable internalTable = table.internalTable();
@@ -2260,11 +2273,33 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
// TODO: IGNITE-18703 Destroy raft log and meta
// Should be done fairly quickly.
- allOf(
- internalTable.storage().destroyPartition(partitionId),
- runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
- ).join();
+ mvGc.removeStorage(tablePartitionId)
+ .thenCompose(unused -> allOf(
+
internalTable.storage().destroyPartition(partitionId),
+ runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
+ ))
+ .join();
}
});
}
+
+ private static void handleExceptionOnCleanUpTablesResources( // Records a failure caught while cleaning up table resources.
+ Throwable t, // The caught failure.
+ AtomicReference<Throwable> throwable, // Holds the first recorded failure; later ones are attached to it as suppressed.
+ AtomicBoolean nodeStoppingEx // Set to true once a NodeStoppingException has been recorded.
+ ) {
+ if (t instanceof CompletionException || t instanceof
ExecutionException) { // Unwrap the wrapper exceptions so that the checks below see the cause.
+ t = t.getCause(); // NOTE(review): getCause() returns null for a wrapper created without a cause - confirm that cannot happen here.
+ }
+
+ if (!throwable.compareAndSet(null, t)) { // A failure has already been recorded, so this one can only be suppressed.
+ if (!(t instanceof NodeStoppingException) ||
!nodeStoppingEx.get()) { // Do not attach a NodeStoppingException when one has already been seen.
+ throwable.get().addSuppressed(t);
+ }
+ }
+
+ if (t instanceof NodeStoppingException) {
+ nodeStoppingEx.set(true); // Remember it for the following calls.
+ }
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
new file mode 100644
index 0000000000..d8396e115b
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcStorageHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+
+/**
+ * Package-private holder that pairs a storage update handler with the future of the garbage collection batch currently running for it.
+ *
+ * <p>Both fields are read and written directly by the garbage collector; the class itself has no behaviour.
+ */
+class GcStorageHandler {
+ /**
+ * Handler of the multi-versioned partition storage and its indexes, on which the garbage collector calls {@code vacuum}.
+ *
+ * @see StorageUpdateHandler#vacuum(HybridTimestamp)
+ */
+ final StorageUpdateHandler storageUpdateHandler;
+
+ /**
+ * Reference to the future of the garbage collection batch that is currently in progress for the multi-versioned partition storage
+ * and its indexes; holds {@code null} while no batch is running.
+ *
+ * <p>Contract for users of this field: before a garbage collection batch begins, a new future must be inserted; after the batch
+ * ends, the inserted future must be completed and removed (the reference set back to {@code null}).
+ */
+ final AtomicReference<CompletableFuture<Void>> gcInProgressFuture = new
AtomicReference<>();
+
+ GcStorageHandler(StorageUpdateHandler storageUpdateHandler) { // Stores the handler; no batch is in progress initially.
+ this.storageUpdateHandler = storageUpdateHandler;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
new file mode 100644
index 0000000000..91b7867dec
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ErrorGroups.GarbageCollector;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Background garbage collector for multi-versioned storages and their indexes.
+ *
+ * <p>Storages are registered with {@link #addStorage} and unregistered with {@link #removeStorage}. Garbage collection tasks are
+ * submitted to an internal thread pool when the low watermark grows ({@link #updateLowWatermark}) or when a storage is added while a
+ * low watermark is already set. A task performs at most {@link #GC_BATCH_SIZE} vacuum calls for its storage and, if the whole batch
+ * was used, submits a follow-up task for the same storage.
+ *
+ * <p>After {@link #close()} the public methods throw {@link IgniteInternalException} with {@link GarbageCollector#CLOSED_ERR}.
+ *
+ * @see MvPartitionStorage#pollForVacuum(HybridTimestamp)
+ */
+public class MvGc implements ManuallyCloseable {
+ private static final IgniteLogger LOG = Loggers.forClass(MvGc.class);
+
+ /** GC batch size for the storage: the maximum number of vacuum calls made for one storage by a single task. */
+ static final int GC_BATCH_SIZE = 5;
+
+ /** Node name, passed to the thread factory of the garbage collection thread pool. */
+ private final String nodeName;
+
+ /** Tables configuration, from which the number of garbage collection threads ({@code gcThreads}) is read on start. */
+ private final TablesConfiguration tablesConfig;
+
+ /** Garbage collection thread pool; assigned in {@link #start()}, {@code null} before that. */
+ private volatile ExecutorService executor;
+
+ /** Prevents double closing: only the call that flips it from {@code false} to {@code true} performs the close. */
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ /** Busy lock to close synchronously; every public operation and every submitted task runs under it. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Low watermark; holds {@code null} until the first {@link #updateLowWatermark} call. */
+ private final AtomicReference<HybridTimestamp> lowWatermarkReference = new
AtomicReference<>();
+
+ /** Storage handlers, keyed by table partition ID, for which garbage will be collected. */
+ private final ConcurrentMap<TablePartitionId, GcStorageHandler>
storageHandlerByPartitionId = new ConcurrentHashMap<>();
+
+ /**
+ * Constructor. Only stores the arguments; the thread pool is created by {@link #start()}.
+ *
+ * @param nodeName Node name.
+ * @param tablesConfig Tables configuration.
+ */
+ public MvGc(String nodeName, TablesConfiguration tablesConfig) {
+ this.nodeName = nodeName;
+ this.tablesConfig = tablesConfig;
+ }
+
+ /**
+ * Starts the garbage collector: creates the thread pool with core and maximum size equal to the configured {@code gcThreads} value
+ * and a {@link LinkedBlockingQueue} created without a capacity argument as the task queue.
+ */
+ public void start() {
+ int threadCount = tablesConfig.gcThreads().value();
+
+ executor = new ThreadPoolExecutor(
+ threadCount,
+ threadCount,
+ 30,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory(nodeName, LOG)
+ );
+ }
+
+ /**
+ * Adds a storage for background garbage collection, which is triggered when the low watermark is updated.
+ *
+ * <p>If no handler is registered for the partition yet, a new one is registered and, when a low watermark is already set, garbage
+ * collection for the storage is scheduled right away. If a handler is already registered for the partition, nothing changes.
+ *
+ * @param tablePartitionId Table partition ID.
+ * @param storageUpdateHandler Storage update handler.
+ * @throws IgniteInternalException with {@link GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
+ */
+ public void addStorage(TablePartitionId tablePartitionId,
StorageUpdateHandler storageUpdateHandler) {
+ inBusyLock(() -> {
+ GcStorageHandler previous =
storageHandlerByPartitionId.putIfAbsent(
+ tablePartitionId,
+ new GcStorageHandler(storageUpdateHandler)
+ );
+
+ // TODO: IGNITE-18939 Should be called once, you need to check
that previous == null
+ if (previous == null && lowWatermarkReference.get() != null) {
+ scheduleGcForStorage(tablePartitionId);
+ }
+ });
+ }
+
+ /**
+ * Removes a storage from background garbage collection and returns a future that tracks the batch still running for it, if any.
+ *
+ * <p>Should be called before rebalancing/closing/destroying the storage.
+ *
+ * <p>The returned future is already completed when the partition is not registered or no batch is in progress; otherwise it is the
+ * future of the running batch, which completes exceptionally if that batch fails.
+ *
+ * @param tablePartitionId Table partition ID.
+ * @return Storage garbage collection completion future.
+ * @throws IgniteInternalException with {@link GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
+ */
+ public CompletableFuture<Void> removeStorage(TablePartitionId
tablePartitionId) {
+ return inBusyLock(() -> {
+ GcStorageHandler removed =
storageHandlerByPartitionId.remove(tablePartitionId);
+
+ if (removed == null) {
+ return completedFuture(null);
+ }
+
+ CompletableFuture<Void> gcInProgressFuture =
removed.gcInProgressFuture.get();
+
+ return gcInProgressFuture == null ? completedFuture(null) :
gcInProgressFuture;
+ });
+ }
+
+ /**
+ * Updates the low watermark, but only if the new value is greater than the current one (or no low watermark is set yet).
+ *
+ * <p>If the update is successful, a task is submitted to the thread pool that schedules a new garbage collection for all
+ * registered storages.
+ *
+ * @param newLwm New low watermark.
+ * @throws IgniteInternalException with {@link GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
+ */
+ public void updateLowWatermark(HybridTimestamp newLwm) {
+ inBusyLock(() -> {
+ HybridTimestamp updatedLwm =
lowWatermarkReference.updateAndGet(currentLwm -> {
+ if (currentLwm == null) {
+ return newLwm;
+ }
+
+ // Update only if the new one is greater than the current one.
+ return newLwm.compareTo(currentLwm) > 0 ? newLwm : currentLwm;
+ });
+
+ // If the new watermark is smaller than the current one or has
been updated in parallel, then we do nothing.
+ if (updatedLwm != newLwm) { // Reference comparison: the same instance is seen only when the update function returned newLwm.
+ return;
+ }
+
+ executor.submit(() -> inBusyLock(this::initNewGcBusy)); // The per-storage scheduling runs on the GC pool, not on the caller.
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) { // Only the first call proceeds, later calls are no-ops.
+ return;
+ }
+
+ busyLock.block(); // NOTE(review): presumably waits for current holders and makes enterBusy() fail afterwards - confirm.
+
+ if (executor != null) { // start() may not have been called.
+ shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
+ }
+ }
+
+ private void initNewGcBusy() { // Schedules a GC task for every registered storage; invoked under the busy lock.
+
storageHandlerByPartitionId.keySet().forEach(this::scheduleGcForStorage);
+ }
+
+ private void scheduleGcForStorage(TablePartitionId tablePartitionId) { // Submits one GC batch task for the given partition.
+ executor.submit(() -> inBusyLock(() -> {
+ GcStorageHandler storageHandler =
storageHandlerByPartitionId.get(tablePartitionId);
+
+ if (storageHandler == null) {
+ // Storage has been removed from garbage collection.
+ return;
+ }
+
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ if (!storageHandler.gcInProgressFuture.compareAndSet(null,
future)) {
+ // In parallel, another task has already begun collecting
garbage.
+ return;
+ }
+
+ try {
+ for (int i = 0; i < GC_BATCH_SIZE; i++) {
+ HybridTimestamp lowWatermark = lowWatermarkReference.get(); // Re-read on every iteration to pick up a newer value.
+
+ assert lowWatermark != null : tablePartitionId;
+
+ // If storage has been deleted or there is no garbage,
then for now we will stop collecting garbage for this storage.
+ if
(!storageHandlerByPartitionId.containsKey(tablePartitionId)
+ ||
!storageHandler.storageUpdateHandler.vacuum(lowWatermark)) {
+ return; // The finally block below still completes and clears the future.
+ }
+ }
+ } catch (Throwable t) { // NOTE(review): the failure is only stored in the future; it is not logged here.
+ future.completeExceptionally(t);
+
+ return;
+ } finally {
+ if (!future.isCompletedExceptionally()) {
+ future.complete(null);
+ }
+
+ storageHandler.gcInProgressFuture.set(null); // Allows the next task to start a batch for this storage.
+ }
+
+ scheduleGcForStorage(tablePartitionId); // Reached only when the whole batch was used, so more garbage may remain.
+ }));
+ }
+
+ private <T> T inBusyLock(Supplier<T> supplier) { // Runs the supplier under the busy lock or fails if the lock cannot be entered.
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(GarbageCollector.CLOSED_ERR);
+ }
+
+ try {
+ return supplier.get();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private void inBusyLock(Runnable runnable) { // Runnable flavour: delegates to the Supplier overload and returns nothing.
+ inBusyLock(() -> {
+ runnable.run();
+
+ return null;
+ });
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index cfa716db62..6106693e96 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -427,6 +427,7 @@ public class PartitionListener implements RaftGroupListener
{
@Override
public void onShutdown() {
+ storage.close();
}
/**
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
index bf97b1fb23..4d5b09294e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java
@@ -17,19 +17,19 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot;
-import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
-import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -48,28 +48,33 @@ public class PartitionAccessImpl implements PartitionAccess
{
private final TxStateTableStorage txStateTableStorage;
- private final Supplier<Collection<TableSchemaAwareIndexStorage>> indexes;
-
private final RaftGroupConfigurationConverter
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
+ private final StorageUpdateHandler storageUpdateHandler;
+
+ private final MvGc mvGc;
+
/**
* Constructor.
*
* @param partitionKey Partition key.
* @param mvTableStorage Multi version table storage.
* @param txStateTableStorage Table transaction state storage.
- * @param indexes Index storages supplier.
+ * @param storageUpdateHandler Storage update handler.
+ * @param mvGc Garbage collector for multi-versioned storages and their
indexes in the background.
*/
public PartitionAccessImpl(
PartitionKey partitionKey,
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
- Supplier<Collection<TableSchemaAwareIndexStorage>> indexes
+ StorageUpdateHandler storageUpdateHandler,
+ MvGc mvGc
) {
this.partitionKey = partitionKey;
this.mvTableStorage = mvTableStorage;
this.txStateTableStorage = txStateTableStorage;
- this.indexes = indexes;
+ this.storageUpdateHandler = storageUpdateHandler;
+ this.mvGc = mvGc;
}
@Override
@@ -119,7 +124,7 @@ public class PartitionAccessImpl implements PartitionAccess
{
mvPartitionStorage.runConsistently(() -> {
mvPartitionStorage.addWrite(rowId, row, txId, commitTableId,
commitPartitionId);
- addToIndexes(row, rowId);
+ storageUpdateHandler.addToIndexes(row, rowId);
return null;
});
@@ -132,7 +137,7 @@ public class PartitionAccessImpl implements PartitionAccess
{
mvPartitionStorage.runConsistently(() -> {
mvPartitionStorage.addWriteCommitted(rowId, row, commitTimestamp);
- addToIndexes(row, rowId);
+ storageUpdateHandler.addToIndexes(row, rowId);
return null;
});
@@ -173,14 +178,15 @@ public class PartitionAccessImpl implements
PartitionAccess {
@Override
public CompletableFuture<Void> startRebalance() {
// TODO: IGNITE-18619 Fix it, we should have already waited for the
indexes to be created
- indexes.get();
+ storageUpdateHandler.waitIndexes();
TxStateStorage txStateStorage = getTxStateStorage(partitionId());
- return CompletableFuture.allOf(
- mvTableStorage.startRebalancePartition(partitionId()),
- txStateStorage.startRebalance()
- );
+ return mvGc.removeStorage(toTablePartitionId(partitionKey))
+ .thenCompose(unused -> CompletableFuture.allOf(
+ mvTableStorage.startRebalancePartition(partitionId()),
+ txStateStorage.startRebalance()
+ ));
}
@Override
@@ -190,7 +196,7 @@ public class PartitionAccessImpl implements PartitionAccess
{
return CompletableFuture.allOf(
mvTableStorage.abortRebalancePartition(partitionId()),
txStateStorage.abortRebalance()
- );
+ ).thenAccept(unused ->
mvGc.addStorage(toTablePartitionId(partitionKey), storageUpdateHandler));
}
@Override
@@ -202,7 +208,7 @@ public class PartitionAccessImpl implements PartitionAccess
{
return CompletableFuture.allOf(
mvTableStorage.finishRebalancePartition(partitionId(),
lastAppliedIndex, lastAppliedTerm, configBytes),
txStateStorage.finishRebalance(lastAppliedIndex,
lastAppliedTerm)
- );
+ ).thenAccept(unused ->
mvGc.addStorage(toTablePartitionId(partitionKey), storageUpdateHandler));
}
private MvPartitionStorage getMvPartitionStorage(int partitionId) {
@@ -221,11 +227,7 @@ public class PartitionAccessImpl implements
PartitionAccess {
return txStateStorage;
}
- private void addToIndexes(@Nullable BinaryRow binaryRow, RowId rowId) {
- if (binaryRow == null) {
- return;
- }
-
- indexes.get().forEach(index -> index.put(binaryRow, rowId));
+ private static TablePartitionId toTablePartitionId(PartitionKey
partitionKey) {
+ return new TablePartitionId(partitionKey.tableId(),
partitionKey.partitionId());
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
index f6458d5579..28161e3c0a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java
@@ -175,8 +175,6 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
@Override
public void close() {
cleanupSnapshots();
-
- partitionStorage.close();
}
private void cleanupSnapshots() {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
index 64e3fed14a..9294016373 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerDistributionZonesTest.java
@@ -481,5 +481,11 @@ public class TableManagerDistributionZonesTest extends
IgniteAbstractTest {
when(tableCfg.assignments()).thenReturn(mock(ConfigurationValue.class));
when(tablesConfiguration.tables()).thenReturn(tables);
+
+ ConfigurationValue<Integer> gcThreads = mock(ConfigurationValue.class);
+
+ when(gcThreads.value()).thenReturn(1);
+
+ when(tablesConfiguration.gcThreads()).thenReturn(gcThreads);
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
new file mode 100644
index 0000000000..070d22f1b8
--- /dev/null
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.gc;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.lang.ErrorGroups.GarbageCollector;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.function.Executable;
+
+/**
+ * For testing {@link MvGc}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class MvGcTest {
+ private static final int PARTITION_ID = 0;
+
+ private MvGc gc;
+
+    /**
+     * Creates the garbage collector under test from the injected configuration ({@code mock.gcThreads = 1}) and starts it.
+     *
+     * @param tablesConfig Tables configuration injected by the configuration extension.
+     */
+    @BeforeEach
+    void setUp(@InjectConfiguration("mock.gcThreads = 1") TablesConfiguration tablesConfig) {
+        gc = new MvGc("test", tablesConfig);
+
+        gc.start();
+    }
+
+ /** Closes the garbage collector under test after every test method. */
+ @AfterEach
+ void tearDown() throws Exception {
+ closeAllManually(gc);
+ }
+
+    /** Checks that a storage added while no low watermark has been set is not vacuumed. */
+    @Test
+    void testAddStorageWithoutLowWatermark() {
+        CompletableFuture<Void> vacuumInvokedFuture = new CompletableFuture<>();
+
+        StorageUpdateHandler handler = createWithCompleteFutureOnVacuum(vacuumInvokedFuture, null);
+
+        gc.addStorage(createTablePartitionId(), handler);
+
+        // No low watermark was set, so StorageUpdateHandler#vacuum is expected not to be called.
+        assertThat(vacuumInvokedFuture, willTimeoutFast());
+    }
+
+    /** Checks that a storage added after the low watermark has been set is vacuumed with that low watermark. */
+    @Test
+    void testAddStorageWithLowWatermark() {
+        HybridTimestamp watermark = new HybridTimestamp(1, 1);
+
+        CompletableFuture<Void> vacuumInvokedFuture = new CompletableFuture<>();
+
+        StorageUpdateHandler handler = createWithCompleteFutureOnVacuum(vacuumInvokedFuture, watermark);
+
+        gc.updateLowWatermark(watermark);
+
+        gc.addStorage(createTablePartitionId(), handler);
+
+        // StorageUpdateHandler#vacuum is expected to be called with the low watermark set above.
+        assertThat(vacuumInvokedFuture, willCompleteSuccessfully());
+    }
+
+    /**
+     * Checks that the registered storages are vacuumed when the low watermark is set for the first time and again, with the new
+     * value, after it has been increased.
+     */
+    @Test
+    void testStartVacuumOnSuccessfulUpdateLowWatermark() {
+        HybridTimestamp lowWatermark0 = new HybridTimestamp(1, 1);
+
+        CompletableFuture<Void> firstVacuumFuture0 = new CompletableFuture<>();
+        CompletableFuture<Void> firstVacuumFuture1 = new CompletableFuture<>();
+
+        StorageUpdateHandler handler0 = createWithCompleteFutureOnVacuum(firstVacuumFuture0, lowWatermark0);
+        StorageUpdateHandler handler1 = createWithCompleteFutureOnVacuum(firstVacuumFuture1, lowWatermark0);
+
+        gc.addStorage(createTablePartitionId(), handler0);
+        gc.addStorage(createTablePartitionId(), handler1);
+
+        gc.updateLowWatermark(lowWatermark0);
+
+        // StorageUpdateHandler#vacuum is expected to be called with lowWatermark0 for both storages.
+        assertThat(firstVacuumFuture0, willCompleteSuccessfully());
+        assertThat(firstVacuumFuture1, willCompleteSuccessfully());
+
+        // Now increase the low watermark; the handlers are re-stubbed to expect the new value.
+        HybridTimestamp lowWatermark1 = new HybridTimestamp(2, 2);
+
+        CompletableFuture<Void> secondVacuumFuture0 = new CompletableFuture<>();
+        CompletableFuture<Void> secondVacuumFuture1 = new CompletableFuture<>();
+
+        completeFutureOnVacuum(handler0, secondVacuumFuture0, lowWatermark1);
+        completeFutureOnVacuum(handler1, secondVacuumFuture1, lowWatermark1);
+
+        gc.updateLowWatermark(lowWatermark1);
+
+        // StorageUpdateHandler#vacuum is expected to be called with lowWatermark1 for both storages.
+        assertThat(secondVacuumFuture0, willCompleteSuccessfully());
+        assertThat(secondVacuumFuture1, willCompleteSuccessfully());
+    }
+
+    /**
+     * Checks that setting a low watermark equal to or lower than the current one does not trigger
+     * {@code StorageUpdateHandler#vacuum}.
+     */
+    @Test
+    void testStartVacuumOnFailUpdateLowWatermark() {
+        HybridTimestamp firstLowWatermark = new HybridTimestamp(2, 2);
+
+        CompletableFuture<Void> firstVacuumFuture0 = new CompletableFuture<>();
+        CompletableFuture<Void> firstVacuumFuture1 = new CompletableFuture<>();
+
+        StorageUpdateHandler handler0 = createWithCompleteFutureOnVacuum(firstVacuumFuture0, firstLowWatermark);
+        StorageUpdateHandler handler1 = createWithCompleteFutureOnVacuum(firstVacuumFuture1, firstLowWatermark);
+
+        gc.addStorage(createTablePartitionId(), handler0);
+        gc.addStorage(createTablePartitionId(), handler1);
+
+        gc.updateLowWatermark(firstLowWatermark);
+
+        // StorageUpdateHandler#vacuum is expected to be called with firstLowWatermark for both storages.
+        assertThat(firstVacuumFuture0, willCompleteSuccessfully());
+        assertThat(firstVacuumFuture1, willCompleteSuccessfully());
+
+        // What happens if we try to set the same low watermark?
+        HybridTimestamp sameLowWatermark = new HybridTimestamp(2, 2);
+
+        CompletableFuture<Void> sameVacuumFuture0 = new CompletableFuture<>();
+        CompletableFuture<Void> sameVacuumFuture1 = new CompletableFuture<>();
+
+        completeFutureOnVacuum(handler0, sameVacuumFuture0, sameLowWatermark);
+        completeFutureOnVacuum(handler1, sameVacuumFuture1, sameLowWatermark);
+
+        gc.updateLowWatermark(sameLowWatermark);
+
+        // StorageUpdateHandler#vacuum is expected not to be called.
+        assertThat(sameVacuumFuture0, willTimeoutFast());
+        assertThat(sameVacuumFuture1, willTimeoutFast());
+
+        // What happens if we try to set a lower low watermark?
+        HybridTimestamp lowerLowWatermark = new HybridTimestamp(1, 1);
+
+        CompletableFuture<Void> lowerVacuumFuture0 = new CompletableFuture<>();
+        CompletableFuture<Void> lowerVacuumFuture1 = new CompletableFuture<>();
+
+        completeFutureOnVacuum(handler0, lowerVacuumFuture0, lowerLowWatermark);
+        completeFutureOnVacuum(handler1, lowerVacuumFuture1, lowerLowWatermark);
+
+        gc.updateLowWatermark(lowerLowWatermark);
+
+        // StorageUpdateHandler#vacuum is still expected not to be called, neither for the previous attempt nor for this one.
+        assertThat(sameVacuumFuture0, willTimeoutFast());
+        assertThat(sameVacuumFuture1, willTimeoutFast());
+        assertThat(lowerVacuumFuture0, willTimeoutFast());
+        assertThat(lowerVacuumFuture1, willTimeoutFast());
+    }
+
+    /**
+     * Checks that {@code StorageUpdateHandler#vacuum} keeps being invoked while it returns {@code true}: the mocked handler
+     * returns {@code true} until it has been called {@code MvGc.GC_BATCH_SIZE + 2} times.
+     *
+     * @throws Exception If interrupted while waiting for the invocations.
+     */
+    @Test
+    void testCountInvokeVacuum() throws Exception {
+        CountDownLatch latch = new CountDownLatch(MvGc.GC_BATCH_SIZE + 2);
+
+        StorageUpdateHandler storageUpdateHandler = createWithCountDownOnVacuum(latch);
+
+        gc.addStorage(createTablePartitionId(), storageUpdateHandler);
+
+        gc.updateLowWatermark(new HybridTimestamp(2, 2));
+
+        // CountDownLatch#await returns false on timeout; without asserting the result the test would pass
+        // even if vacuum was never invoked.
+        assertTrue(
+                latch.await(1, TimeUnit.SECONDS),
+                "StorageUpdateHandler#vacuum was not invoked the expected number of times"
+        );
+    }
+
+    /** Checks that removing a storage that was never added completes successfully. */
+    @Test
+    void testRemoveStorageNotExist() {
+        CompletableFuture<Void> removeFuture = gc.removeStorage(createTablePartitionId());
+
+        assertThat(removeFuture, willCompleteSuccessfully());
+    }
+
+    /** Checks that a storage can be removed once vacuum has been invoked for it, and that a repeated removal also succeeds. */
+    @Test
+    void testRemoveStorageForCompletedGc() {
+        TablePartitionId partitionId = createTablePartitionId();
+
+        CompletableFuture<Void> vacuumInvokedFuture = new CompletableFuture<>();
+
+        gc.addStorage(partitionId, createWithCompleteFutureOnVacuum(vacuumInvokedFuture, null));
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        assertThat(vacuumInvokedFuture, willCompleteSuccessfully());
+
+        CompletableFuture<Void> firstRemoveFuture = gc.removeStorage(partitionId);
+
+        assertThat(firstRemoveFuture, willCompleteSuccessfully());
+
+        // What happens if we delete it again?
+        CompletableFuture<Void> secondRemoveFuture = gc.removeStorage(partitionId);
+
+        assertThat(secondRemoveFuture, willCompleteSuccessfully());
+    }
+
+    /** Checks that removing a storage does not complete while a {@code StorageUpdateHandler#vacuum} call for it is in progress. */
+    @Test
+    void testRemoveStorageInMiddleGc() {
+        TablePartitionId partitionId = createTablePartitionId();
+
+        CompletableFuture<Void> vacuumStartedFuture = new CompletableFuture<>();
+        CompletableFuture<Void> vacuumFinishFuture = new CompletableFuture<>();
+
+        StorageUpdateHandler blockingHandler = createWithWaitFinishVacuum(vacuumStartedFuture, vacuumFinishFuture);
+
+        gc.addStorage(partitionId, blockingHandler);
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        assertThat(vacuumStartedFuture, willCompleteSuccessfully());
+
+        CompletableFuture<Void> removeFuture = gc.removeStorage(partitionId);
+
+        // The vacuum call is still waiting for vacuumFinishFuture, so the removal must not complete yet.
+        assertThat(removeFuture, willTimeoutFast());
+
+        vacuumFinishFuture.complete(null);
+
+        assertThat(removeFuture, willCompleteSuccessfully());
+    }
+
+ @Test
+ void testRemoveStorageWithError() {
+ CompletableFuture<Void> startInvokeVacuumMethodFuture = new
CompletableFuture<>();
+ CompletableFuture<Void> finishInvokeVacuumMethodFuture = new
CompletableFuture<>();
+
+ TablePartitionId tablePartitionId = createTablePartitionId();
+
+ gc.addStorage(tablePartitionId,
createWithWaitFinishVacuum(startInvokeVacuumMethodFuture,
finishInvokeVacuumMethodFuture));
+
+ gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+ assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
+
+ CompletableFuture<Void> removeStorageFuture =
gc.removeStorage(tablePartitionId);
+
+ assertThat(removeStorageFuture, willTimeoutFast());
+
+ finishInvokeVacuumMethodFuture.completeExceptionally(new
RuntimeException("form test"));
+
+ assertThat(removeStorageFuture, willFailFast(RuntimeException.class));
+ }
+
+    /**
+     * Checks that a storage removed from the garbage collector is no longer vacuumed when the low watermark is increased
+     * afterwards.
+     */
+    @Test
+    void testRemoveStorage() {
+        CompletableFuture<Void> invokeVacuumMethodFuture0 = new CompletableFuture<>();
+
+        TablePartitionId tablePartitionId = createTablePartitionId();
+
+        StorageUpdateHandler storageUpdateHandler = createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture0, null);
+
+        gc.addStorage(tablePartitionId, storageUpdateHandler);
+
+        gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+        assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
+        assertThat(gc.removeStorage(tablePartitionId), willCompleteSuccessfully());
+
+        // What happens if we update the low watermark?
+        CompletableFuture<Void> invokeVacuumMethodFuture1 = new CompletableFuture<>();
+
+        completeFutureOnVacuum(storageUpdateHandler, invokeVacuumMethodFuture1, null);
+
+        // The low watermark has to be actually increased here, otherwise nothing could trigger a vacuum and the assertion
+        // below would hold trivially. The value must be greater than the previous (1, 1) one, because setting the same or a
+        // lower low watermark does not start a vacuum (see testStartVacuumOnFailUpdateLowWatermark).
+        gc.updateLowWatermark(new HybridTimestamp(2, 2));
+
+        // We expect that StorageUpdateHandler#vacuum will not be called for the removed storage.
+        assertThat(invokeVacuumMethodFuture1, willTimeoutFast());
+    }
+
+    /**
+     * Checks that every operation on a closed garbage collector fails with {@code GarbageCollector.CLOSED_ERR} and that closing
+     * it again does not throw.
+     *
+     * @throws Exception If closing the garbage collector fails.
+     */
+    @Test
+    void testClose() throws Exception {
+        TablePartitionId partitionId = createTablePartitionId();
+        StorageUpdateHandler handler = mock(StorageUpdateHandler.class);
+        HybridTimestamp watermark = new HybridTimestamp(1, 1);
+
+        gc.close();
+
+        assertThrowsClosed(() -> gc.addStorage(partitionId, handler));
+        assertThrowsClosed(() -> gc.removeStorage(partitionId));
+        assertThrowsClosed(() -> gc.updateLowWatermark(watermark));
+
+        // A repeated close must not throw.
+        assertDoesNotThrow(gc::close);
+    }
+
+    /** Creates a table partition ID with a random table ID and the {@link #PARTITION_ID} partition. */
+    private TablePartitionId createTablePartitionId() {
+        UUID randomTableId = UUID.randomUUID();
+
+        return new TablePartitionId(randomTableId, PARTITION_ID);
+    }
+
+    /**
+     * Creates a {@link StorageUpdateHandler} mock and stubs its {@code vacuum} method via
+     * {@link #completeFutureOnVacuum(StorageUpdateHandler, CompletableFuture, HybridTimestamp)}.
+     *
+     * @param future Future that the stubbed {@code vacuum} completes.
+     * @param exp Low watermark expected by the stubbed {@code vacuum}, {@code null} if it should not be checked.
+     * @return Stubbed mock.
+     */
+    private StorageUpdateHandler createWithCompleteFutureOnVacuum(
+            CompletableFuture<Void> future,
+            @Nullable HybridTimestamp exp
+    ) {
+        StorageUpdateHandler handlerMock = mock(StorageUpdateHandler.class);
+
+        completeFutureOnVacuum(handlerMock, future, exp);
+
+        return handlerMock;
+    }
+
+ private void completeFutureOnVacuum(
+ StorageUpdateHandler storageUpdateHandler,
+ CompletableFuture<Void> future,
+ @Nullable HybridTimestamp exp
+ ) {
+
when(storageUpdateHandler.vacuum(any(HybridTimestamp.class))).then(invocation
-> {
+ if (exp != null) {
+ try {
+ assertEquals(exp, invocation.getArgument(0));
+
+ future.complete(null);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ } else {
+ future.complete(null);
+ }
+
+ return false;
+ });
+ }
+
+    /**
+     * Creates a {@link StorageUpdateHandler} mock whose {@code vacuum} counts the latch down on every invocation and returns
+     * {@code true} while the latch count is still greater than zero.
+     *
+     * @param latch Latch to count down.
+     * @return Stubbed mock.
+     */
+    private StorageUpdateHandler createWithCountDownOnVacuum(CountDownLatch latch) {
+        StorageUpdateHandler handlerMock = mock(StorageUpdateHandler.class);
+
+        when(handlerMock.vacuum(any(HybridTimestamp.class))).then(invocation -> {
+            latch.countDown();
+
+            boolean invocationsRemain = latch.getCount() > 0;
+
+            return invocationsRemain;
+        });
+
+        return handlerMock;
+    }
+
+    /**
+     * Creates a {@link StorageUpdateHandler} mock whose {@code vacuum} completes {@code startFuture}, then asserts that
+     * {@code finishFuture} completes successfully, and finally returns {@code false}.
+     *
+     * @param startFuture Future completed as soon as {@code vacuum} is entered.
+     * @param finishFuture Future that {@code vacuum} waits for before returning.
+     * @return Stubbed mock.
+     */
+    private StorageUpdateHandler createWithWaitFinishVacuum(
+            CompletableFuture<Void> startFuture,
+            CompletableFuture<Void> finishFuture
+    ) {
+        StorageUpdateHandler handlerMock = mock(StorageUpdateHandler.class);
+
+        when(handlerMock.vacuum(any(HybridTimestamp.class))).then(invocation -> {
+            // Signal the test that vacuum has started.
+            startFuture.complete(null);
+
+            // Hold the vacuum call until the test lets it finish; a failed finishFuture makes this assertion throw.
+            assertThat(finishFuture, willCompleteSuccessfully());
+
+            return false;
+        });
+
+        return handlerMock;
+    }
+
+    /**
+     * Asserts that the executable throws an {@link IgniteInternalException} whose code is {@code GarbageCollector.CLOSED_ERR}.
+     *
+     * @param executable Action expected to fail.
+     */
+    private static void assertThrowsClosed(Executable executable) {
+        assertEquals(
+                GarbageCollector.CLOSED_ERR,
+                assertThrows(IgniteInternalException.class, executable).code()
+        );
+    }
+}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
index 84e328c9d3..b932e672bc 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImplTest.java
@@ -22,12 +22,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -39,7 +37,8 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
-import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.junit.jupiter.api.Test;
@@ -67,7 +66,8 @@ public class PartitionAccessImplTest {
new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
mvTableStorage,
txStateTableStorage,
- List::of
+ mock(StorageUpdateHandler.class),
+ mock(MvGc.class)
);
assertEquals(0, partitionAccess.minLastAppliedIndex());
@@ -107,7 +107,8 @@ public class PartitionAccessImplTest {
new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
mvTableStorage,
txStateTableStorage,
- List::of
+ mock(StorageUpdateHandler.class),
+ mock(MvGc.class)
);
assertEquals(0, partitionAccess.minLastAppliedTerm());
@@ -141,13 +142,14 @@ public class PartitionAccessImplTest {
MvPartitionStorage mvPartitionStorage =
createMvPartition(mvTableStorage, TEST_PARTITION_ID);
- TableSchemaAwareIndexStorage indexStorage =
mock(TableSchemaAwareIndexStorage.class);
+ StorageUpdateHandler storageUpdateHandler =
mock(StorageUpdateHandler.class);
PartitionAccess partitionAccess = new PartitionAccessImpl(
new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
mvTableStorage,
new TestTxStateTableStorage(),
- () -> List.of(indexStorage)
+ storageUpdateHandler,
+ mock(MvGc.class)
);
RowId rowId = new RowId(TEST_PARTITION_ID);
@@ -159,18 +161,18 @@ public class PartitionAccessImplTest {
verify(mvPartitionStorage, times(1)).addWrite(eq(rowId),
eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
- verify(indexStorage, times(1)).put(eq(binaryRow), eq(rowId));
+ verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow),
eq(rowId));
// Let's check with a null binaryRow.
binaryRow = null;
- reset(mvPartitionStorage, indexStorage);
+ reset(mvPartitionStorage, storageUpdateHandler);
partitionAccess.addWrite(rowId, binaryRow, txId, commitTableId,
TEST_PARTITION_ID);
verify(mvPartitionStorage, times(1)).addWrite(eq(rowId),
eq(binaryRow), eq(txId), eq(commitTableId), eq(TEST_PARTITION_ID));
- verify(indexStorage, never()).put(eq(binaryRow), eq(rowId));
+ verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow),
eq(rowId));
}
@Test
@@ -179,13 +181,14 @@ public class PartitionAccessImplTest {
MvPartitionStorage mvPartitionStorage =
createMvPartition(mvTableStorage, TEST_PARTITION_ID);
- TableSchemaAwareIndexStorage indexStorage =
mock(TableSchemaAwareIndexStorage.class);
+ StorageUpdateHandler storageUpdateHandler =
mock(StorageUpdateHandler.class);
PartitionAccess partitionAccess = new PartitionAccessImpl(
new PartitionKey(UUID.randomUUID(), TEST_PARTITION_ID),
mvTableStorage,
new TestTxStateTableStorage(),
- () -> List.of(indexStorage)
+ storageUpdateHandler,
+ mock(MvGc.class)
);
RowId rowId = new RowId(TEST_PARTITION_ID);
@@ -195,18 +198,18 @@ public class PartitionAccessImplTest {
verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId),
eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
- verify(indexStorage, times(1)).put(eq(binaryRow), eq(rowId));
+ verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow),
eq(rowId));
// Let's check with a null binaryRow.
binaryRow = null;
- reset(mvPartitionStorage, indexStorage);
+ reset(mvPartitionStorage, storageUpdateHandler);
partitionAccess.addWriteCommitted(rowId, binaryRow,
HybridTimestamp.MAX_VALUE);
verify(mvPartitionStorage, times(1)).addWriteCommitted(eq(rowId),
eq(binaryRow), eq(HybridTimestamp.MAX_VALUE));
- verify(indexStorage, never()).put(eq(binaryRow), eq(rowId));
+ verify(storageUpdateHandler, times(1)).addToIndexes(eq(binaryRow),
eq(rowId));
}
private static MvPartitionStorage createMvPartition(MvTableStorage
tableStorage, int partitionId) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 61fed2d6d0..8e66e4bcdd 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -73,7 +73,9 @@ import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
+import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.gc.MvGc;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfiguration;
import
org.apache.ignite.internal.table.distributed.raft.RaftGroupConfigurationConverter;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionAccessImpl;
@@ -102,6 +104,7 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.option.SnapshotCopierOptions;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -135,8 +138,19 @@ public class IncomingSnapshotCopierTest {
private final UUID snapshotId = UUID.randomUUID();
+ private final UUID tableId = UUID.randomUUID();
+
private final RaftGroupConfigurationConverter
raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
+ private MvGc mvGc;
+
+ /** Creates a fresh {@link MvGc} mock whose {@code removeStorage} returns an already completed future for any partition. */
+ @BeforeEach
+ void setUp() {
+ mvGc = mock(MvGc.class);
+
+ when(mvGc.removeStorage(any(TablePartitionId.class))).then(invocation
-> completedFuture(null));
+ }
+
@AfterEach
void tearDown() {
shutdownAndAwaitTermination(executorService, 1, TimeUnit.SECONDS);
@@ -182,6 +196,11 @@ public class IncomingSnapshotCopierTest {
assertEquals(Status.OK().getCode(), snapshotCopier.getCode());
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
TEST_PARTITION);
+
+ verify(mvGc, times(1)).removeStorage(eq(tablePartitionId));
+ verify(mvGc, times(1)).addStorage(eq(tablePartitionId),
any(StorageUpdateHandler.class));
+
MvPartitionStorage incomingMvPartitionStorage =
incomingMvTableStorage.getMvPartition(TEST_PARTITION);
TxStateStorage incomingTxStatePartitionStorage =
incomingTxStateTableStorage.getTxStateStorage(TEST_PARTITION);
@@ -272,10 +291,11 @@ public class IncomingSnapshotCopierTest {
SnapshotUri.toStringUri(snapshotId, NODE_NAME),
mock(RaftOptions.class),
spy(new PartitionAccessImpl(
- new PartitionKey(UUID.randomUUID(), TEST_PARTITION),
+ new PartitionKey(tableId, TEST_PARTITION),
incomingTableStorage,
incomingTxStateTableStorage,
- List::of
+ mock(StorageUpdateHandler.class),
+ mvGc
)),
mock(SnapshotMeta.class),
executorService
@@ -568,6 +588,11 @@ public class IncomingSnapshotCopierTest {
assertThat(runAsync(snapshotCopier::join),
willFailFast(IllegalStateException.class));
verify(partitionSnapshotStorage.partition()).abortRebalance();
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
TEST_PARTITION);
+
+ verify(mvGc, times(1)).removeStorage(eq(tablePartitionId));
+ verify(mvGc, times(1)).addStorage(eq(tablePartitionId),
any(StorageUpdateHandler.class));
}
private TableConfiguration getTableConfig() {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
index 26bdbd396c..b91b0580e6 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorageTest.java
@@ -178,7 +178,7 @@ class SnapshotAwarePartitionDataStorageTest {
void delegatesClose() {
testedStorage.close();
- verify(partitionStorage).close();
+ verify(partitionStorage, never()).close();
}
@Test
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 022d9c863b..6ae7e89a24 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -124,6 +124,5 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
@Override
public void close() {
- partitionStorage.close();
}
}