This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new e965b2e42dc IGNITE-25463 Fix flaky RocksDbFlusherTest (#5887)
e965b2e42dc is described below
commit e965b2e42dc36833508eb3605a2bd2f39ca4f227
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Fri May 23 15:33:23 2025 +0300
IGNITE-25463 Fix flaky RocksDbFlusherTest (#5887)
---
.../internal/rocksdb/flush/RocksDbFlusher.java | 6 +--
.../internal/rocksdb/flush/RocksDbFlusherTest.java | 54 +++++++++++++---------
2 files changed, 36 insertions(+), 24 deletions(-)
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index a6ddae39b27..39e877feae7 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
@@ -67,7 +67,7 @@ public class RocksDbFlusher {
private final ScheduledExecutorService scheduledPool;
/** Thread pool to execute flush and complete flush completion futures. */
- final ExecutorService threadPool;
+ final Executor threadPool;
/** Supplier of delay values to batch independent flush requests. */
private final IntSupplier delaySupplier;
@@ -122,7 +122,7 @@ public class RocksDbFlusher {
String name,
IgniteSpinBusyLock busyLock,
ScheduledExecutorService scheduledPool,
- ExecutorService threadPool,
+ Executor threadPool,
IntSupplier delaySupplier,
LogSyncer logSyncer,
FailureProcessor failureProcessor,
diff --git a/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java b/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java
index 1c67b090a25..74a00e9b0a1 100644
--- a/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java
+++ b/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java
@@ -21,16 +21,20 @@ import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.ignite.internal.components.NoOpLogSyncer;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.logging.log4j.LogManager;
@@ -41,12 +45,10 @@ import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
-@ExtendWith(ExecutorServiceExtension.class)
class RocksDbFlusherTest extends IgniteAbstractTest {
private RocksDbFlusher flusher;
@@ -54,21 +56,31 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
private RocksDB db;
- private final CompletableFuture<Throwable> failureProcessorError = new CompletableFuture<>();
+ private final AtomicReference<Throwable> failureProcessorError = new AtomicReference<>();
@BeforeEach
- void setUp(
- @InjectExecutorService ScheduledExecutorService scheduledExecutor,
- @InjectExecutorService ExecutorService executor
- ) throws RocksDBException {
+ void setUp() throws RocksDBException {
+ ScheduledExecutorService sameThreadExecutor = mock(ScheduledExecutorService.class);
+
+ when(sameThreadExecutor.schedule(any(Callable.class), anyLong(), any()))
+ .thenAnswer(invocation -> {
+ invocation.getArgument(0, Callable.class).call();
+
+ return null;
+ });
+
flusher = new RocksDbFlusher(
"RocksDbFlusherTest",
new IgniteSpinBusyLock(),
- scheduledExecutor,
- executor,
+ sameThreadExecutor,
+ Runnable::run,
() -> 0,
new NoOpLogSyncer(),
- failureCtx -> failureProcessorError.completeExceptionally(failureCtx.error()),
+ failureCtx -> {
+ failureProcessorError.set(failureCtx.error());
+
+ return true;
+ },
() -> {}
);
@@ -85,8 +97,8 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
/**
* Sets a filter that removes warning messages produced by the flusher. This is needed, because the CI server has been configured to
- * fail the build if these warnings are found in the logs. In this test we intentionally create a situation where such warnings will
- * be produced.
+ * fail the build if these warnings are found in the logs. In this test we intentionally create a situation where such warnings will be
+ * produced.
*/
private static void setUpLogFilter() {
var filter = new AbstractFilter() {
@@ -117,14 +129,12 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
@Test
void testFlushRetryOnWriteThrottling() {
CompletableFuture<?>[] flushFutures = IntStream.range(0, 200)
+ .parallel()
.mapToObj(ByteUtils::intToBytes)
.map(bytes -> {
try {
- // Wait a little bit, because otherwise batching still works and less flushes will be issued.
- Thread.sleep(1);
-
db.put(bytes, bytes);
- } catch (RocksDBException | InterruptedException e) {
+ } catch (RocksDBException e) {
throw new AssertionError(e);
}
@@ -134,8 +144,10 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
assertThat(allOf(flushFutures), willCompleteSuccessfully());
- if (failureProcessorError.isCompletedExceptionally()) {
- failureProcessorError.join();
+ Throwable error = failureProcessorError.get();
+
+ if (error != null) {
+ fail(error);
}
}
}