This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e1a4ded604 IGNITE-18968 Possible race between updating a low watermark
and processing the last batch for storage in a background GC (#1760)
e1a4ded604 is described below
commit e1a4ded604c735209fa5188c8fe1da9a0559e406
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Mar 7 11:40:22 2023 +0300
IGNITE-18968 Possible race between updating a low watermark and processing
the last batch for storage in a background GC (#1760)
---
.../ignite/internal/table/distributed/gc/MvGc.java | 41 ++++++++++++----
.../internal/table/distributed/gc/MvGcTest.java | 54 +++++++++++++++++++++-
2 files changed, 86 insertions(+), 9 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index 91b7867dec..a29f4e6d36 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table.distributed.gc;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.thread.NamedThreadFactory.threadPrefix;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.util.concurrent.CompletableFuture;
@@ -42,6 +43,7 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.ErrorGroups.GarbageCollector;
import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.TestOnly;
/**
* Garbage collector for multi-versioned storages and their indexes in the
background.
@@ -98,7 +100,7 @@ public class MvGc implements ManuallyCloseable {
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
- new NamedThreadFactory(nodeName, LOG)
+ new NamedThreadFactory(threadPrefix(nodeName, "mv-gc"), LOG)
);
}
@@ -193,17 +195,34 @@ public class MvGc implements ManuallyCloseable {
private void scheduleGcForStorage(TablePartitionId tablePartitionId) {
executor.submit(() -> inBusyLock(() -> {
- GcStorageHandler storageHandler =
storageHandlerByPartitionId.get(tablePartitionId);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ GcStorageHandler storageHandler =
storageHandlerByPartitionId.compute(tablePartitionId, (tablePartId,
gcStorageHandler) -> {
+ if (gcStorageHandler == null) {
+ // Storage has been removed from garbage collection.
+ return null;
+ }
+
+ CompletableFuture<Void> inProgressFuture =
gcStorageHandler.gcInProgressFuture.get();
+
+ if (inProgressFuture == null || inProgressFuture.isDone()) {
+ boolean casResult =
gcStorageHandler.gcInProgressFuture.compareAndSet(inProgressFuture, future);
+
+ assert casResult : tablePartId;
+ } else {
+ inProgressFuture.whenComplete((unused, throwable) ->
scheduleGcForStorage(tablePartitionId));
+ }
+
+ return gcStorageHandler;
+ });
if (storageHandler == null) {
// Storage has been removed from garbage collection.
return;
}
- CompletableFuture<Void> future = new CompletableFuture<>();
-
- if (!storageHandler.gcInProgressFuture.compareAndSet(null,
future)) {
- // In parallel, another task has already begun collecting
garbage.
+ if (storageHandler.gcInProgressFuture.get() != future) {
+ // Another task is already collecting garbage in parallel; we will
try again once gcInProgressFuture completes.
return;
}
@@ -227,8 +246,6 @@ public class MvGc implements ManuallyCloseable {
if (!future.isCompletedExceptionally()) {
future.complete(null);
}
-
- storageHandler.gcInProgressFuture.set(null);
}
scheduleGcForStorage(tablePartitionId);
@@ -254,4 +271,12 @@ public class MvGc implements ManuallyCloseable {
return null;
});
}
+
+ /**
+ * Schedules a new garbage collection for all storages.
+ */
+ @TestOnly
+ void scheduleGcForAllStorages() {
+ inBusyLock(this::initNewGcBusy);
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
index 070d22f1b8..7df77e99f5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.gc;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
@@ -25,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -193,7 +195,7 @@ public class MvGcTest {
gc.updateLowWatermark(new HybridTimestamp(2, 2));
- latch.await(1, TimeUnit.SECONDS);
+ assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
}
@Test
@@ -296,6 +298,43 @@ public class MvGcTest {
assertDoesNotThrow(gc::close);
}
+ @Test
+ void testParallelUpdateLowWatermark(
+ @InjectConfiguration
+ TablesConfiguration tablesConfig
+ ) throws Exception {
+ // By default, the tests work in a single thread; that is not
enough for this test, so we add more threads.
+
assertThat(tablesConfig.gcThreads().update(Runtime.getRuntime().availableProcessors()),
willCompleteSuccessfully());
+
+ gc.close();
+
+ gc = new MvGc("test", tablesConfig);
+
+ gc.start();
+
+ gc.updateLowWatermark(new HybridTimestamp(1, 1));
+
+ for (int i = 0; i < 100; i++) {
+ CountDownLatch latch = new CountDownLatch(5);
+
+ TablePartitionId tablePartitionId = createTablePartitionId();
+
+ gc.addStorage(tablePartitionId,
createWithCountDownOnVacuumWithoutNextBatch(latch));
+
+ runRace(
+ () -> gc.scheduleGcForAllStorages(),
+ () -> gc.scheduleGcForAllStorages(),
+ () -> gc.scheduleGcForAllStorages(),
+ () -> gc.scheduleGcForAllStorages()
+ );
+
+ // Check that the vacuum is called on each update of
the low watermark.
+ assertTrue(latch.await(200, TimeUnit.MILLISECONDS), "remaining=" +
latch.getCount());
+
+ assertThat(gc.removeStorage(tablePartitionId),
willCompleteSuccessfully());
+ }
+ }
+
private TablePartitionId createTablePartitionId() {
return new TablePartitionId(UUID.randomUUID(), PARTITION_ID);
}
@@ -361,4 +400,17 @@ public class MvGcTest {
assertEquals(GarbageCollector.CLOSED_ERR, exception.code());
}
+
+ private StorageUpdateHandler
createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
+ StorageUpdateHandler storageUpdateHandler =
mock(StorageUpdateHandler.class);
+
+
when(storageUpdateHandler.vacuum(any(HybridTimestamp.class))).then(invocation
-> {
+ latch.countDown();
+
+ // So that there is no processing of the next batch.
+ return false;
+ });
+
+ return storageUpdateHandler;
+ }
}