This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 16a312dcf9 IGNITE-23615 Use shared threadpools in RocksDbFlusher for
RocksDbKeyValueStorage (#4713)
16a312dcf9 is described below
commit 16a312dcf9a55ebb111bb4f120fb964749a35192
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 14 15:31:21 2024 +0300
IGNITE-23615 Use shared threadpools in RocksDbFlusher for
RocksDbKeyValueStorage (#4713)
---
.../catalog/CatalogManagerRecoveryTest.java | 15 +-
.../metastore/DeploymentUnitStoreImplTest.java | 15 +-
modules/core/build.gradle | 1 +
.../ExecutorServiceExtensionTest.java | 238 +++++++++++++++++++
.../testframework/ExecutorServiceExtension.java | 257 +++++++++++++++++++++
.../testframework/InjectExecutorService.java | 66 ++++++
...niteDistributionZoneManagerNodeRestartTest.java | 9 +-
.../IndexAvailabilityControllerRestorerTest.java | 23 +-
.../ignite/internal/index/IndexManagerTest.java | 15 +-
.../impl/ItIdempotentCommandCacheTest.java | 15 +-
.../impl/ItMetaStorageManagerImplTest.java | 10 +-
.../ItMetaStorageMultipleNodesRocksDbTest.java | 16 +-
...tMetaStorageSafeTimePropagationRocksDbTest.java | 15 +-
.../impl/ItMetaStorageServicePersistenceTest.java | 11 +-
.../metastorage/impl/ItMetaStorageWatchTest.java | 10 +-
.../server/persistence/RocksDbKeyValueStorage.java | 16 +-
.../impl/MetaStorageRocksDbRangeTest.java | 16 +-
.../RocksDbCompactionKeyValueStorageTest.java | 11 +-
.../server/RocksDbKeyValueStorageTest.java | 14 +-
.../replicator/ItReplicaLifecycleTest.java | 10 +-
.../internal/rocksdb/flush/RocksDbFlusher.java | 6 +-
.../runner/app/ItIgniteNodeRestartTest.java | 18 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
.../rebalance/ItRebalanceDistributedTest.java | 10 +-
.../distributed/TableManagerRecoveryTest.java | 15 +-
.../index/IndexMetaStorageRecoveryTest.java | 15 +-
26 files changed, 806 insertions(+), 44 deletions(-)
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
index f429b6568e..2078ca0624 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerRecoveryTest.java
@@ -45,6 +45,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.catalog.CatalogTestUtils.TestUpdateHandlerInterceptor;
import org.apache.ignite.internal.catalog.storage.SnapshotEntry;
@@ -59,6 +60,8 @@ import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.AfterEach;
@@ -68,12 +71,16 @@ import org.mockito.Mockito;
/** For {@link CatalogManager} testing on recovery. */
@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class CatalogManagerRecoveryTest extends BaseIgniteAbstractTest {
private static final String NODE_NAME = "test-node-name";
@WorkDirectory
private Path workDir;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
private final HybridClock clock = new HybridClockImpl();
private MetaStorageManager metaStorageManager;
@@ -193,7 +200,13 @@ public class CatalogManagerRecoveryTest extends
BaseIgniteAbstractTest {
private void createComponents() {
var readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
- var keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir,
new NoOpFailureManager(), readOperationForCompactionTracker);
+ var keyValueStorage = new RocksDbKeyValueStorage(
+ NODE_NAME,
+ workDir,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
metaStorageManager =
spy(StandaloneMetaStorageManager.create(keyValueStorage,
readOperationForCompactionTracker));
diff --git
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index cce084e764..042c9e22ba 100644
---
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.deployment.version.Version;
import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
import
org.apache.ignite.internal.deployunit.metastore.ClusterStatusWatchListener;
@@ -51,6 +52,8 @@ import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.AfterEach;
@@ -62,6 +65,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
* Test suite for {@link DeploymentUnitStoreImpl}.
*/
@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest {
private static final String LOCAL_NODE = "localNode";
@@ -90,6 +94,9 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
@WorkDirectory
private Path workDir;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
@BeforeEach
public void setup() {
nodeHistory.clear();
@@ -97,7 +104,13 @@ public class DeploymentUnitStoreImplTest extends
BaseIgniteAbstractTest {
var readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
- var storage = new RocksDbKeyValueStorage(LOCAL_NODE, workDir, new
NoOpFailureManager(), readOperationForCompactionTracker);
+ var storage = new RocksDbKeyValueStorage(
+ LOCAL_NODE,
+ workDir,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
MetaStorageManager metaStorageManager =
StandaloneMetaStorageManager.create(storage, readOperationForCompactionTracker);
metastore = new DeploymentUnitStoreImpl(metaStorageManager);
diff --git a/modules/core/build.gradle b/modules/core/build.gradle
index a22ac653ec..4b7d93bdc1 100644
--- a/modules/core/build.gradle
+++ b/modules/core/build.gradle
@@ -56,5 +56,6 @@ test {
//Don't run parametrized tests from inner static classes which should
not run.
excludeTestsMatching "*VariableSourceTest\$*"
excludeTestsMatching "*WorkDirectoryExtensionTest\$*"
+ excludeTestsMatching "*ExecutorServiceExtensionTest\$*"
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
new file mode 100644
index 0000000000..249127816c
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/testframework/ExecutorServiceExtensionTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static
org.apache.ignite.internal.testframework.JunitExtensionTestUtils.assertExecutesSuccessfully;
+import static
org.apache.ignite.internal.testframework.JunitExtensionTestUtils.assertExecutesWithFailure;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
+import static
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS;
+import static org.apache.ignite.internal.thread.ThreadOperation.WAIT;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+import static
org.junit.platform.testkit.engine.TestExecutionResultConditions.instanceOf;
+import static
org.junit.platform.testkit.engine.TestExecutionResultConditions.message;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.ThreadOperation;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * For {@link ExecutorServiceExtension} testing.
+ *
+ * <p>The nested classes are fixtures: they are launched programmatically by the outer test methods and are excluded from direct
+ * execution in the module's {@code build.gradle}.</p>
+ */
+public class ExecutorServiceExtensionTest {
+    private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+    /** Fixture with supported field and parameter types; it is expected to execute successfully. */
+    @ExtendWith(ExecutorServiceExtension.class)
+    static class NormalFieldInjectionTest {
+        private static final String DEFAULT_THREAD_PREFIX_FORMAT = "test-NormalFieldInjectionTest-%s";
+
+        private static final String DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT = "test-NormalFieldInjectionTest-%s-%s";
+
+        @InjectExecutorService
+        private static ExecutorService staticExecutorServiceWithDefaults;
+
+        @InjectExecutorService(threadCount = 2, threadPrefix = "test-foo-static-executor", allowedOperations = TX_STATE_STORAGE_ACCESS)
+        private static ExecutorService staticExecutorService;
+
+        @InjectExecutorService
+        private static ScheduledExecutorService staticScheduledExecutorServiceWithDefaults;
+
+        @InjectExecutorService(
+                threadCount = 3,
+                threadPrefix = "test-bar-static-executor",
+                allowedOperations = {STORAGE_READ, STORAGE_WRITE}
+        )
+        private static ScheduledExecutorService staticScheduledExecutorService;
+
+        @InjectExecutorService
+        private ExecutorService instanceExecutorServiceWithDefaults;
+
+        @InjectExecutorService(threadCount = 4, threadPrefix = "test-foo-instance-executor", allowedOperations = STORAGE_WRITE)
+        private ExecutorService instanceExecutorService;
+
+        @InjectExecutorService
+        private ScheduledExecutorService instanceScheduledExecutorServiceWithDefaults;
+
+        @InjectExecutorService(threadCount = 5, threadPrefix = "test-bar-instance-executor", allowedOperations = PROCESS_RAFT_REQ)
+        private ScheduledExecutorService instanceScheduledExecutorService;
+
+        /** Checks executors injected as parameters of a static lifecycle method. */
+        @BeforeAll
+        static void beforeAll(
+                @InjectExecutorService
+                ExecutorService staticParameterExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 6, threadPrefix = "test-foo-static-param-executor", allowedOperations = WAIT)
+                ExecutorService staticParameterExecutorService,
+                @InjectExecutorService
+                ScheduledExecutorService staticParameterScheduledExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 7, threadPrefix = "test-bar-static-param-executor", allowedOperations = WAIT)
+                ScheduledExecutorService staticParameterScheduledExecutorService
+        ) {
+            // The default prefix uses the reflective parameter name, which is expected to be the synthetic "argN" here.
+            checkExecutorService(
+                    staticParameterExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, "beforeAll", "arg0")
+            );
+            checkScheduledExecutorService(
+                    staticParameterScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, "beforeAll", "arg2")
+            );
+
+            checkExecutorService(staticParameterExecutorService, 6, "test-foo-static-param-executor", WAIT);
+            checkScheduledExecutorService(staticParameterScheduledExecutorService, 7, "test-bar-static-param-executor", WAIT);
+        }
+
+        /** Checks executors injected into static fields, instance fields and test method parameters. */
+        @Test
+        void test(
+                @InjectExecutorService
+                ExecutorService parameterExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 8, threadPrefix = "test-foo-param-executor", allowedOperations = WAIT)
+                ExecutorService parameterExecutorService,
+                @InjectExecutorService
+                ScheduledExecutorService parameterScheduledExecutorServiceWithDefaults,
+                @InjectExecutorService(threadCount = 9, threadPrefix = "test-bar-param-executor", allowedOperations = WAIT)
+                ScheduledExecutorService parameterScheduledExecutorService
+        ) {
+            // Executors created with the annotation defaults.
+            checkExecutorService(
+                    staticExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, "staticExecutorServiceWithDefaults")
+            );
+            checkScheduledExecutorService(
+                    staticScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, "staticScheduledExecutorServiceWithDefaults")
+            );
+            checkExecutorService(
+                    instanceExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, "instanceExecutorServiceWithDefaults")
+            );
+            checkScheduledExecutorService(
+                    instanceScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FORMAT, "instanceScheduledExecutorServiceWithDefaults")
+            );
+            checkExecutorService(
+                    parameterExecutorServiceWithDefaults,
+                    CPUS,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, "test", "arg0")
+            );
+            checkScheduledExecutorService(
+                    parameterScheduledExecutorServiceWithDefaults,
+                    1,
+                    String.format(DEFAULT_THREAD_PREFIX_FOR_METHOD_FORMAT, "test", "arg2")
+            );
+
+            // Executors created with explicitly configured annotation attributes.
+            checkExecutorService(staticExecutorService, 2, "test-foo-static-executor", TX_STATE_STORAGE_ACCESS);
+            checkScheduledExecutorService(staticScheduledExecutorService, 3, "test-bar-static-executor", STORAGE_READ, STORAGE_WRITE);
+            checkExecutorService(instanceExecutorService, 4, "test-foo-instance-executor", STORAGE_WRITE);
+            checkScheduledExecutorService(instanceScheduledExecutorService, 5, "test-bar-instance-executor", PROCESS_RAFT_REQ);
+            checkExecutorService(parameterExecutorService, 8, "test-foo-param-executor", WAIT);
+            checkScheduledExecutorService(parameterScheduledExecutorService, 9, "test-bar-param-executor", WAIT);
+        }
+    }
+
+    /** Fixture with unsupported field and parameter types; it is expected to fail. */
+    @ExtendWith(ExecutorServiceExtension.class)
+    static class ErrorFieldInjectionTest {
+        @InjectExecutorService
+        private static Integer staticWrongType;
+
+        @InjectExecutorService
+        private String instanceWrongType;
+
+        @Test
+        public void test(
+                @InjectExecutorService
+                Boolean parameterWrongType
+        ) {
+            fail("Should not reach here");
+        }
+    }
+
+    @Test
+    void testFieldInjection() {
+        assertExecutesSuccessfully(NormalFieldInjectionTest.class);
+    }
+
+    @Test
+    void testWrongTypeInjection() {
+        assertExecutesWithFailure(
+                ErrorFieldInjectionTest.class,
+                instanceOf(IllegalStateException.class),
+                message(m -> m.contains("Unsupported field type"))
+        );
+    }
+
+    /** Asserts that {@code service} is a {@link ThreadPoolExecutor} with the expected configuration. */
+    private static void checkExecutorService(
+            ExecutorService service,
+            int expCorePoolSize,
+            String expThreadPrefix,
+            ThreadOperation... expThreadOperations
+    ) {
+        assertThat(service, instanceOf(ThreadPoolExecutor.class));
+
+        checkThreadPoolExecutor((ThreadPoolExecutor) service, expCorePoolSize, expThreadPrefix, expThreadOperations);
+    }
+
+    /** Asserts that {@code service} is a {@link ScheduledThreadPoolExecutor} with the expected configuration. */
+    private static void checkScheduledExecutorService(
+            ScheduledExecutorService service,
+            int expCorePoolSize,
+            String expThreadPrefix,
+            ThreadOperation... expThreadOperations
+    ) {
+        assertThat(service, instanceOf(ScheduledThreadPoolExecutor.class));
+
+        checkThreadPoolExecutor((ScheduledThreadPoolExecutor) service, expCorePoolSize, expThreadPrefix, expThreadOperations);
+    }
+
+    /**
+     * Asserts the core pool size of the executor and, by running a task on it, that its threads are {@link IgniteThread}s with the
+     * expected name prefix and allowed operations.
+     */
+    private static void checkThreadPoolExecutor(
+            ThreadPoolExecutor executor,
+            int expCorePoolSize,
+            String expThreadPrefix,
+            ThreadOperation... expThreadOperations
+    ) {
+        // JUnit expects (expected, actual); the swapped order produced a misleading failure message.
+        assertEquals(expCorePoolSize, executor.getCorePoolSize());
+
+        assertThat(
+                runAsync(() -> {
+                    Thread thread = Thread.currentThread();
+
+                    assertThat(thread, instanceOf(IgniteThread.class));
+
+                    IgniteThread igniteThread = (IgniteThread) thread;
+
+                    assertThat(igniteThread.getName(), startsWith(expThreadPrefix));
+                    assertThat(igniteThread.allowedOperations(), containsInAnyOrder(expThreadOperations));
+                }, executor),
+                willCompleteSuccessfully()
+        );
+    }
+}
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
new file mode 100644
index 0000000000..6963a57519
--- /dev/null
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/ExecutorServiceExtension.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import static java.lang.reflect.Modifier.isStatic;
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.ThreadOperation;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.platform.commons.support.AnnotationSupport;
+import org.junit.platform.commons.support.HierarchyTraversalMode;
+
+/**
+ * JUnit extension for injecting temporary {@link ExecutorService}'s into test
classes.
+ *
+ * @see InjectExecutorService
+ */
+public class ExecutorServiceExtension implements BeforeAllCallback,
AfterAllCallback, BeforeEachCallback, AfterEachCallback,
+ ParameterResolver {
+ private static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+ private static final Namespace NAMESPACE =
Namespace.create(ExecutorServiceExtension.class);
+
+ private static final Set<Class<?>> SUPPORTED_FIELD_TYPES =
Set.of(ScheduledExecutorService.class, ExecutorService.class);
+
+ private static final Object STATIC_EXECUTORS_KEY = new Object();
+
+ private static final Object INSTANCE_EXECUTORS_KEY = new Object();
+
+ @Override
+ public void beforeAll(ExtensionContext context) throws Exception {
+ injectFields(context, true);
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ shutdownExecutors(context, true);
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ injectFields(context, false);
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ shutdownExecutors(context, false);
+ }
+
+ @Override
+ public boolean supportsParameter(
+ ParameterContext parameterContext,
+ ExtensionContext extensionContext
+ ) throws ParameterResolutionException {
+ return parameterContext.isAnnotated(InjectExecutorService.class)
+ &&
isFieldTypeSupported(parameterContext.getParameter().getType());
+ }
+
+ @Override
+ public Object resolveParameter(
+ ParameterContext parameterContext,
+ ExtensionContext extensionContext
+ ) throws ParameterResolutionException {
+ if (!supportsParameter(parameterContext, extensionContext)) {
+ throw new ParameterResolutionException("Unknown parameter:" +
parameterContext.getParameter());
+ }
+
+ boolean forStatic =
isStatic(parameterContext.getParameter().getDeclaringExecutable().getModifiers());
+
+ List<ExecutorService> executorServices =
getOrCreateExecutorServiceListInStore(extensionContext, forStatic);
+
+ InjectExecutorService injectExecutorService =
parameterContext.findAnnotation(InjectExecutorService.class).orElse(null);
+
+ assert injectExecutorService != null : parameterContext.getParameter();
+
+ ExecutorService executorService = createExecutorService(
+ injectExecutorService,
+ parameterContext.getParameter().getName(),
+ parameterContext.getParameter().getType(),
+ extensionContext.getRequiredTestClass(),
+
parameterContext.getParameter().getDeclaringExecutable().getName()
+ );
+
+ executorServices.add(executorService);
+
+ return executorService;
+ }
+
+ private static void injectFields(ExtensionContext context, boolean
forStatic) throws Exception {
+ Class<?> testClass = context.getRequiredTestClass();
+ Object testInstance = context.getTestInstance().orElse(null);
+
+ List<Field> fields = collectFields(testClass, forStatic);
+
+ if (fields.isEmpty()) {
+ return;
+ }
+
+ List<ExecutorService> executorServices =
getOrCreateExecutorServiceListInStore(context, forStatic);
+
+ for (Field field : fields) {
+ checkFieldTypeIsSupported(field);
+
+ ExecutorService executorService = createExecutorService(field);
+
+ executorServices.add(executorService);
+
+ field.setAccessible(true);
+
+ field.set(forStatic ? null : testInstance, executorService);
+ }
+ }
+
+ private static void shutdownExecutors(ExtensionContext context, boolean
forStatic) throws Exception {
+ List<ExecutorService> removed = (List<ExecutorService>)
context.getStore(NAMESPACE).remove(storeKey(forStatic));
+
+ if (removed == null || removed.isEmpty()) {
+ return;
+ }
+
+ Stream<AutoCloseable> autoCloseableStream = removed.stream()
+ .map(executorService -> () ->
IgniteUtils.shutdownAndAwaitTermination(executorService, 10, TimeUnit.SECONDS));
+
+ IgniteUtils.closeAll(autoCloseableStream);
+ }
+
+ private static List<Field> collectFields(Class<?> testClass, boolean
forStatic) {
+ return AnnotationSupport.findAnnotatedFields(
+ testClass,
+ InjectExecutorService.class,
+ field -> isStatic(field.getModifiers()) == forStatic,
+ HierarchyTraversalMode.TOP_DOWN
+ );
+ }
+
+ private static void checkFieldTypeIsSupported(Field field) {
+ if (!isFieldTypeSupported(field.getType())) {
+ throw new IllegalStateException(
+ String.format("Unsupported field type: [field=%s,
supportedFieldTypes=%s]", field, SUPPORTED_FIELD_TYPES)
+ );
+ }
+ }
+
+ private static boolean isFieldTypeSupported(Class<?> fieldType) {
+ for (Class<?> supportedFieldType : SUPPORTED_FIELD_TYPES) {
+ if (fieldType.equals(supportedFieldType)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private static ExecutorService createExecutorService(Field field) {
+ InjectExecutorService injectExecutorService =
field.getAnnotation(InjectExecutorService.class);
+
+ assert injectExecutorService != null : field;
+
+ return createExecutorService(injectExecutorService, field.getName(),
field.getType(), field.getDeclaringClass(), null);
+ }
+
+ private static ExecutorService createExecutorService(
+ InjectExecutorService injectExecutorService,
+ String fieldName,
+ Class<?> fieldType,
+ Class<?> testClass,
+ @Nullable String methodName
+ ) {
+ int threadCount = injectExecutorService.threadCount();
+ String threadPrefix = threadPrefix(injectExecutorService, testClass,
fieldName, methodName);
+ ThreadOperation[] allowedOperations =
injectExecutorService.allowedOperations();
+
+ ThreadFactory threadFactory =
IgniteThreadFactory.withPrefix(threadPrefix, Loggers.forClass(testClass),
allowedOperations);
+
+ if (fieldType.equals(ScheduledExecutorService.class)) {
+ return newScheduledThreadPool(threadCount == 0 ? 1 : threadCount,
threadFactory);
+ } else if (fieldType.equals(ExecutorService.class)) {
+ return newFixedThreadPool(threadCount == 0 ? CPUS : threadCount,
threadFactory);
+ }
+
+ throw new AssertionError(
+ String.format("Unsupported field type: [field=%s,
supportedFieldTypes=%s]", fieldName, SUPPORTED_FIELD_TYPES)
+ );
+ }
+
+ private static String threadPrefix(
+ InjectExecutorService injectExecutorService,
+ Class<?> testClass,
+ String fieldName,
+ @Nullable String methodName
+ ) {
+ String threadPrefix = injectExecutorService.threadPrefix();
+
+ if (threadPrefix != null && !"".equals(threadPrefix)) {
+ return threadPrefix;
+ }
+
+ if (methodName == null) {
+ return String.format("test-%s-%s", testClass.getSimpleName(),
fieldName);
+ }
+
+ return String.format("test-%s-%s-%s", testClass.getSimpleName(),
methodName, fieldName);
+ }
+
+ private static Object storeKey(boolean forStatic) {
+ return forStatic ? STATIC_EXECUTORS_KEY : INSTANCE_EXECUTORS_KEY;
+ }
+
+ private static List<ExecutorService>
getOrCreateExecutorServiceListInStore(ExtensionContext context, boolean
forStatic) {
+ List<ExecutorService> executorServices = (List<ExecutorService>)
context.getStore(NAMESPACE).get(storeKey(forStatic));
+
+ if (executorServices == null) {
+ executorServices = new CopyOnWriteArrayList<>();
+
+ context.getStore(NAMESPACE).put(storeKey(forStatic),
executorServices);
+ }
+
+ return executorServices;
+ }
+}
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/InjectExecutorService.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/InjectExecutorService.java
new file mode 100644
index 0000000000..25c563fadc
--- /dev/null
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/InjectExecutorService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.ignite.internal.thread.ThreadOperation;
+
+/**
+ * Annotation for injecting {@link ExecutorService} instances into tests.
+ *
+ * <p>Can be placed on fields and on method parameters. The executor is created based on the declared type of the annotated field
+ * or parameter.</p>
+ *
+ * <p>For each annotated field or parameter a new executor will be created and will have a fixed number of threads.</p>
+ *
+ * <p>Supported types:</p>
+ * <ul>
+ * <li>{@link ExecutorService}.</li>
+ * <li>{@link ScheduledExecutorService}.</li>
+ * </ul>
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.PARAMETER})
+public @interface InjectExecutorService {
+ /**
+ * Number of threads in the executor.
+ *
+ * <p>The default value {@code 0} means that the number of threads depends on the type of executor:</p>
+ * <ul>
+ * <li>{@link ScheduledExecutorService} - 1 thread.</li>
+ * <li>{@link ExecutorService} - {@link Runtime#availableProcessors()}.</li>
+ * </ul>
+ */
+ int threadCount() default 0;
+
+ /**
+ * Prefix of thread names in the executor.
+ *
+ * <p>By default (empty string) the prefix will be in the format "test-class_name-field_name", for example
+ * "test-FooTest-commonExecutor", for class fields and in the format "test-class_name-method_name-param_name", for example
+ * "test-FooTest-beforeAll-commonExecutor", for method parameters.</p>
+ *
+ * <p>NOTE: the parameter name is the one reported by reflection, so it is a synthetic name such as "arg0" when the test classes
+ * are compiled without retaining parameter names (the {@code -parameters} compiler option).</p>
+ */
+ String threadPrefix() default "";
+
+ /** Operations that are allowed to be executed on threads. By default, nothing is available. */
+ ThreadOperation[] allowedOperations() default {};
+}
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
index fb186cd012..518b054875 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/distributionzones/ItIgniteDistributionZoneManagerNodeRestartTest.java
@@ -123,6 +123,8 @@ import
org.apache.ignite.internal.network.configuration.NetworkExtensionConfigur
import org.apache.ignite.internal.network.recovery.VaultStaleIds;
import
org.apache.ignite.internal.network.scalecube.TestScaleCubeClusterServiceFactory;
import
org.apache.ignite.internal.security.authentication.validator.AuthenticationProvidersValidatorImpl;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.vault.VaultManager;
@@ -140,6 +142,7 @@ import org.junit.jupiter.params.provider.ValueSource;
* Tests for checking {@link DistributionZoneManager} behavior after node's
restart.
*/
@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class ItIgniteDistributionZoneManagerNodeRestartTest extends
BaseIgniteRestartTest {
private static final LogicalNode A = new LogicalNode(
new ClusterNodeImpl(randomUUID(), "A", new
NetworkAddress("localhost", 123)),
@@ -172,6 +175,9 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
private volatile boolean startGlobalStateUpdateBlocking;
+ @InjectExecutorService
+ private ScheduledExecutorService commonScheduledExecutorService;
+
/**
* Start some of Ignite components that are able to serve as Ignite node
for test purposes.
*
@@ -245,7 +251,8 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest
extends BaseIgniteRe
name,
workDir.resolve("metastorage"),
new NoOpFailureManager(),
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ commonScheduledExecutorService
);
var clock = new HybridClockImpl();
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
index eb989f1e08..87ef5f4452 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerRestorerTest.java
@@ -49,6 +49,7 @@ import static org.mockito.Mockito.when;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -63,6 +64,8 @@ import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.TopologyService;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterNode;
@@ -73,10 +76,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
/** For {@link IndexAvailabilityController} testing on node recovery. */
@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class IndexAvailabilityControllerRestorerTest extends
BaseIgniteAbstractTest {
@WorkDirectory
private Path workDir;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
private final HybridClock clock = new HybridClockImpl();
private final ClusterService clusterService = mock(ClusterService.class);
@@ -93,7 +100,13 @@ public class IndexAvailabilityControllerRestorerTest
extends BaseIgniteAbstractT
void setUp() throws Exception {
var readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
- keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new
NoOpFailureManager(), readOperationForCompactionTracker);
+ keyValueStorage = new RocksDbKeyValueStorage(
+ NODE_NAME,
+ workDir,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
metaStorageManager =
StandaloneMetaStorageManager.create(keyValueStorage, clock,
readOperationForCompactionTracker);
@@ -220,7 +233,13 @@ public class IndexAvailabilityControllerRestorerTest
extends BaseIgniteAbstractT
var readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
- keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new
NoOpFailureManager(), readOperationForCompactionTracker);
+ keyValueStorage = new RocksDbKeyValueStorage(
+ NODE_NAME,
+ workDir,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
metaStorageManager =
StandaloneMetaStorageManager.create(keyValueStorage, clock,
readOperationForCompactionTracker);
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index e51cfc618d..6bc1d18c71 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -49,6 +49,7 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -76,6 +77,8 @@ import
org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.schema.ConstantSchemaVersions;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.LockManager;
@@ -89,12 +92,16 @@ import org.junit.jupiter.api.extension.ExtendWith;
/** Test class to verify {@link IndexManager}. */
@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class IndexManagerTest extends BaseIgniteAbstractTest {
private final HybridClock clock = new HybridClockImpl();
@WorkDirectory
private Path workDir;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
private TableManager mockTableManager;
private SchemaManager mockSchemaManager;
@@ -229,7 +236,13 @@ public class IndexManagerTest extends
BaseIgniteAbstractTest {
private void createAndStartComponents() {
var readOperationForCompactionTracker = new
ReadOperationForCompactionTracker();
- var storage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new
NoOpFailureManager(), readOperationForCompactionTracker);
+ var storage = new RocksDbKeyValueStorage(
+ NODE_NAME,
+ workDir,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
metaStorageManager = StandaloneMetaStorageManager.create(storage,
clock, readOperationForCompactionTracker);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 07de8f9a13..6134ef19b2 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
@@ -96,7 +97,9 @@ import
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.ActionResponse;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
@@ -113,6 +116,7 @@ import org.junit.jupiter.params.provider.MethodSource;
* Integration tests for idempotency of {@link
org.apache.ignite.internal.metastorage.command.IdempotentCommand}.
*/
@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class ItIdempotentCommandCacheTest extends IgniteAbstractTest {
private static final MetaStorageCommandsFactory CMD_FACTORY = new
MetaStorageCommandsFactory();
@@ -135,6 +139,9 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
@InjectConfiguration("mock.idleSyncTimeInterval = 100")
private MetaStorageConfiguration metaStorageConfiguration;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
private List<Node> nodes;
private static class Node implements AutoCloseable {
@@ -161,7 +168,8 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
RaftConfiguration raftConfiguration,
MetaStorageConfiguration metaStorageConfiguration,
Path workDir,
- int index
+ int index,
+ ScheduledExecutorService scheduledExecutorService
) {
List<NetworkAddress> addrs = new ArrayList<>();
@@ -214,7 +222,8 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
clusterService.nodeName(),
metastorageWorkDir.dbPath(),
new NoOpFailureManager(),
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ scheduledExecutorService
));
metaStorageManager = new MetaStorageManagerImpl(
@@ -543,7 +552,7 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
nodes = new ArrayList<>();
for (int i = 0; i < NODES_COUNT; i++) {
- Node node = new Node(testInfo, raftConfiguration,
metaStorageConfiguration, workDir, i);
+ Node node = new Node(testInfo, raftConfiguration,
metaStorageConfiguration, workDir, i, scheduledExecutorService);
nodes.add(node);
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 842e4ab47f..670f5c8b00 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -50,6 +50,7 @@ import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
@@ -93,7 +94,9 @@ import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFacto
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.ReadActionRequest;
@@ -113,6 +116,7 @@ import org.mockito.ArgumentCaptor;
* Integration tests for {@link MetaStorageManagerImpl}.
*/
@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
private static final ByteArray FOO_KEY = new ByteArray("foo");
@@ -133,6 +137,9 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
@InjectConfiguration
private RaftConfiguration raftConfiguration;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
private final ReadOperationForCompactionTracker
readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
@BeforeEach
@@ -185,7 +192,8 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
clusterService.nodeName(),
metastorageWorkDir.dbPath(),
new NoOpFailureManager(),
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ scheduledExecutorService
);
metaStorageManager = new MetaStorageManagerImpl(
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
index 4695862dff..682f103f2a 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesRocksDbTest.java
@@ -18,15 +18,29 @@
package org.apache.ignite.internal.metastorage.impl;
import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.extension.ExtendWith;
/** {@link ItMetaStorageMultipleNodesVsStorageTest} with {@link
RocksDbKeyValueStorage} implementation. */
+@ExtendWith(ExecutorServiceExtension.class)
public class ItMetaStorageMultipleNodesRocksDbTest extends
ItMetaStorageMultipleNodesVsStorageTest {
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
@Override
public KeyValueStorage createStorage(String nodeName, Path path,
ReadOperationForCompactionTracker readOperationForCompactionTracker) {
- return new RocksDbKeyValueStorage(nodeName, path.resolve("ms"), new
NoOpFailureManager(), readOperationForCompactionTracker);
+ return new RocksDbKeyValueStorage(
+ nodeName,
+ path.resolve("ms"),
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
}
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
index f7ef2eb21d..35ed3342f3 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageSafeTimePropagationRocksDbTest.java
@@ -18,22 +18,35 @@
package org.apache.ignite.internal.metastorage.impl;
import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.extension.ExtendWith;
/** {@link ItMetaStorageSafeTimePropagationAbstractTest} with {@link
RocksDbKeyValueStorage} implementation. */
@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class ItMetaStorageSafeTimePropagationRocksDbTest extends
ItMetaStorageSafeTimePropagationAbstractTest {
@WorkDirectory
private Path workDir;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
@Override
public KeyValueStorage createStorage() {
- return new RocksDbKeyValueStorage(NODE_NAME, workDir, new
NoOpFailureManager(), new ReadOperationForCompactionTracker());
+ return new RocksDbKeyValueStorage(
+ NODE_NAME,
+ workDir,
+ new NoOpFailureManager(),
+ new ReadOperationForCompactionTracker(),
+ scheduledExecutorService
+ );
}
}
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
index 4c0784af4e..8655e87344 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -47,15 +48,19 @@ import
org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Persistent (rocksdb-based) meta storage raft group snapshots tests.
*/
+@ExtendWith(ExecutorServiceExtension.class)
public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnapshotTest<MetaStorageListener> {
private static final ByteArray FIRST_KEY = ByteArray.fromString("first");
@@ -65,6 +70,9 @@ public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnaps
private static final byte[] SECOND_VALUE =
"secondValue".getBytes(StandardCharsets.UTF_8);
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
private MetaStorageServiceImpl metaStorage;
private final Map<String, RocksDbKeyValueStorage> storageByName = new
HashMap<>();
@@ -154,7 +162,8 @@ public class ItMetaStorageServicePersistenceTest extends
ItAbstractListenerSnaps
name,
listenerPersistencePath,
new NoOpFailureManager(),
- new ReadOperationForCompactionTracker()
+ new ReadOperationForCompactionTracker(),
+ scheduledExecutorService
);
s.start();
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index d3f01b1949..9c9009ee7a 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -45,6 +45,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
@@ -92,7 +93,9 @@ import
org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.util.SharedLogStorageFactoryUtils;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
@@ -108,6 +111,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
* Tests for Meta Storage Watches.
*/
@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class ItMetaStorageWatchTest extends IgniteAbstractTest {
@InjectConfiguration
@@ -119,6 +123,9 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
@InjectConfiguration
private static MetaStorageConfiguration metaStorageConfiguration;
+ @InjectExecutorService
+ private static ScheduledExecutorService scheduledExecutorService;
+
private static class Node {
private final List<IgniteComponent> components = new ArrayList<>();
@@ -229,7 +236,8 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
name(),
metastorageWorkDir.dbPath(),
new NoOpFailureManager(),
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ scheduledExecutorService
);
this.metaStorageManager = new MetaStorageManagerImpl(
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 810e8c5401..4e26de9ae7 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -191,7 +191,9 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
/** Executor for storage operations. */
private final ExecutorService executor;
- /** Scheduled executor for storage operations. */
+ /**
+ * Scheduled executor. Needed only for asynchronous start of scheduled operations without performing blocking, long or IO operations.
+ */
private final ScheduledExecutorService scheduledExecutor;
/** Path to the rocksdb database. */
@@ -285,12 +287,15 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
* @param dbPath RocksDB path.
* @param failureManager Failure processor that is used to handle critical errors.
* @param readOperationForCompactionTracker Read operation tracker for metastorage compaction.
+ * @param scheduledExecutor Scheduled executor. Needed only for asynchronous start of scheduled operations without performing blocking,
+ * long or IO operations.
*/
public RocksDbKeyValueStorage(
String nodeName,
Path dbPath,
FailureManager failureManager,
- ReadOperationForCompactionTracker readOperationForCompactionTracker
+ ReadOperationForCompactionTracker
readOperationForCompactionTracker,
+ ScheduledExecutorService scheduledExecutor
) {
super(
nodeName,
@@ -299,16 +304,12 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
);
this.dbPath = dbPath;
+ this.scheduledExecutor = scheduledExecutor;
executor = Executors.newFixedThreadPool(
2,
NamedThreadFactory.create(nodeName,
"metastorage-rocksdb-kv-storage-executor", log)
);
-
- // TODO: IGNITE-23615 Use a common pool, e.g.
ThreadPoolsManager#commonScheduler
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
- NamedThreadFactory.create(nodeName,
"metastorage-rocksdb-kv-storage-scheduler", log)
- );
}
@Override
@@ -497,7 +498,6 @@ public class RocksDbKeyValueStorage extends
AbstractKeyValueStorage {
flusher.stop();
IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
- IgniteUtils.shutdownAndAwaitTermination(scheduledExecutor, 10,
TimeUnit.SECONDS);
rwLock.writeLock().lock();
try {
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
index 0251dc8b86..651a690492 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRocksDbRangeTest.java
@@ -18,16 +18,30 @@
package org.apache.ignite.internal.metastorage.impl;
import java.nio.file.Path;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* {@link MetaStorageRangeTest} implementation using {@link
RocksDbKeyValueStorage}.
*/
+@ExtendWith(ExecutorServiceExtension.class)
public class MetaStorageRocksDbRangeTest extends MetaStorageRangeTest {
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
@Override
KeyValueStorage getStorage(Path path) {
- return new RocksDbKeyValueStorage("test", path, new
NoOpFailureManager(), readOperationForCompactionTracker);
+ return new RocksDbKeyValueStorage(
+ "test",
+ path,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
}
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
index e542469d91..442bfb2225 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbCompactionKeyValueStorageTest.java
@@ -20,20 +20,29 @@ package org.apache.ignite.internal.metastorage.server;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
/** Compaction test for the RocksDB implementation of {@link KeyValueStorage}.
*/
+@ExtendWith(ExecutorServiceExtension.class)
public class RocksDbCompactionKeyValueStorageTest extends
AbstractCompactionKeyValueStorageTest {
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
@Override
public KeyValueStorage createStorage() {
return new RocksDbKeyValueStorage(
NODE_NAME,
workDir.resolve("storage"),
new NoOpFailureManager(),
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ scheduledExecutorService
);
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 0f0d7b5320..a92962abb4 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.zip.Checksum;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.ByteArray;
@@ -41,20 +42,28 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.raft.jraft.util.CRC64;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests for RocksDB key-value storage implementation.
*/
+@ExtendWith(ExecutorServiceExtension.class)
public class RocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStorageTest {
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
@Override
public KeyValueStorage createStorage() {
return new RocksDbKeyValueStorage(
NODE_NAME,
workDir.resolve("storage"),
new NoOpFailureManager(),
- new ReadOperationForCompactionTracker()
+ new ReadOperationForCompactionTracker(),
+ scheduledExecutorService
);
}
@@ -82,7 +91,8 @@ public class RocksDbKeyValueStorageTest extends
BasicOperationsKeyValueStorageTe
NODE_NAME,
workDir.resolve("storage"),
new NoOpFailureManager(),
- new ReadOperationForCompactionTracker()
+ new ReadOperationForCompactionTracker(),
+ scheduledExecutorService
);
storage.start();
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index cdc051c283..f50c9b7a58 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -190,6 +190,8 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -229,7 +231,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Replica lifecycle test.
*/
-@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class, ExecutorServiceExtension.class})
@Timeout(60)
// TODO: https://issues.apache.org/jira/browse/IGNITE-22522 remove this test after the switching to zone-based replication
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23252")
@@ -293,6 +295,9 @@ public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest {
private static String featureFlagOldValue = System.getProperty(FEATURE_FLAG_NAME);
+ @InjectExecutorService
+ private static ScheduledExecutorService scheduledExecutorService;
+
@BeforeAll
static void beforeAll() {
System.setProperty(FEATURE_FLAG_NAME, "true");
@@ -1087,7 +1092,8 @@ public class ItReplicaLifecycleTest extends BaseIgniteAbstractTest {
name,
resolveDir(dir, "metaStorageTestKeyValue"),
failureManager,
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ scheduledExecutorService
);
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
index e4a1b8c4d2..05e211aa7c 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/flush/RocksDbFlusher.java
@@ -62,7 +62,7 @@ public class RocksDbFlusher {
/** Scheduled pool to schedule flushes. */
private final ScheduledExecutorService scheduledPool;
- /** Thread pool to complete flush completion futures. */
+ /** Thread pool to execute flush and complete flush completion futures. */
final ExecutorService threadPool;
/** Supplier of delay values to batch independent flush requests. */
@@ -104,7 +104,7 @@ public class RocksDbFlusher {
* @param name RocksDB instance name, for logging purposes.
* @param busyLock Busy lock.
* @param scheduledPool Scheduled pool the schedule flushes.
- * @param threadPool Thread pool to run flush completion closure, provided by {@code onFlushCompleted} parameter.
+ * @param threadPool Thread pool to execute flush and to run flush completion closure, provided by {@code onFlushCompleted} parameter.
* @param delaySupplier Supplier of delay values to batch independent flush requests. When {@link #awaitFlush(boolean)} is called with
* {@code true} parameter, the flusher waits given number of milliseconds (using {@code scheduledPool}) and then executes flush
* only if there were no other {@code awaitFlush(true)} calls. Otherwise, it does nothing after the timeout. This guarantees that
@@ -240,7 +240,7 @@ public class RocksDbFlusher {
latestFlushClosure = newClosure;
- scheduledPool.schedule(newClosure, delaySupplier.getAsInt(), TimeUnit.MILLISECONDS);
+ scheduledPool.schedule(() -> runAsync(newClosure, threadPool), delaySupplier.getAsInt(), TimeUnit.MILLISECONDS);
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 540d152f8f..4f38803316 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -41,6 +41,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeN
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.sql.ColumnType.INT32;
@@ -68,7 +69,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -209,10 +209,10 @@ import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl
import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.configuration.TransactionExtensionConfiguration;
@@ -256,6 +256,7 @@ import org.junit.jupiter.params.provider.ValueSource;
* These tests check node restart scenarios.
*/
@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
@Timeout(120)
public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
/** Value producer for table data, is used to create data and check it later. */
@@ -308,9 +309,11 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
*/
private final Map<Integer, Supplier<CompletableFuture<Set<String>>>> dataNodesMockByNode = new ConcurrentHashMap<>();
- private final ExecutorService storageExecutor = Executors.newSingleThreadExecutor(
- IgniteThreadFactory.create("test", "storage-test-pool-iinrt", log, ThreadOperation.STORAGE_READ)
- );
+ @InjectExecutorService(threadCount = 1, threadPrefix = "storage-test-pool-iinrt", allowedOperations = STORAGE_READ)
+ private ExecutorService storageExecutor;
+
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
@BeforeEach
public void beforeTest() {
@@ -482,7 +485,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
name,
dir.resolve("metastorage"),
new NoOpFailureManager(),
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ scheduledExecutorService
);
InvokeInterceptor metaStorageInvokeInterceptor = metaStorageInvokeInterceptorByNode.get(idx);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index da8ccaaa32..5886172a6d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -690,7 +690,8 @@ public class IgniteImpl implements Ignite {
name,
metastorageWorkDir.dbPath(),
failureManager,
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ threadPoolsManager.commonScheduler()
);
metaStorageMgr = new MetaStorageManagerImpl(
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 2f2b922c0e..38151f55cd 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -217,6 +217,8 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.Outgo
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -255,7 +257,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Test suite for rebalance process, when replicas' number changed.
*/
-@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
+@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class, ExecutorServiceExtension.class})
@Timeout(120)
public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
private static final IgniteLogger LOG = Loggers.forClass(ItRebalanceDistributedTest.class);
@@ -314,6 +316,9 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
@WorkDirectory
private Path workDir;
+ @InjectExecutorService
+ private static ScheduledExecutorService commonScheduledExecutorService;
+
private StaticNodeFinder finder;
private List<Node> nodes;
@@ -1219,7 +1224,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
name,
resolveDir(dir, "metaStorage"),
failureManager,
- readOperationForCompactionTracker
+ readOperationForCompactionTracker,
+ commonScheduledExecutorService
);
var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 8269867073..f78f041f33 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -120,7 +120,9 @@ import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorServiceImpl;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.schema.AlwaysSyncedSchemaSyncService;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
@@ -145,7 +147,7 @@ import org.mockito.quality.Strictness;
/**
* Table manager recovery scenarios.
*/
-@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
+@ExtendWith({MockitoExtension.class, ConfigurationExtension.class, ExecutorServiceExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public class TableManagerRecoveryTest extends IgniteAbstractTest {
private static final String NODE_NAME = "testNode1";
@@ -191,6 +193,9 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest {
private final DataStorageModule dataStorageModule = createDataStorageModule();
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
@AfterEach
void after() throws Exception {
stopComponents();
@@ -270,7 +275,13 @@ public class TableManagerRecoveryTest extends IgniteAbstractTest {
var readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
- var storage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new NoOpFailureManager(), readOperationForCompactionTracker);
+ var storage = new RocksDbKeyValueStorage(
+ NODE_NAME,
+ workDir,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
clock = new HybridClockImpl();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
index f17638efe8..a6c94f6ecc 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorageRecoveryTest.java
@@ -47,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.RunnableX;
@@ -55,6 +56,8 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -63,17 +66,27 @@ import org.junit.jupiter.api.extension.ExtendWith;
/** For testing recovery of {@link IndexMetaStorage}. */
@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
public class IndexMetaStorageRecoveryTest extends BaseIndexMetaStorageTest {
@WorkDirectory
private Path workDir;
+ @InjectExecutorService
+ private ScheduledExecutorService scheduledExecutorService;
+
private final TestUpdateHandlerInterceptor interceptor = new TestUpdateHandlerInterceptor();
@Override
MetaStorageManager createMetastore() {
var readOperationForCompactionTracker = new ReadOperationForCompactionTracker();
- var keyValueStorage = new RocksDbKeyValueStorage(NODE_NAME, workDir, new NoOpFailureManager(), readOperationForCompactionTracker);
+ var keyValueStorage = new RocksDbKeyValueStorage(
+ NODE_NAME,
+ workDir,
+ new NoOpFailureManager(),
+ readOperationForCompactionTracker,
+ scheduledExecutorService
+ );
return StandaloneMetaStorageManager.create(keyValueStorage, readOperationForCompactionTracker);
}