This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e965b2e42dc IGNITE-25463 Fix flaky RocksDbFlusherTest (#5887)
e965b2e42dc is described below

commit e965b2e42dc36833508eb3605a2bd2f39ca4f227
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Fri May 23 15:33:23 2025 +0300

    IGNITE-25463 Fix flaky RocksDbFlusherTest (#5887)
---
 .../internal/rocksdb/flush/RocksDbFlusher.java     |  6 +--
 .../internal/rocksdb/flush/RocksDbFlusherTest.java | 54 +++++++++++++---------
 2 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index a6ddae39b27..39e877feae7 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.IntSupplier;
@@ -67,7 +67,7 @@ public class RocksDbFlusher {
     private final ScheduledExecutorService scheduledPool;
 
     /** Thread pool to execute flush and complete flush completion futures. */
-    final ExecutorService threadPool;
+    final Executor threadPool;
 
     /** Supplier of delay values to batch independent flush requests. */
     private final IntSupplier delaySupplier;
@@ -122,7 +122,7 @@ public class RocksDbFlusher {
             String name,
             IgniteSpinBusyLock busyLock,
             ScheduledExecutorService scheduledPool,
-            ExecutorService threadPool,
+            Executor threadPool,
             IntSupplier delaySupplier,
             LogSyncer logSyncer,
             FailureProcessor failureProcessor,
diff --git a/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java b/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java
index 1c67b090a25..74a00e9b0a1 100644
--- a/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java
+++ b/modules/rocksdb-common/src/test/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusherTest.java
@@ -21,16 +21,20 @@ import static java.util.concurrent.CompletableFuture.allOf;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.components.NoOpLogSyncer;
-import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
-import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.logging.log4j.LogManager;
@@ -41,12 +45,10 @@ import org.apache.logging.log4j.core.filter.AbstractFilter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
-@ExtendWith(ExecutorServiceExtension.class)
 class RocksDbFlusherTest extends IgniteAbstractTest {
     private RocksDbFlusher flusher;
 
@@ -54,21 +56,31 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
 
     private RocksDB db;
 
-    private final CompletableFuture<Throwable> failureProcessorError = new CompletableFuture<>();
+    private final AtomicReference<Throwable> failureProcessorError = new AtomicReference<>();
 
     @BeforeEach
-    void setUp(
-            @InjectExecutorService ScheduledExecutorService scheduledExecutor,
-            @InjectExecutorService ExecutorService executor
-    ) throws RocksDBException {
+    void setUp() throws RocksDBException {
+        ScheduledExecutorService sameThreadExecutor = mock(ScheduledExecutorService.class);
+
+        when(sameThreadExecutor.schedule(any(Callable.class), anyLong(), any()))
+                .thenAnswer(invocation -> {
+                    invocation.getArgument(0, Callable.class).call();
+
+                    return null;
+                });
+
         flusher = new RocksDbFlusher(
                 "RocksDbFlusherTest",
                 new IgniteSpinBusyLock(),
-                scheduledExecutor,
-                executor,
+                sameThreadExecutor,
+                Runnable::run,
                 () -> 0,
                 new NoOpLogSyncer(),
-                failureCtx -> failureProcessorError.completeExceptionally(failureCtx.error()),
+                failureCtx -> {
+                    failureProcessorError.set(failureCtx.error());
+
+                    return true;
+                },
                 () -> {}
         );
 
@@ -85,8 +97,8 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
 
     /**
     * Sets a filter that removes warning messages produced by the flusher. This is needed, because the CI server has been configured to
-     * fail the build if these warnings are found in the logs. In this test we intentionally create a situation where such warnings will
-     * be produced.
+     * fail the build if these warnings are found in the logs. In this test we intentionally create a situation where such warnings will be
+     * produced.
      */
     private static void setUpLogFilter() {
         var filter = new AbstractFilter() {
@@ -117,14 +129,12 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
     @Test
     void testFlushRetryOnWriteThrottling() {
         CompletableFuture<?>[] flushFutures = IntStream.range(0, 200)
+                .parallel()
                 .mapToObj(ByteUtils::intToBytes)
                 .map(bytes -> {
                     try {
-                        // Wait a little bit, because otherwise batching still works and less flushes will be issued.
-                        Thread.sleep(1);
-
                         db.put(bytes, bytes);
-                    } catch (RocksDBException | InterruptedException e) {
+                    } catch (RocksDBException e) {
                         throw new AssertionError(e);
                     }
 
@@ -134,8 +144,10 @@ class RocksDbFlusherTest extends IgniteAbstractTest {
 
         assertThat(allOf(flushFutures), willCompleteSuccessfully());
 
-        if (failureProcessorError.isCompletedExceptionally()) {
-            failureProcessorError.join();
+        Throwable error = failureProcessorError.get();
+
+        if (error != null) {
+            fail(error);
         }
     }
 }

Reply via email to