This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9279e9e6529 IGNITE-26545 Fix shutdown TableManager#rebalanceScheduler
(#6668)
9279e9e6529 is described below
commit 9279e9e65293331877806f89493d917598b41a58
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Sep 30 12:08:18 2025 +0300
IGNITE-26545 Fix shutdown TableManager#rebalanceScheduler (#6668)
---
.../internal/rebalance/ItRebalanceDistributedTest.java | 12 ++----------
.../internal/distributionzones/rebalance/RebalanceUtil.java | 3 ---
.../ignite/internal/partition/replicator/fixtures/Node.java | 12 ++----------
.../ignite/internal/runner/app/ItIgniteNodeRestartTest.java | 10 ++--------
.../main/java/org/apache/ignite/internal/app/IgniteImpl.java | 10 ++--------
.../org/apache/ignite/internal/app/ThreadPoolsManager.java | 11 +++++++++++
.../ignite/internal/table/distributed/TableManager.java | 1 -
7 files changed, 19 insertions(+), 40 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9418e41262f..f1dd90ab055 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -38,7 +38,6 @@ import static
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePa
import static
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_DEFAULT;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_MS;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
@@ -97,7 +96,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -258,7 +256,6 @@ import
org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -1247,8 +1244,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
/** Failure processor. */
private final FailureManager failureManager;
- private final ScheduledExecutorService rebalanceScheduler;
-
private final LogStorageFactory logStorageFactory;
private final LogStorageFactory cmgLogStorageFactory;
@@ -1518,9 +1513,6 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
metricManager
);
- rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
- IgniteThreadFactory.create(name,
"test-rebalance-scheduler", logger()));
-
replicaManager = spy(new ReplicaManager(
name,
clusterService,
@@ -1596,7 +1588,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
failureManager,
nodeProperties,
threadPoolsManager.tableIoExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
clockService,
placementDriver,
@@ -1628,7 +1620,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
schemaManager,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.commonScheduler(),
clockService,
outgoingSnapshotManager,
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 04c109ae71c..74b03b41394 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -156,9 +156,6 @@ public class RebalanceUtil {
}
}
- /** Rebalance scheduler pool size. */
- public static final int REBALANCE_SCHEDULER_POOL_SIZE = 1;
-
/**
* Update keys that related to rebalance algorithm in Meta Storage. Keys
are specific for partition.
*
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 33a19374ebc..147070ef5f7 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -23,7 +23,6 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.BaseIgniteRestartTest.createVault;
import static
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -49,7 +48,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.LongSupplier;
@@ -180,7 +178,6 @@ import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOn
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -703,11 +700,6 @@ public class Node {
lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
params ->
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
params).newLowWatermark()));
- ScheduledExecutorService rebalanceScheduler =
Executors.newScheduledThreadPool(
- REBALANCE_SCHEDULER_POOL_SIZE,
- IgniteThreadFactory.create(name, "test-rebalance-scheduler",
LOG)
- );
-
SystemDistributedConfiguration systemDistributedConfiguration =
clusterConfigRegistry.getConfiguration(SystemDistributedExtensionConfiguration.KEY).system();
@@ -743,7 +735,7 @@ public class Node {
failureManager,
nodeProperties,
threadPoolsManager.tableIoExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
clockService,
placementDriverManager.placementDriver(),
@@ -787,7 +779,7 @@ public class Node {
schemaManager,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.commonScheduler(),
clockService,
outgoingSnapshotsManager,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 4760067c5ab..07995a28d42 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -27,7 +27,6 @@ import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
import static
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getZoneIdStrict;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
@@ -73,7 +72,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -224,7 +222,6 @@ import
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import
org.apache.ignite.internal.tx.configuration.TransactionExtensionConfiguration;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -600,9 +597,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
id -> null
);
- ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
- IgniteThreadFactory.create(name, "test-rebalance-scheduler",
logger()));
-
ReplicaManager replicaMgr = new ReplicaManager(
name,
clusterSvc,
@@ -763,7 +757,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
failureProcessor,
nodeProperties,
threadPoolsManager.tableIoExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
clockService,
placementDriverManager.placementDriver(),
@@ -795,7 +789,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
schemaManager,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.commonScheduler(),
clockService,
outgoingSnapshotManager,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ad0c23571e0..cabc8e7437d 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.configuration.IgnitePaths.cmgPath;
import static
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePath;
import static
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
import static org.apache.ignite.internal.configuration.IgnitePaths.vaultPath;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
@@ -50,8 +49,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import java.util.function.LongSupplier;
@@ -908,9 +905,6 @@ public class IgniteImpl implements Ignite {
LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier =
partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
- ScheduledExecutorService rebalanceScheduler = new
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
- IgniteThreadFactory.create(name, "rebalance-scheduler", LOG));
-
// TODO: IGNITE-22222 this instantiation should be moved inside
ReplicaManager's constructor
Marshaller raftMarshaller = new
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry());
@@ -1091,7 +1085,7 @@ public class IgniteImpl implements Ignite {
failureManager,
nodeProperties,
threadPoolsManager.tableIoExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.partitionOperationsExecutor(),
clockService,
placementDriverMgr.placementDriver(),
@@ -1141,7 +1135,7 @@ public class IgniteImpl implements Ignite {
schemaManager,
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
- rebalanceScheduler,
+ threadPoolsManager.rebalanceScheduler(),
threadPoolsManager.commonScheduler(),
clockService,
outgoingSnapshotsManager,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index b3e85136e64..e64e9bfedd8 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -60,6 +60,9 @@ public class ThreadPoolsManager implements IgniteComponent {
private final ScheduledExecutorService commonScheduler;
+ /** Executor for scheduling rebalance routine. */
+ private final ScheduledExecutorService rebalanceScheduler;
+
private final MetricManager metricManager;
private final List<ThreadPoolMetricSource> metricSources;
@@ -91,6 +94,8 @@ public class ThreadPoolsManager implements IgniteComponent {
commonScheduler =
Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(nodeName,
"common-scheduler", LOG));
+ rebalanceScheduler =
Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(nodeName,
"rebalance-scheduler", LOG));
+
this.metricManager = metricManager;
metricSources = new ArrayList<>();
@@ -117,6 +122,7 @@ public class ThreadPoolsManager implements IgniteComponent {
IgniteUtils.shutdownAndAwaitTermination(tableIoExecutor, 10, SECONDS);
IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor,
10, SECONDS);
IgniteUtils.shutdownAndAwaitTermination(commonScheduler, 10, SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(rebalanceScheduler, 10,
SECONDS);
return nullCompletedFuture();
}
@@ -141,4 +147,9 @@ public class ThreadPoolsManager implements IgniteComponent {
public ScheduledExecutorService commonScheduler() {
return commonScheduler;
}
+
+ /** Returns executor for scheduling rebalance routine. */
+ public ScheduledExecutorService rebalanceScheduler() {
+ return rebalanceScheduler;
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index cafe16a47bb..f48e53c6c4a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1699,7 +1699,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
fullStateTransferIndexChooser,
() -> shutdownAndAwaitTermination(scanRequestExecutor,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() ->
shutdownAndAwaitTermination(incomingSnapshotsExecutor, shutdownTimeoutSeconds,
TimeUnit.SECONDS),
- () -> shutdownAndAwaitTermination(rebalanceScheduler,
shutdownTimeoutSeconds, TimeUnit.SECONDS),
() -> {
ScheduledExecutorService streamerFlushExecutor;