This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9279e9e6529 IGNITE-26545 Fix shutdown TableManager#rebalanceScheduler (#6668)
9279e9e6529 is described below

commit 9279e9e65293331877806f89493d917598b41a58
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Sep 30 12:08:18 2025 +0300

    IGNITE-26545 Fix shutdown TableManager#rebalanceScheduler (#6668)
---
 .../internal/rebalance/ItRebalanceDistributedTest.java       | 12 ++----------
 .../internal/distributionzones/rebalance/RebalanceUtil.java  |  3 ---
 .../ignite/internal/partition/replicator/fixtures/Node.java  | 12 ++----------
 .../ignite/internal/runner/app/ItIgniteNodeRestartTest.java  | 10 ++--------
 .../main/java/org/apache/ignite/internal/app/IgniteImpl.java | 10 ++--------
 .../org/apache/ignite/internal/app/ThreadPoolsManager.java   | 11 +++++++++++
 .../ignite/internal/table/distributed/TableManager.java      |  1 -
 7 files changed, 19 insertions(+), 40 deletions(-)

diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 9418e41262f..f1dd90ab055 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -38,7 +38,6 @@ import static 
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePa
 import static 
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_DEFAULT;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_MS;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
@@ -97,7 +96,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -258,7 +256,6 @@ import 
org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -1247,8 +1244,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
         /** Failure processor. */
         private final FailureManager failureManager;
 
-        private final ScheduledExecutorService rebalanceScheduler;
-
         private final LogStorageFactory logStorageFactory;
 
         private final LogStorageFactory cmgLogStorageFactory;
@@ -1518,9 +1513,6 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     metricManager
             );
 
-            rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
-                    IgniteThreadFactory.create(name, 
"test-rebalance-scheduler", logger()));
-
             replicaManager = spy(new ReplicaManager(
                     name,
                     clusterService,
@@ -1596,7 +1588,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     failureManager,
                     nodeProperties,
                     threadPoolsManager.tableIoExecutor(),
-                    rebalanceScheduler,
+                    threadPoolsManager.rebalanceScheduler(),
                     threadPoolsManager.partitionOperationsExecutor(),
                     clockService,
                     placementDriver,
@@ -1628,7 +1620,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     schemaManager,
                     threadPoolsManager.tableIoExecutor(),
                     threadPoolsManager.partitionOperationsExecutor(),
-                    rebalanceScheduler,
+                    threadPoolsManager.rebalanceScheduler(),
                     threadPoolsManager.commonScheduler(),
                     clockService,
                     outgoingSnapshotManager,
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 04c109ae71c..74b03b41394 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -156,9 +156,6 @@ public class RebalanceUtil {
         }
     }
 
-    /** Rebalance scheduler pool size. */
-    public static final int REBALANCE_SCHEDULER_POOL_SIZE = 1;
-
     /**
      * Update keys that related to rebalance algorithm in Meta Storage. Keys 
are specific for partition.
      *
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 33a19374ebc..147070ef5f7 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -23,7 +23,6 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.BaseIgniteRestartTest.createVault;
 import static 
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -49,7 +48,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.LongSupplier;
@@ -180,7 +178,6 @@ import 
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOn
 import 
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
 import 
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
@@ -703,11 +700,6 @@ public class Node {
         lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
                 params -> 
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
 params).newLowWatermark()));
 
-        ScheduledExecutorService rebalanceScheduler = 
Executors.newScheduledThreadPool(
-                REBALANCE_SCHEDULER_POOL_SIZE,
-                IgniteThreadFactory.create(name, "test-rebalance-scheduler", 
LOG)
-        );
-
         SystemDistributedConfiguration systemDistributedConfiguration =
                 
clusterConfigRegistry.getConfiguration(SystemDistributedExtensionConfiguration.KEY).system();
 
@@ -743,7 +735,7 @@ public class Node {
                 failureManager,
                 nodeProperties,
                 threadPoolsManager.tableIoExecutor(),
-                rebalanceScheduler,
+                threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 clockService,
                 placementDriverManager.placementDriver(),
@@ -787,7 +779,7 @@ public class Node {
                 schemaManager,
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
-                rebalanceScheduler,
+                threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.commonScheduler(),
                 clockService,
                 outgoingSnapshotsManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 4760067c5ab..07995a28d42 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -27,7 +27,6 @@ import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
 import static 
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getZoneIdStrict;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
@@ -73,7 +72,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -224,7 +222,6 @@ import 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
 import org.apache.ignite.internal.testframework.InjectExecutorService;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
-import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import 
org.apache.ignite.internal.tx.configuration.TransactionExtensionConfiguration;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -600,9 +597,6 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 id -> null
         );
 
-        ScheduledExecutorService rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
-                IgniteThreadFactory.create(name, "test-rebalance-scheduler", 
logger()));
-
         ReplicaManager replicaMgr = new ReplicaManager(
                 name,
                 clusterSvc,
@@ -763,7 +757,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 failureProcessor,
                 nodeProperties,
                 threadPoolsManager.tableIoExecutor(),
-                rebalanceScheduler,
+                threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 clockService,
                 placementDriverManager.placementDriver(),
@@ -795,7 +789,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 schemaManager,
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
-                rebalanceScheduler,
+                threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.commonScheduler(),
                 clockService,
                 outgoingSnapshotManager,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index ad0c23571e0..cabc8e7437d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -23,7 +23,6 @@ import static 
org.apache.ignite.internal.configuration.IgnitePaths.cmgPath;
 import static 
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePath;
 import static 
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
 import static org.apache.ignite.internal.configuration.IgnitePaths.vaultPath;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
@@ -50,8 +49,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiPredicate;
 import java.util.function.LongSupplier;
@@ -908,9 +905,6 @@ public class IgniteImpl implements Ignite {
 
         LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = 
partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
 
-        ScheduledExecutorService rebalanceScheduler = new 
ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
-                IgniteThreadFactory.create(name, "rebalance-scheduler", LOG));
-
         // TODO: IGNITE-22222 this instantiation should be moved inside 
ReplicaManager's constructor
         Marshaller raftMarshaller = new 
ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry());
 
@@ -1091,7 +1085,7 @@ public class IgniteImpl implements Ignite {
                 failureManager,
                 nodeProperties,
                 threadPoolsManager.tableIoExecutor(),
-                rebalanceScheduler,
+                threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.partitionOperationsExecutor(),
                 clockService,
                 placementDriverMgr.placementDriver(),
@@ -1141,7 +1135,7 @@ public class IgniteImpl implements Ignite {
                 schemaManager,
                 threadPoolsManager.tableIoExecutor(),
                 threadPoolsManager.partitionOperationsExecutor(),
-                rebalanceScheduler,
+                threadPoolsManager.rebalanceScheduler(),
                 threadPoolsManager.commonScheduler(),
                 clockService,
                 outgoingSnapshotsManager,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index b3e85136e64..e64e9bfedd8 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -60,6 +60,9 @@ public class ThreadPoolsManager implements IgniteComponent {
 
     private final ScheduledExecutorService commonScheduler;
 
+    /** Executor for scheduling rebalance routine. */
+    private final ScheduledExecutorService rebalanceScheduler;
+
     private final MetricManager metricManager;
 
     private final List<ThreadPoolMetricSource> metricSources;
@@ -91,6 +94,8 @@ public class ThreadPoolsManager implements IgniteComponent {
 
         commonScheduler = 
Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(nodeName, 
"common-scheduler", LOG));
 
+        rebalanceScheduler = 
Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(nodeName, 
"rebalance-scheduler", LOG));
+
         this.metricManager = metricManager;
 
         metricSources = new ArrayList<>();
@@ -117,6 +122,7 @@ public class ThreadPoolsManager implements IgniteComponent {
         IgniteUtils.shutdownAndAwaitTermination(tableIoExecutor, 10, SECONDS);
         IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 
10, SECONDS);
         IgniteUtils.shutdownAndAwaitTermination(commonScheduler, 10, SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(rebalanceScheduler, 10, 
SECONDS);
 
         return nullCompletedFuture();
     }
@@ -141,4 +147,9 @@ public class ThreadPoolsManager implements IgniteComponent {
     public ScheduledExecutorService commonScheduler() {
         return commonScheduler;
     }
+
+    /** Returns executor for scheduling rebalance routine. */
+    public ScheduledExecutorService rebalanceScheduler() {
+        return rebalanceScheduler;
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index cafe16a47bb..f48e53c6c4a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1699,7 +1699,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     fullStateTransferIndexChooser,
                     () -> shutdownAndAwaitTermination(scanRequestExecutor, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> 
shutdownAndAwaitTermination(incomingSnapshotsExecutor, shutdownTimeoutSeconds, 
TimeUnit.SECONDS),
-                    () -> shutdownAndAwaitTermination(rebalanceScheduler, 
shutdownTimeoutSeconds, TimeUnit.SECONDS),
                     () -> {
                         ScheduledExecutorService streamerFlushExecutor;
 

Reply via email to