This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26545 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 303debd353c25072bc5b3b131348b53aed847816 Author: Kirill Tkalenko <[email protected]> AuthorDate: Tue Sep 30 10:14:48 2025 +0300 IGNITE-26545 wip --- .../internal/rebalance/ItRebalanceDistributedTest.java | 12 ++---------- .../internal/distributionzones/rebalance/RebalanceUtil.java | 3 --- .../ignite/internal/partition/replicator/fixtures/Node.java | 12 ++---------- .../ignite/internal/runner/app/ItIgniteNodeRestartTest.java | 10 ++-------- .../main/java/org/apache/ignite/internal/app/IgniteImpl.java | 10 ++-------- .../org/apache/ignite/internal/app/ThreadPoolsManager.java | 11 +++++++++++ .../ignite/internal/table/distributed/TableManager.java | 1 - 7 files changed, 19 insertions(+), 40 deletions(-) diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 9418e41262f..f1dd90ab055 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -38,7 +38,6 @@ import static org.apache.ignite.internal.configuration.IgnitePaths.metastoragePa import static org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_DEFAULT; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_MS; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey; @@ -97,7 +96,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -258,7 +256,6 @@ import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.testframework.TestIgnitionManager; import org.apache.ignite.internal.testframework.WorkDirectory; import org.apache.ignite.internal.testframework.WorkDirectoryExtension; -import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; @@ -1247,8 +1244,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { /** Failure processor. */ private final FailureManager failureManager; - private final ScheduledExecutorService rebalanceScheduler; - private final LogStorageFactory logStorageFactory; private final LogStorageFactory cmgLogStorageFactory; @@ -1518,9 +1513,6 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { metricManager ); - rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - IgniteThreadFactory.create(name, "test-rebalance-scheduler", logger())); - replicaManager = spy(new ReplicaManager( name, clusterService, @@ -1596,7 +1588,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { failureManager, nodeProperties, threadPoolsManager.tableIoExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.partitionOperationsExecutor(), clockService, placementDriver, @@ -1628,7 +1620,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { schemaManager, threadPoolsManager.tableIoExecutor(), threadPoolsManager.partitionOperationsExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.commonScheduler(), clockService, outgoingSnapshotManager, diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java index 84faef21361..df96f5a1c8c 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java @@ -156,9 +156,6 @@ public class RebalanceUtil { } } - /** Rebalance scheduler pool size. */ - public static final int REBALANCE_SCHEDULER_POOL_SIZE = 1; - /** * Update keys that related to rebalance algorithm in Meta Storage. Keys are specific for partition. * diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index 297ef1261a0..ab423282622 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -23,7 +23,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.internal.BaseIgniteRestartTest.createVault; import static org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey; import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; @@ -49,7 +48,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ScheduledExecutorService; import java.util.function.LongSupplier; @@ -180,7 +178,6 @@ import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOn import org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl; import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.testframework.TestIgnitionManager; -import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tx.LockManager; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; @@ -701,11 +698,6 @@ public class Node { lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED, params -> catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters) params).newLowWatermark())); - ScheduledExecutorService rebalanceScheduler = Executors.newScheduledThreadPool( - REBALANCE_SCHEDULER_POOL_SIZE, - IgniteThreadFactory.create(name, "test-rebalance-scheduler", LOG) - ); - SystemDistributedConfiguration systemDistributedConfiguration = clusterConfigRegistry.getConfiguration(SystemDistributedExtensionConfiguration.KEY).system(); @@ -741,7 +733,7 @@ public class Node { failureManager, nodeProperties, threadPoolsManager.tableIoExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.partitionOperationsExecutor(), clockService, placementDriverManager.placementDriver(), @@ -785,7 +777,7 @@ public class Node { schemaManager, threadPoolsManager.tableIoExecutor(), threadPoolsManager.partitionOperationsExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.commonScheduler(), clockService, outgoingSnapshotsManager, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 1e70034f487..842a050fcea 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -27,7 +27,6 @@ import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_ import static org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.alterZone; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.getZoneIdStrict; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey; @@ -73,7 +72,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -224,7 +222,6 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; import org.apache.ignite.internal.testframework.InjectExecutorService; import org.apache.ignite.internal.testframework.TestIgnitionManager; -import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.configuration.TransactionExtensionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; @@ -598,9 +595,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { metricManager ); - ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - IgniteThreadFactory.create(name, "test-rebalance-scheduler", logger())); - ReplicaManager replicaMgr = new ReplicaManager( name, clusterSvc, @@ -761,7 +755,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { failureProcessor, nodeProperties, threadPoolsManager.tableIoExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.partitionOperationsExecutor(), clockService, placementDriverManager.placementDriver(), @@ -793,7 +787,7 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { schemaManager, threadPoolsManager.tableIoExecutor(), threadPoolsManager.partitionOperationsExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.commonScheduler(), clockService, outgoingSnapshotManager, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index cf511826f16..43137c16d73 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -23,7 +23,6 @@ import static org.apache.ignite.internal.configuration.IgnitePaths.cmgPath; import static org.apache.ignite.internal.configuration.IgnitePaths.metastoragePath; import static org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath; import static org.apache.ignite.internal.configuration.IgnitePaths.vaultPath; -import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE; import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ; import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE; @@ -50,8 +49,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiPredicate; import java.util.function.LongSupplier; @@ -892,9 +889,6 @@ public class IgniteImpl implements Ignite { LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig); - ScheduledExecutorService rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE, - IgniteThreadFactory.create(name, "rebalance-scheduler", LOG)); - // TODO: IGNITE-22222 this instantiation should be moved inside ReplicaManager's constructor Marshaller raftMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterSvc.serializationRegistry()); @@ -1085,7 +1079,7 @@ public class IgniteImpl implements Ignite { failureManager, nodeProperties, threadPoolsManager.tableIoExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.partitionOperationsExecutor(), clockService, placementDriverMgr.placementDriver(), @@ -1135,7 +1129,7 @@ public class IgniteImpl implements Ignite { schemaManager, threadPoolsManager.tableIoExecutor(), threadPoolsManager.partitionOperationsExecutor(), - rebalanceScheduler, + threadPoolsManager.rebalanceScheduler(), threadPoolsManager.commonScheduler(), clockService, outgoingSnapshotsManager, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java index b3e85136e64..e64e9bfedd8 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java @@ -60,6 +60,9 @@ public class ThreadPoolsManager implements IgniteComponent { private final ScheduledExecutorService commonScheduler; + /** Executor for scheduling rebalance routine. */ + private final ScheduledExecutorService rebalanceScheduler; + private final MetricManager metricManager; private final List<ThreadPoolMetricSource> metricSources; @@ -91,6 +94,8 @@ public class ThreadPoolsManager implements IgniteComponent { commonScheduler = Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(nodeName, "common-scheduler", LOG)); + rebalanceScheduler = Executors.newSingleThreadScheduledExecutor(IgniteThreadFactory.create(nodeName, "rebalance-scheduler", LOG)); + this.metricManager = metricManager; metricSources = new ArrayList<>(); @@ -117,6 +122,7 @@ public class ThreadPoolsManager implements IgniteComponent { IgniteUtils.shutdownAndAwaitTermination(tableIoExecutor, 10, SECONDS); IgniteUtils.shutdownAndAwaitTermination(partitionOperationsExecutor, 10, SECONDS); IgniteUtils.shutdownAndAwaitTermination(commonScheduler, 10, SECONDS); + IgniteUtils.shutdownAndAwaitTermination(rebalanceScheduler, 10, SECONDS); return nullCompletedFuture(); } @@ -141,4 +147,9 @@ public class ThreadPoolsManager implements IgniteComponent { public ScheduledExecutorService commonScheduler() { return commonScheduler; } + + /** Returns executor for scheduling rebalance routine. */ + public ScheduledExecutorService rebalanceScheduler() { + return rebalanceScheduler; + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index cafe16a47bb..f48e53c6c4a 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -1699,7 +1699,6 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { fullStateTransferIndexChooser, () -> shutdownAndAwaitTermination(scanRequestExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> shutdownAndAwaitTermination(incomingSnapshotsExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS), - () -> shutdownAndAwaitTermination(rebalanceScheduler, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> { ScheduledExecutorService streamerFlushExecutor;
