This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 16a312dcf9 IGNITE-23615 Use shared threadpools in RocksDbFlusher for RocksDbKeyValueStorage (#4713)
16a312dcf9 is described below

commit 16a312dcf9a55ebb111bb4f120fb964749a35192
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 14 15:31:21 2024 +0300

    IGNITE-23615 Use shared threadpools in RocksDbFlusher for RocksDbKeyValueStorage (#4713)
---
 .../catalog/CatalogManagerRecoveryTest.java        |  15 +-
 .../metastore/DeploymentUnitStoreImplTest.java     |  15 +-
 modules/core/build.gradle                          |   1 +
 .../ExecutorServiceExtensionTest.java              | 238 +++++++++++++++++++
 .../testframework/ExecutorServiceExtension.java    | 257 +++++++++++++++++++++
 .../testframework/InjectExecutorService.java       |  66 ++++++
 ...niteDistributionZoneManagerNodeRestartTest.java |   9 +-
 .../IndexAvailabilityControllerRestorerTest.java   |  23 +-
 .../ignite/internal/index/IndexManagerTest.java    |  15 +-
 .../impl/ItIdempotentCommandCacheTest.java         |  15 +-
 .../impl/ItMetaStorageManagerImplTest.java         |  10 +-
 .../ItMetaStorageMultipleNodesRocksDbTest.java     |  16 +-
 ...tMetaStorageSafeTimePropagationRocksDbTest.java |  15 +-
 .../impl/ItMetaStorageServicePersistenceTest.java  |  11 +-
 .../metastorage/impl/ItMetaStorageWatchTest.java   |  10 +-
 .../server/persistence/RocksDbKeyValueStorage.java |  16 +-
 .../impl/MetaStorageRocksDbRangeTest.java          |  16 +-
 .../RocksDbCompactionKeyValueStorageTest.java      |  11 +-
 .../server/RocksDbKeyValueStorageTest.java         |  14 +-
 .../replicator/ItReplicaLifecycleTest.java         |  10 +-
 .../internal/rocksdb/flush/RocksDbFlusher.java     |   6 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  18 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../rebalance/ItRebalanceDistributedTest.java      |  10 +-
 .../distributed/TableManagerRecoveryTest.java      |  15 +-
 .../index/IndexMetaStorageRecoveryTest.java        |  15 +-
 26 files changed, 806 insertions(+), 44 deletions(-)

diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
index f429b6568e..2078ca0624 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
@@ -45,6 +45,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import 
org.apache.ignite.internal.catalog.CatalogTestUtils.TestUpdateHandlerInterceptor;
 import org.apache.ignite.internal.catalog.storage.SnapshotEntry;
@@ -59,6 +60,8 @@ import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.AfterEach;
@@ -68,12 +71,16 @@ import org.mockito.Mockito;
 
 /** For {@link CatalogManager} testing on recovery. */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class CatalogManagerRecoveryTest extends BaseIgniteAbstractTest {
     private static final String NODE_NAME = "test-node-name";
 
     @WorkDirectory
     private Path workDir;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     private final HybridClock clock = new HybridClockImpl();
 
     private MetaStorageManager metaStorageManager;
@@ -193,7 +200,13 @@ public class CatalogManagerRecoveryTest extends 
BaseIgniteAbstractTest {
     private void createComponents() {
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
-        var keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir, 
new NoOpFailureManager(), readOperationForCompactionTracker);
+        var keyValueStorage = new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
 
         metaStorageManager = 
spy(StandaloneMetaStorageManager.create(keyValueStorage, 
readOperationForCompactionTracker));
 
diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index cce084e764..042c9e22ba 100644
--- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.deployment.version.Version;
 import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
 import 
org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener;
@@ -51,6 +52,8 @@ import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.AfterEach;
@@ -62,6 +65,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
  * Test suite for {@link DeploymentUnitStoreImpl}.
  */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest {
     private static final String LOCAL_NODE = "localNode";
 
@@ -90,6 +94,9 @@ public class DeploymentUnitStoreImplTest extends 
BaseIgniteAbstractTest {
     @WorkDirectory
     private Path workDir;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     @BeforeEach
     public void setup() {
         nodeHistory.clear();
@@ -97,7 +104,13 @@ public class DeploymentUnitStoreImplTest extends 
BaseIgniteAbstractTest {
 
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
-        var storage = new RocksDbKeyValueStorage(LOCAL_NODE, workDir, new 
NoOpFailureManager(), readOperationForCompactionTracker);
+        var storage = new RocksDbKeyValueStorage(
+                LOCAL_NODE,
+                workDir,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
 
         MetaStorageManager metaStorageManager = 
StandaloneMetaStorageManager.create(storage, readOperationForCompactionTracker);
         metastore = new DeploymentUnitStoreImpl(metaStorageManager);
diff --git a/modules/core/build.gradle b/modules/core/build.gradle
index a22ac653ec..4b7d93bdc1 100644
--- a/modules/core/build.gradle
+++ b/modules/core/build.gradle
@@ -56,5 +56,6 @@ test {
         //Don't run parametrized tests from inner static classes which should 
not run.
         excludeTestsMatching "*VariableSourceTest\$*"
         excludeTestsMatching "*WorkDirectoryExtensionTest\$*"
+        excludeTestsMatching "*ExecutorServiceExtensionTest\$*"
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
new file mode 100644
index 0000000000..249127816c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.ignite.internal.testframework.JunitExtensionTestUtils.assertExecutesSuccessfully;
+import static 
org.apache.ignite.internal.testframework.JunitExtensionTestUtils.assertExecutesWithFailure;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static 
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static 
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS;
+import static org.apache.ignite.internal.thread.ThreadOperation.WAIT;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+import static 
org.junit.platform.testkit.engine.TestExecutionResultConditions.instanceOf;
+import static 
org.junit.platform.testkit.engine.TestExecutionResultConditions.message;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.ThreadOperation;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** For {@link ExecutorServiceExtension} testing. */
+public class ExecutorServiceExtensionTest {
+    private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+    @ExtendWith(ExecutorServiceExtension.class)
+    static class NormalFieldInjectionTest {
+        private static final String DEFAULT_THREAD_PREFIX_FORMAT = 
"test-NormalFieldInjectionTest-%s";
+
+        private static final String DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT = 
"test-NormalFieldInjectionTest-%s-%s";
+
+        @InjectExecutorService
+        private static ExecutorService staticExecutorServiceWithDefaults;
+
+        @InjectExecutorService(threadCount = 2, threadPrefix = 
"test-foo-static-executor", allowedOperations = TX_STATE_STORAGE_ACCESS)
+        private static ExecutorService staticExecutorService;
+
+        @InjectExecutorService
+        private static ScheduledExecutorService 
staticScheduledExecutorServiceWithDefaults;
+
+        @InjectExecutorService(
+                threadCount = 3,
+                threadPrefix = "test-bar-static-executor",
+                allowedOperations = {STORAGE_READ, STORAGE_WRITE}
+        )
+        private static ScheduledExecutorService staticScheduledExecutorService;
+
+        @InjectExecutorService
+        private ExecutorService instanceExecutorServiceWithDefaults;
+
+        @InjectExecutorService(threadCount = 4, threadPrefix = 
"test-foo-instance-executor", allowedOperations = STORAGE_WRITE)
+        private ExecutorService instanceExecutorService;
+
+        @InjectExecutorService
+        private ScheduledExecutorService 
instanceScheduledExecutorServiceWithDefaults;
+
+        @InjectExecutorService(threadCount = 5, threadPrefix = 
"test-bar-instance-executor", allowedOperations = PROCESS_RAFT_REQ)
+        private ScheduledExecutorService instanceScheduledExecutorService;
+
+        @BeforeAll
+        static void beforeAll(
+                @InjectExecutorService
+                ExecutorService staticParameterExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 6, threadPrefix = 
"test-foo-static-param-executor", allowedOperations = WAIT)
+                ExecutorService staticParameterExecutorService,
+                @InjectExecutorService
+                ScheduledExecutorService 
staticParameterScheduledExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 7, threadPrefix = 
"test-bar-static-param-executor", allowedOperations = WAIT)
+                ScheduledExecutorService 
staticParameterScheduledExecutorService
+        ) {
+            checkExecutorService(
+                    staticParameterExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, 
"beforeAll", "arg0")
+            );
+            checkScheduledExecutorService(
+                    staticParameterScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, 
"beforeAll", "arg2")
+            );
+
+            checkExecutorService(staticParameterExecutorService, 6, 
"test-foo-static-param-executor", WAIT);
+            
checkScheduledExecutorService(staticParameterScheduledExecutorService, 7, 
"test-bar-static-param-executor", WAIT);
+        }
+
+        @Test
+        void test(
+                @InjectExecutorService
+                ExecutorService parameterExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 8, threadPrefix = 
"test-foo-param-executor", allowedOperations = WAIT)
+                ExecutorService parameterExecutorService,
+                @InjectExecutorService
+                ScheduledExecutorService 
parameterScheduledExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 9, threadPrefix = 
"test-bar-param-executor", allowedOperations = WAIT)
+                ScheduledExecutorService parameterScheduledExecutorService
+        ) {
+            checkExecutorService(
+                    staticExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, 
"staticExecutorServiceWithDefaults")
+            );
+            checkScheduledExecutorService(
+                    staticScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, 
"staticScheduledExecutorServiceWithDefaults")
+            );
+            checkExecutorService(
+                    instanceExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, 
"instanceExecutorServiceWithDefaults")
+            );
+            checkScheduledExecutorService(
+                    instanceScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, 
"instanceScheduledExecutorServiceWithDefaults")
+            );
+            checkExecutorService(
+                    parameterExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, 
"test", "arg0")
+            );
+            checkScheduledExecutorService(
+                    parameterScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, 
"test", "arg2")
+            );
+
+            checkExecutorService(staticExecutorService, 2, 
"test-foo-static-executor", TX_STATE_STORAGE_ACCESS);
+            checkScheduledExecutorService(staticScheduledExecutorService, 3, 
"test-bar-static-executor", STORAGE_READ, STORAGE_WRITE);
+            checkExecutorService(instanceExecutorService, 4, 
"test-foo-instance-executor", STORAGE_WRITE);
+            checkScheduledExecutorService(instanceScheduledExecutorService, 5, 
"test-bar-instance-executor", PROCESS_RAFT_REQ);
+            checkExecutorService(parameterExecutorService, 8, 
"test-foo-param-executor", WAIT);
+            checkScheduledExecutorService(parameterScheduledExecutorService, 
9, "test-bar-param-executor", WAIT);
+        }
+    }
+
+    @ExtendWith(ExecutorServiceExtension.class)
+    static class ErrorFieldInjectionTest {
+        @InjectExecutorService
+        private static Integer staticWrongType;
+
+        @InjectExecutorService
+        private String instanceWrongType;
+
+        @Test
+        public void test(
+                @InjectExecutorService
+                Boolean parameterWrongType
+        ) {
+            fail("Should not reach here");
+        }
+    }
+
+    @Test
+    void testFieldInjection() {
+        assertExecutesSuccessfully(NormalFieldInjectionTest.class);
+    }
+
+    @Test
+    void testWrongTypeInjection() {
+        assertExecutesWithFailure(
+                ErrorFieldInjectionTest.class,
+                instanceOf(IllegalStateException.class),
+                message(m -> m.contains("Unsupported field type"))
+        );
+    }
+
+    private static void checkExecutorService(
+            ExecutorService service,
+            int expCorePoolSize,
+            String expThreadPrefix,
+            ThreadOperation... expThreadOperations
+    ) {
+        assertThat(service, instanceOf(ThreadPoolExecutor.class));
+
+        checkThreadPoolExecutor((ThreadPoolExecutor) service, expCorePoolSize, 
expThreadPrefix, expThreadOperations);
+    }
+
+    private static void checkScheduledExecutorService(
+            ScheduledExecutorService service,
+            int expCorePoolSize,
+            String expThreadPrefix,
+            ThreadOperation... expThreadOperations
+    ) {
+        assertThat(service, instanceOf(ScheduledThreadPoolExecutor.class));
+
+        checkThreadPoolExecutor((ScheduledThreadPoolExecutor) service, 
expCorePoolSize, expThreadPrefix, expThreadOperations);
+    }
+
+    private static void checkThreadPoolExecutor(
+            ThreadPoolExecutor executor,
+            int expCorePoolSize,
+            String expThreadPrefix,
+            ThreadOperation... expThreadOperations
+    ) {
+        assertEquals(executor.getCorePoolSize(), expCorePoolSize);
+
+        assertThat(
+                runAsync(() -> {
+                    Thread thread = Thread.currentThread();
+
+                    assertThat(thread, instanceOf(IgniteThread.class));
+
+                    IgniteThread thread1 = (IgniteThread) thread;
+
+                    assertThat(thread1.getName(), startsWith(expThreadPrefix));
+                    assertThat(thread1.allowedOperations(), 
containsInAnyOrder(expThreadOperations));
+                }, executor),
+                willCompleteSuccessfully()
+        );
+    }
+}
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
new file mode 100644
index 0000000000..6963a57519
--- /dev/null
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import static java.lang.reflect.Modifier.isStatic;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.ThreadOperation;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.platform.commons.support.AnnotationSupport;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+
+/**
+ * JUnit extension for injecting temporary {@link ExecutorService}'s into test 
classes.
+ *
+ * @see InjectExecutorService
+ */
+public class ExecutorServiceExtension implements BeforeAllCallback, 
AfterAllCallback, BeforeEachCallback, AfterEachCallback,
+        ParameterResolver {
+    private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+    private static final Namespace NAMESPACE = 
Namespace.create(ExecutorServiceExtension.class);
+
+    private static final Set<Class<?>> SUPPORTED_FIELD_TYPES = 
Set.of(ScheduledExecutorService.class, ExecutorService.class);
+
+    private static final Object STATIC_EXECUTORS_KEY = new Object();
+
+    private static final Object INSTANCE_EXECUTORS_KEY = new Object();
+
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        injectFields(context, true);
+    }
+
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        shutdownExecutors(context, true);
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        injectFields(context, false);
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) throws Exception {
+        shutdownExecutors(context, false);
+    }
+
+    @Override
+    public boolean supportsParameter(
+            ParameterContext parameterContext,
+            ExtensionContext extensionContext
+    ) throws ParameterResolutionException {
+        return parameterContext.isAnnotated(InjectExecutorService.class)
+                && 
isFieldTypeSupported(parameterContext.getParameter().getType());
+    }
+
+    @Override
+    public Object resolveParameter(
+            ParameterContext parameterContext,
+            ExtensionContext extensionContext
+    ) throws ParameterResolutionException {
+        if (!supportsParameter(parameterContext, extensionContext)) {
+            throw new ParameterResolutionException("Unknown parameter:" + 
parameterContext.getParameter());
+        }
+
+        boolean forStatic = 
isStatic(parameterContext.getParameter().getDeclaringExecutable().getModifiers());
+
+        List<ExecutorService> executorServices = 
getOrCreateExecutorServiceListInStore(extensionContext, forStatic);
+
+        InjectExecutorService injectExecutorService = 
parameterContext.findAnnotation(InjectExecutorService.class).orElse(null);
+
+        assert injectExecutorService != null : parameterContext.getParameter();
+
+        ExecutorService executorService = createExecutorService(
+                injectExecutorService,
+                parameterContext.getParameter().getName(),
+                parameterContext.getParameter().getType(),
+                extensionContext.getRequiredTestClass(),
+                
parameterContext.getParameter().getDeclaringExecutable().getName()
+        );
+
+        executorServices.add(executorService);
+
+        return executorService;
+    }
+
+    private static void injectFields(ExtensionContext context, boolean 
forStatic) throws Exception {
+        Class<?> testClass = context.getRequiredTestClass();
+        Object testInstance = context.getTestInstance().orElse(null);
+
+        List<Field> fields = collectFields(testClass, forStatic);
+
+        if (fields.isEmpty()) {
+            return;
+        }
+
+        List<ExecutorService> executorServices = 
getOrCreateExecutorServiceListInStore(context, forStatic);
+
+        for (Field field : fields) {
+            checkFieldTypeIsSupported(field);
+
+            ExecutorService executorService = createExecutorService(field);
+
+            executorServices.add(executorService);
+
+            field.setAccessible(true);
+
+            field.set(forStatic ? null : testInstance, executorService);
+        }
+    }
+
+    private static void shutdownExecutors(ExtensionContext context, boolean 
forStatic) throws Exception {
+        List<ExecutorService> removed = (List<ExecutorService>) 
context.getStore(NAMESPACE).remove(storeKey(forStatic));
+
+        if (removed == null || removed.isEmpty()) {
+            return;
+        }
+
+        Stream<AutoCloseable> autoCloseableStream = removed.stream()
+                .map(executorService -> () -> 
IgniteUtils.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS));
+
+        IgniteUtils.closeAll(autoCloseableStream);
+    }
+
+    private static List<Field> collectFields(Class<?> testClass, boolean 
forStatic) {
+        return AnnotationSupport.findAnnotatedFields(
+                testClass,
+                InjectExecutorService.class,
+                field -> isStatic(field.getModifiers()) == forStatic,
+                HierarchyTraversalMode.TOP_DOWN
+        );
+    }
+
+    private static void checkFieldTypeIsSupported(Field field) {
+        if (!isFieldTypeSupported(field.getType())) {
+            throw new IllegalStateException(
+                    String.format("Unsupported field type: [field=%s, 
supportedFieldTypes=%s]", field, SUPPORTED_FIELD_TYPES)
+            );
+        }
+    }
+
+    private static boolean isFieldTypeSupported(Class<?> fieldType) {
+        for (Class<?> supportedFieldType : SUPPORTED_FIELD_TYPES) {
+            if (fieldType.equals(supportedFieldType)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static ExecutorService createExecutorService(Field field) {
+        InjectExecutorService injectExecutorService = 
field.getAnnotation(InjectExecutorService.class);
+
+        assert injectExecutorService != null : field;
+
+        return createExecutorService(injectExecutorService, field.getName(), 
field.getType(), field.getDeclaringClass(), null);
+    }
+
+    private static ExecutorService createExecutorService(
+            InjectExecutorService injectExecutorService,
+            String fieldName,
+            Class<?> fieldType,
+            Class<?> testClass,
+            @Nullable String methodName
+    ) {
+        int threadCount = injectExecutorService.threadCount();
+        String threadPrefix = threadPrefix(injectExecutorService, testClass, 
fieldName, methodName);
+        ThreadOperation[] allowedOperations = 
injectExecutorService.allowedOperations();
+
+        ThreadFactory threadFactory = 
IgniteThreadFactory.withPrefix(threadPrefix, Loggers.forClass(testClass), 
allowedOperations);
+
+        if (fieldType.equals(ScheduledExecutorService.class)) {
+            return newScheduledThreadPool(threadCount == 0 ? 1 : threadCount, 
threadFactory);
+        } else if (fieldType.equals(ExecutorService.class)) {
+            return newFixedThreadPool(threadCount == 0 ? CPUS : threadCount, 
threadFactory);
+        }
+
+        throw new AssertionError(
+                String.format("Unsupported field type: [field=%s, 
supportedFieldTypes=%s]", fieldName, SUPPORTED_FIELD_TYPES)
+        );
+    }
+
+    private static String threadPrefix(
+            InjectExecutorService injectExecutorService,
+            Class<?> testClass,
+            String fieldName,
+            @Nullable String methodName
+    ) {
+        String threadPrefix = injectExecutorService.threadPrefix();
+
+        if (threadPrefix != null && !"".equals(threadPrefix)) {
+            return threadPrefix;
+        }
+
+        if (methodName == null) {
+            return String.format("test-%s-%s", testClass.getSimpleName(), 
fieldName);
+        }
+
+        return String.format("test-%s-%s-%s", testClass.getSimpleName(), 
methodName, fieldName);
+    }
+
+    private static Object storeKey(boolean forStatic) {
+        return forStatic ? STATIC_EXECUTORS_KEY : INSTANCE_EXECUTORS_KEY;
+    }
+
+    private static List<ExecutorService> 
getOrCreateExecutorServiceListInStore(ExtensionContext context, boolean 
forStatic) {
+        List<ExecutorService> executorServices = (List<ExecutorService>) 
context.getStore(NAMESPACE).get(storeKey(forStatic));
+
+        if (executorServices == null) {
+            executorServices = new CopyOnWriteArrayList<>();
+
+            context.getStore(NAMESPACE).put(storeKey(forStatic), 
executorServices);
+        }
+
+        return executorServices;
+    }
+}
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/InjectExecutorService.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/InjectExecutorService.java
new file mode 100644
index 0000000000..25c563fadc
--- /dev/null
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/InjectExecutorService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.thread.ThreadOperation;
+
+/**
+ * Annotation for injecting {@link ExecutorService} instances into tests.
+ *
+ * <p>The kind of executor is chosen from the declared type of the annotated field or method parameter.</p>
+ *
+ * <p>For each field a new executor with a fixed number of threads is created.</p>
+ *
+ * <p>Supported field types:</p>
+ * <ul>
+ *     <li>{@link ExecutorService}.</li>
+ *     <li>{@link ScheduledExecutorService}.</li>
+ * </ul>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.PARAMETER})
+public @interface InjectExecutorService {
+    /**
+     * Number of threads in the executor.
+     *
+     * <p>If left at {@code 0} (the default), the number of threads depends on the type of executor:</p>
+     * <ul>
+     *     <li>{@link ScheduledExecutorService} - 1 thread.</li>
+     *     <li>{@link ExecutorService} - {@link Runtime#availableProcessors()}.</li>
+     * </ul>
+     */
+    int threadCount() default 0;
+
+    /**
+     * Prefix of thread names in the executor.
+     *
+     * <p>If left empty (the default), the prefix is generated: "test-class_name-field_name" for class fields, for example
+     * "test-FooTest-commonExecutor", and "test-class_name-method_name-param_name" for method parameters, for example
+     * "test-FooTest-beforeAll-commonExecutor".</p>
+     */
+    String threadPrefix() default "";
+
+    /** Operations that are allowed to be executed on the executor's threads. By default, no operations are allowed. */
+    ThreadOperation[] allowedOperations() default {};
+}
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index fb186cd012..518b054875 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -123,6 +123,8 @@ import 
org.apache.ignite.internal.network.configuration.NetworkExtensionConfigur
 import org.apache.ignite.internal.network.recovery.VaultStaleIds;
 import 
org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceFactory;
 import 
org.apache.ignite.internal.security.authentication.validator.AuthenticationProvidersValidatorImpl;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -140,6 +142,7 @@ import org.junit.jupiter.params.provider.ValueSource;
  * Tests for checking {@link DistributionZoneManager} behavior after node's 
restart.
  */
 @ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class ItIgniteDistributionZoneManagerNodeRestartTest extends 
BaseIgniteRestartTest {
     private static final LogicalNode A = new LogicalNode(
             new ClusterNodeImpl(randomUUID(), "A", new 
NetworkAddress("localhost", 123)),
@@ -172,6 +175,9 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
 
     private volatile boolean startGlobalStateUpdateBlocking;
 
+    @InjectExecutorService
+    private ScheduledExecutorService commonScheduledExecutorService;
+
     /**
      * Start some of Ignite components that are able to serve as Ignite node 
for test purposes.
      *
@@ -245,7 +251,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest 
extends BaseIgniteRe
                 name,
                 workDir.resolve("metastorage"),
                 new NoOpFailureManager(),
-                readOperationForCompactionTracker
+                readOperationForCompactionTracker,
+                commonScheduledExecutorService
         );
 
         var clock = new HybridClockImpl();
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
index eb989f1e08..87ef5f4452 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
@@ -49,6 +49,7 @@ import static org.mockito.Mockito.when;
 
 import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -63,6 +64,8 @@ import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.TopologyService;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.network.ClusterNode;
@@ -73,10 +76,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 /** For {@link IndexAvailabilityController} testing on node recovery. */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class IndexAvailabilityControllerRestorerTest extends 
BaseIgniteAbstractTest {
     @WorkDirectory
     private Path workDir;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     private final HybridClock clock = new HybridClockImpl();
 
     private final ClusterService clusterService = mock(ClusterService.class);
@@ -93,7 +100,13 @@ public class IndexAvailabilityControllerRestorerTest 
extends BaseIgniteAbstractT
     void setUp() throws Exception {
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
-        keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new 
NoOpFailureManager(), readOperationForCompactionTracker);
+        keyValueStorage = new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
 
         metaStorageManager = 
StandaloneMetaStorageManager.create(keyValueStorage, clock, 
readOperationForCompactionTracker);
 
@@ -220,7 +233,13 @@ public class IndexAvailabilityControllerRestorerTest 
extends BaseIgniteAbstractT
 
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
-        keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new 
NoOpFailureManager(), readOperationForCompactionTracker);
+        keyValueStorage = new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
 
         metaStorageManager = 
StandaloneMetaStorageManager.create(keyValueStorage, clock, 
readOperationForCompactionTracker);
 
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index e51cfc618d..6bc1d18c71 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -49,6 +49,7 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -76,6 +77,8 @@ import 
org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.tx.LockManager;
@@ -89,12 +92,16 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 /** Test class to verify {@link IndexManager}. */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class IndexManagerTest extends BaseIgniteAbstractTest {
     private final HybridClock clock = new HybridClockImpl();
 
     @WorkDirectory
     private Path workDir;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     private TableManager mockTableManager;
 
     private SchemaManager mockSchemaManager;
@@ -229,7 +236,13 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
     private void createAndStartComponents() {
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
-        var storage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new 
NoOpFailureManager(), readOperationForCompactionTracker);
+        var storage = new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
 
         metaStorageManager = StandaloneMetaStorageManager.create(storage, 
clock, readOperationForCompactionTracker);
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 07de8f9a13..6134ef19b2 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiPredicate;
@@ -96,7 +97,9 @@ import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.rpc.ActionResponse;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
@@ -113,6 +116,7 @@ import org.junit.jupiter.params.provider.MethodSource;
  * Integration tests for idempotency of {@link 
org.apache.ignite.internal.metastorage.command.IdempotentCommand}.
  */
 @ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class ItIdempotentCommandCacheTest extends IgniteAbstractTest {
     private static final MetaStorageCommandsFactory CMD_FACTORY = new 
MetaStorageCommandsFactory();
 
@@ -135,6 +139,9 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
     @InjectConfiguration("mock.idleSyncTimeInterval = 100")
     private MetaStorageConfiguration metaStorageConfiguration;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     private List<Node> nodes;
 
     private static class Node implements AutoCloseable {
@@ -161,7 +168,8 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
                 RaftConfiguration raftConfiguration,
                 MetaStorageConfiguration metaStorageConfiguration,
                 Path workDir,
-                int index
+                int index,
+                ScheduledExecutorService scheduledExecutorService
         ) {
             List<NetworkAddress> addrs = new ArrayList<>();
 
@@ -214,7 +222,8 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
                     clusterService.nodeName(),
                     metastorageWorkDir.dbPath(),
                     new NoOpFailureManager(),
-                    readOperationForCompactionTracker
+                    readOperationForCompactionTracker,
+                    scheduledExecutorService
             ));
 
             metaStorageManager = new MetaStorageManagerImpl(
@@ -543,7 +552,7 @@ public class ItIdempotentCommandCacheTest extends 
IgniteAbstractTest {
         nodes = new ArrayList<>();
 
         for (int i = 0; i < NODES_COUNT; i++) {
-            Node node = new Node(testInfo, raftConfiguration, 
metaStorageConfiguration, workDir, i);
+            Node node = new Node(testInfo, raftConfiguration, 
metaStorageConfiguration, workDir, i, scheduledExecutorService);
             nodes.add(node);
         }
 
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 842e4ab47f..670f5c8b00 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -93,7 +94,9 @@ import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFacto
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.rpc.ReadActionRequest;
@@ -113,6 +116,7 @@ import org.mockito.ArgumentCaptor;
  * Integration tests for {@link MetaStorageManagerImpl}.
  */
 @ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
     private static final ByteArray FOO_KEY = new ByteArray("foo");
 
@@ -133,6 +137,9 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private RaftConfiguration raftConfiguration;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     private final ReadOperationForCompactionTracker 
readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
 
     @BeforeEach
@@ -185,7 +192,8 @@ public class ItMetaStorageManagerImplTest extends 
IgniteAbstractTest {
                 clusterService.nodeName(),
                 metastorageWorkDir.dbPath(),
                 new NoOpFailureManager(),
-                readOperationForCompactionTracker
+                readOperationForCompactionTracker,
+                scheduledExecutorService
         );
 
         metaStorageManager = new MetaStorageManagerImpl(
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
index 4695862dff..682f103f2a 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
@@ -18,15 +18,29 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /** {@link ItMetaStorageMultipleNodesVsStorageTest} with {@link 
RocksDbKeyValueStorage} implementation. */
+@ExtendWith(ExecutorServiceExtension.class)
 public class ItMetaStorageMultipleNodesRocksDbTest extends 
ItMetaStorageMultipleNodesVsStorageTest {
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     @Override
     public KeyValueStorage createStorage(String nodeName, Path path, 
ReadOperationForCompactionTracker readOperationForCompactionTracker) {
-        return new RocksDbKeyValueStorage(nodeName, path.resolve("ms"), new 
NoOpFailureManager(), readOperationForCompactionTracker);
+        return new RocksDbKeyValueStorage(
+                nodeName,
+                path.resolve("ms"),
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
     }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
index f7ef2eb21d..35ed3342f3 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
@@ -18,22 +18,35 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /** {@link ItMetaStorageSafeTimePropagationAbstractTest} with {@link 
RocksDbKeyValueStorage} implementation. */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class ItMetaStorageSafeTimePropagationRocksDbTest extends 
ItMetaStorageSafeTimePropagationAbstractTest {
     @WorkDirectory
     private Path workDir;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     @Override
     public KeyValueStorage createStorage() {
-        return new RocksDbKeyValueStorage(NODE_NAME, workDir, new 
NoOpFailureManager(), new ReadOperationForCompactionTracker());
+        return new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir,
+                new NoOpFailureManager(),
+                new ReadOperationForCompactionTracker(),
+                scheduledExecutorService
+        );
     }
 }
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 4c0784af4e..8655e87344 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -47,15 +48,19 @@ import 
org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.TestReplicationGroupId;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Persistent (rocksdb-based) meta storage raft group snapshots tests.
  */
+@ExtendWith(ExecutorServiceExtension.class)
 public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnapshotTest<MetaStorageListener> {
     private static final ByteArray FIRST_KEY = ByteArray.fromString("first");
 
@@ -65,6 +70,9 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
 
     private static final byte[] SECOND_VALUE = 
"secondValue".getBytes(StandardCharsets.UTF_8);
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     private MetaStorageServiceImpl metaStorage;
 
     private final Map<String, RocksDbKeyValueStorage> storageByName = new 
HashMap<>();
@@ -154,7 +162,8 @@ public class ItMetaStorageServicePersistenceTest extends 
ItAbstractListenerSnaps
                     name,
                     listenerPersistencePath,
                     new NoOpFailureManager(),
-                    new ReadOperationForCompactionTracker()
+                    new ReadOperationForCompactionTracker(),
+                    scheduledExecutorService
             );
 
             s.start();
diff --git 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index d3f01b1949..9c9009ee7a 100644
--- 
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -45,6 +45,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
@@ -92,7 +93,9 @@ import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
@@ -108,6 +111,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
  * Tests for Meta Storage Watches.
  */
 @ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class ItMetaStorageWatchTest extends IgniteAbstractTest {
 
     @InjectConfiguration
@@ -119,6 +123,9 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private static MetaStorageConfiguration metaStorageConfiguration;
 
+    @InjectExecutorService
+    private static ScheduledExecutorService scheduledExecutorService;
+
     private static class Node {
         private final List<IgniteComponent> components = new ArrayList<>();
 
@@ -229,7 +236,8 @@ public class ItMetaStorageWatchTest extends 
IgniteAbstractTest {
                     name(),
                     metastorageWorkDir.dbPath(),
                     new NoOpFailureManager(),
-                    readOperationForCompactionTracker
+                    readOperationForCompactionTracker,
+                    scheduledExecutorService
             );
 
             this.metaStorageManager = new MetaStorageManagerImpl(
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 810e8c5401..4e26de9ae7 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -191,7 +191,9 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
     /** Executor for storage operations. */
     private final ExecutorService executor;
 
-    /** Scheduled executor for storage operations. */
+    /**
+     * Scheduled executor. Used only to start scheduled operations asynchronously; no blocking, long-running or IO work runs on it.
+     */
     private final ScheduledExecutorService scheduledExecutor;
 
     /** Path to the rocksdb database. */
@@ -285,12 +287,15 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
      * @param dbPath RocksDB path.
      * @param failureManager Failure processor that is used to handle critical 
errors.
      * @param readOperationForCompactionTracker Read operation tracker for 
metastorage compaction.
+     * @param scheduledExecutor Scheduled executor. Used only to start scheduled operations asynchronously; no blocking,
+     *      long-running or IO work runs on it.
      */
     public RocksDbKeyValueStorage(
             String nodeName,
             Path dbPath,
             FailureManager failureManager,
-            ReadOperationForCompactionTracker readOperationForCompactionTracker
+            ReadOperationForCompactionTracker 
readOperationForCompactionTracker,
+            ScheduledExecutorService scheduledExecutor
     ) {
         super(
                 nodeName,
@@ -299,16 +304,12 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         );
 
         this.dbPath = dbPath;
+        this.scheduledExecutor = scheduledExecutor;
 
         executor = Executors.newFixedThreadPool(
                 2,
                 NamedThreadFactory.create(nodeName, 
"metastorage-rocksdb-kv-storage-executor", log)
         );
-
-        // TODO: IGNITE-23615 Use a common pool, e.g. 
ThreadPoolsManager#commonScheduler
-        scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
-                NamedThreadFactory.create(nodeName, 
"metastorage-rocksdb-kv-storage-scheduler", log)
-        );
     }
 
     @Override
@@ -497,7 +498,6 @@ public class RocksDbKeyValueStorage extends 
AbstractKeyValueStorage {
         flusher.stop();
 
         IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
-        IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10, 
TimeUnit.SECONDS);
 
         rwLock.writeLock().lock();
         try {
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
index 0251dc8b86..651a690492 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
@@ -18,16 +18,30 @@
 package org.apache.ignite.internal.metastorage.impl;
 
 import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * {@link MetaStorageRangeTest} implementation using {@link 
RocksDbKeyValueStorage}.
  */
+@ExtendWith(ExecutorServiceExtension.class)
 public class MetaStorageRocksDbRangeTest extends MetaStorageRangeTest {
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     @Override
     KeyValueStorage getStorage(Path path) {
-        return new RocksDbKeyValueStorage("test", path, new 
NoOpFailureManager(), readOperationForCompactionTracker);
+        return new RocksDbKeyValueStorage(
+                "test",
+                path,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
     }
 }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
index e542469d91..442bfb2225 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
@@ -20,20 +20,29 @@ package org.apache.ignite.internal.metastorage.server;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /** Compaction test for the RocksDB implementation of {@link KeyValueStorage}. 
*/
+@ExtendWith(ExecutorServiceExtension.class)
 public class RocksDbCompactionKeyValueStorageTest extends 
AbstractCompactionKeyValueStorageTest {
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     @Override
     public KeyValueStorage createStorage() {
         return new RocksDbKeyValueStorage(
                 NODE_NAME,
                 workDir.resolve("storage"),
                 new NoOpFailureManager(),
-                readOperationForCompactionTracker
+                readOperationForCompactionTracker,
+                scheduledExecutorService
         );
     }
 
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 0f0d7b5320..a92962abb4 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.zip.Checksum;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.ByteArray;
@@ -41,20 +42,28 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
 import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.raft.jraft.util.CRC64;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * Tests for RocksDB key-value storage implementation.
  */
+@ExtendWith(ExecutorServiceExtension.class)
 public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTest {
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     @Override
     public KeyValueStorage createStorage() {
         return new RocksDbKeyValueStorage(
                 NODE_NAME,
                 workDir.resolve("storage"),
                 new NoOpFailureManager(),
-                new ReadOperationForCompactionTracker()
+                new ReadOperationForCompactionTracker(),
+                scheduledExecutorService
         );
     }
 
@@ -82,7 +91,8 @@ public class RocksDbKeyValueStorageTest extends 
BasicOperationsKeyValueStorageTe
                 NODE_NAME,
                 workDir.resolve("storage"),
                 new NoOpFailureManager(),
-                new ReadOperationForCompactionTracker()
+                new ReadOperationForCompactionTracker(),
+                scheduledExecutorService
         );
 
         storage.start();
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index cdc051c283..f50c9b7a58 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -190,6 +190,8 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -229,7 +231,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 /**
  * Replica lifecycle test.
  */
-@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class, 
ExecutorServiceExtension.class})
 @Timeout(60)
 // TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test 
after the switching to zone-based replication
 @Disabled("https://issues.apache.org/jira/browse/IGNITE-23252")
@@ -293,6 +295,9 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
     private static String featureFlagOldValue = 
System.getProperty(FEATURE_FLAG_NAME);
 
+    @InjectExecutorService
+    private static ScheduledExecutorService scheduledExecutorService;
+
     @BeforeAll
     static void beforeAll() {
         System.setProperty(FEATURE_FLAG_NAME, "true");
@@ -1087,7 +1092,8 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     name,
                     resolveDir(dir, "metaStorageTestKeyValue"),
                     failureManager,
-                    readOperationForCompactionTracker
+                    readOperationForCompactionTracker,
+                    scheduledExecutorService
             );
 
             var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
diff --git 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index e4a1b8c4d2..05e211aa7c 100644
--- 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -62,7 +62,7 @@ public class RocksDbFlusher {
     /** Scheduled pool to schedule flushes. */
     private final ScheduledExecutorService scheduledPool;
 
-    /** Thread pool to complete flush completion futures. */
+    /** Thread pool to execute flush and complete flush completion futures. */
     final ExecutorService threadPool;
 
     /** Supplier of delay values to batch independent flush requests. */
@@ -104,7 +104,7 @@ public class RocksDbFlusher {
      * @param name RocksDB instance name, for logging purposes.
      * @param busyLock Busy lock.
      * @param scheduledPool Scheduled pool the schedule flushes.
-     * @param threadPool Thread pool to run flush completion closure, provided 
by {@code onFlushCompleted} parameter.
+     * @param threadPool Thread pool to execute flush and to run flush 
completion closure, provided by {@code onFlushCompleted} parameter.
      * @param delaySupplier Supplier of delay values to batch independent 
flush requests. When {@link #awaitFlush(boolean)} is called with
      *      {@code true} parameter, the flusher waits given number of 
milliseconds (using {@code scheduledPool}) and then executes flush
      *      only if there were no other {@code awaitFlush(true)} calls. 
Otherwise, it does nothing after the timeout. This guarantees that
@@ -240,7 +240,7 @@ public class RocksDbFlusher {
 
         latestFlushClosure = newClosure;
 
-        scheduledPool.schedule(newClosure, delaySupplier.getAsInt(), 
TimeUnit.MILLISECONDS);
+        scheduledPool.schedule(() -> runAsync(newClosure, threadPool), 
delaySupplier.getAsInt(), TimeUnit.MILLISECONDS);
     }
 
     /**
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 540d152f8f..4f38803316 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -41,6 +41,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.sql.ColumnType.INT32;
@@ -68,7 +69,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -209,10 +209,10 @@ import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.ThreadOperation;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import 
org.apache.ignite.internal.tx.configuration.TransactionExtensionConfiguration;
@@ -256,6 +256,7 @@ import org.junit.jupiter.params.provider.ValueSource;
  * These tests check node restart scenarios.
  */
 @ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 @Timeout(120)
 public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
     /** Value producer for table data, is used to create data and check it 
later. */
@@ -308,9 +309,11 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
      */
     private final Map<Integer, Supplier<CompletableFuture<Set<String>>>> 
dataNodesMockByNode = new ConcurrentHashMap<>();
 
-    private final ExecutorService storageExecutor = 
Executors.newSingleThreadExecutor(
-            IgniteThreadFactory.create("test", "storage-test-pool-iinrt", log, 
ThreadOperation.STORAGE_READ)
-    );
+    @InjectExecutorService(threadCount = 1, threadPrefix = 
"storage-test-pool-iinrt", allowedOperations = STORAGE_READ)
+    private ExecutorService storageExecutor;
+
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
 
     @BeforeEach
     public void beforeTest() {
@@ -482,7 +485,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 name,
                 dir.resolve("metastorage"),
                 new NoOpFailureManager(),
-                readOperationForCompactionTracker
+                readOperationForCompactionTracker,
+                scheduledExecutorService
         );
 
         InvokeInterceptor metaStorageInvokeInterceptor = 
metaStorageInvokeInterceptorByNode.get(idx);
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index da8ccaaa32..5886172a6d 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -690,7 +690,8 @@ public class IgniteImpl implements Ignite {
                 name,
                 metastorageWorkDir.dbPath(),
                 failureManager,
-                readOperationForCompactionTracker
+                readOperationForCompactionTracker,
+                threadPoolsManager.commonScheduler()
         );
 
         metaStorageMgr = new MetaStorageManagerImpl(
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 2f2b922c0e..38151f55cd 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -217,6 +217,8 @@ import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -255,7 +257,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 /**
  * Test suite for rebalance process, when replicas' number changed.
  */
-@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class, 
ExecutorServiceExtension.class})
 @Timeout(120)
 public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
     private static final IgniteLogger LOG = 
Loggers.forClass(ItRebalanceDistributedTest.class);
@@ -314,6 +316,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
     @WorkDirectory
     private Path workDir;
 
+    @InjectExecutorService
+    private static ScheduledExecutorService commonScheduledExecutorService;
+
     private StaticNodeFinder finder;
 
     private List<Node> nodes;
@@ -1219,7 +1224,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                             name,
                             resolveDir(dir, "metaStorage"),
                             failureManager,
-                            readOperationForCompactionTracker
+                            readOperationForCompactionTracker,
+                            commonScheduledExecutorService
                     );
 
             var topologyAwareRaftGroupServiceFactory = new 
TopologyAwareRaftGroupServiceFactory(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 8269867073..f78f041f33 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -120,7 +120,9 @@ import 
org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
 import 
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
 import 
org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
@@ -145,7 +147,7 @@ import org.mockito.quality.Strictness;
 /**
  * Table manager recovery scenarios.
  */
-@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class, 
ExecutorServiceExtension.class})
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class TableManagerRecoveryTest extends IgniteAbstractTest {
     private static final String NODE_NAME = "testNode1";
@@ -191,6 +193,9 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
     private final DataStorageModule dataStorageModule = 
createDataStorageModule();
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     @AfterEach
     void after() throws Exception {
         stopComponents();
@@ -270,7 +275,13 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
 
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
-        var storage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new 
NoOpFailureManager(), readOperationForCompactionTracker);
+        var storage = new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
 
         clock = new HybridClockImpl();
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
index f17638efe8..a6c94f6ecc 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
@@ -47,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import java.nio.file.Path;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
 import org.apache.ignite.internal.lang.RunnableX;
@@ -55,6 +56,8 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import 
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
 import 
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -63,17 +66,27 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 /** For testing recovery of {@link IndexMetaStorage}. */
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
 public class IndexMetaStorageRecoveryTest extends BaseIndexMetaStorageTest {
     @WorkDirectory
     private Path workDir;
 
+    @InjectExecutorService
+    private ScheduledExecutorService scheduledExecutorService;
+
     private final TestUpdateHandlerInterceptor interceptor = new 
TestUpdateHandlerInterceptor();
 
     @Override
     MetaStorageManager createMetastore() {
         var readOperationForCompactionTracker = new 
ReadOperationForCompactionTracker();
 
-        var keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir, 
new NoOpFailureManager(), readOperationForCompactionTracker);
+        var keyValueStorage = new RocksDbKeyValueStorage(
+                NODE_NAME,
+                workDir,
+                new NoOpFailureManager(),
+                readOperationForCompactionTracker,
+                scheduledExecutorService
+        );
 
         return StandaloneMetaStorageManager.create(keyValueStorage, 
readOperationForCompactionTracker);
     }

Reply via email to