This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26960 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit ab24c1d7388765338bb23ea278d29374392c7dfe Author: Kirill Tkalenko <[email protected]> AuthorDate: Thu Nov 6 13:54:34 2025 +0300 IGNITE-26960 wip --- .../rebalance/ItRebalanceDistributedTest.java | 3 ++- .../partition/replicator/fixtures/Node.java | 3 ++- .../PartitionReplicaLifecycleManagerTest.java | 3 ++- .../ItPlacementDriverReplicaSideTest.java | 3 ++- .../ignite/internal/replicator/ReplicaManager.java | 21 +++------------------ .../internal/replicator/ReplicaManagerTest.java | 3 ++- .../runner/app/ItIgniteNodeRestartTest.java | 3 ++- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 ++- .../ignite/distributed/ReplicaUnavailableTest.java | 3 ++- .../apache/ignite/distributed/ItTxTestCluster.java | 3 ++- 10 files changed, 21 insertions(+), 27 deletions(-) diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 80f853be282..ebd17bbd316 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1530,7 +1530,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { view -> new LocalLogStorageFactory(), threadPoolsManager.tableIoExecutor(), replicaGrpId -> metaStorageManager.get(pendingPartAssignmentsQueueKey((TablePartitionId) replicaGrpId)) - .thenApply(Entry::value) + .thenApply(Entry::value), + threadPoolsManager.commonScheduler() )); LongSupplier delayDurationMsSupplier = () -> 10L; diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index e8c616a44dd..558194ea650 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -653,7 +653,8 @@ public class Node { volatileLogStorageFactoryCreator, threadPoolsManager.tableIoExecutor(), replicaGrpId -> metaStorageManager.get(pendingPartAssignmentsQueueKey((ZonePartitionId) replicaGrpId)) - .thenApply(Entry::value) + .thenApply(Entry::value), + threadPoolsManager.commonScheduler() ); LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS; diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java index 79d51a28f8f..660ca666dfd 100644 --- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java +++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java @@ -260,7 +260,8 @@ class PartitionReplicaLifecycleManagerTest extends BaseIgniteAbstractTest { RaftGroupOptionsConfigurer.EMPTY, logStorageFactoryCreator, executorService, - groupId -> nullCompletedFuture() + groupId -> nullCompletedFuture(), + executorService )); partitionReplicaLifecycleManager = new PartitionReplicaLifecycleManager( diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java index eec9d1389bb..b96704a9d7d 100644 --- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java +++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java @@ -233,7 +233,8 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest { partitionsConfigurer, new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")), ForkJoinPool.commonPool(), - replicaGrpId -> nullCompletedFuture() + replicaGrpId -> nullCompletedFuture(), + ForkJoinPool.commonPool() ); replicaManagers.put(nodeName, replicaManager); diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java index fc63dc59ba2..c2da4c5d411 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java @@ -55,9 +55,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -153,10 +151,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc private final Map<ReplicationGroupId, Integer> timeoutAttemptsCounters = new ConcurrentHashMap<>(); - /** Executor for the throttled log. */ - // TODO: IGNITE-20063 Maybe get rid of it - private final ThreadPoolExecutor throttledLogExecutor; - private final IgniteThrottledLogger throttledLog; /** Busy lock to stop synchronously. */ @@ -249,6 +243,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc * volatile tables. * @param replicaStartStopExecutor Executor for asynchronous replicas lifecycle management. * @param getPendingAssignmentsSupplier The supplier of pending assignments for rebalance failover purposes. + * @param throttledLogExecutor Executor to clean up the throttled logger cache. */ public ReplicaManager( String nodeName, @@ -266,7 +261,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc RaftGroupOptionsConfigurer partitionRaftConfigurer, LogStorageFactoryCreator volatileLogStorageFactoryCreator, Executor replicaStartStopExecutor, - Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier + Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier, + Executor throttledLogExecutor ) { this.clusterNetSvc = clusterNetSvc; this.cmgMgr = cmgMgr; @@ -299,16 +295,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc IgniteThreadFactory.create(nodeName, "scheduled-idle-safe-time-sync-thread", LOG) ); - throttledLogExecutor = new ThreadPoolExecutor( - 1, - 1, - 30, - SECONDS, - new LinkedBlockingQueue<>(), - IgniteThreadFactory.create(nodeName, "throttled-log-replica-manager", LOG) - ); - throttledLogExecutor.allowCoreThreadTimeOut(true); - throttledLog = Loggers.toThrottledLogger(LOG, throttledLogExecutor); } @@ -968,7 +954,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc int shutdownTimeoutSeconds = 10; shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, shutdownTimeoutSeconds, SECONDS); - shutdownAndAwaitTermination(throttledLogExecutor, shutdownTimeoutSeconds, SECONDS); // There we're closing replicas' futures that was created by requests and should be completed with NodeStoppingException. try { diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java index afd74543612..9a83d085ca8 100644 --- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java +++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java @@ -137,7 +137,8 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest { partitionsConfigurer, volatileLogStorageFactoryCreator, ForkJoinPool.commonPool(), - replicaGrpId -> nullCompletedFuture() + replicaGrpId -> nullCompletedFuture(), + ForkJoinPool.commonPool() ); assertThat(replicaManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 6f3454d6d65..11eff870ba0 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -614,7 +614,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { view -> new LocalLogStorageFactory(), threadPoolsManager.tableIoExecutor(), replicaGrpId -> metaStorageMgr.get(pendingPartAssignmentsQueueKey((TablePartitionId) replicaGrpId)) - .thenApply(Entry::value) + .thenApply(Entry::value), + threadPoolsManager.commonScheduler() ); TransactionInflights transactionInflights = new TransactionInflights(placementDriverManager.placementDriver(), clockService); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 3b71d8f799f..67adb33c135 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -927,7 +927,8 @@ public class IgniteImpl implements Ignite { volatileLogStorageFactoryCreator, threadPoolsManager.tableIoExecutor(), replicaGrpId -> metaStorageMgr.get(pendingPartAssignmentsQueueKey((TablePartitionId) replicaGrpId)) - .thenApply(org.apache.ignite.internal.metastorage.Entry::value) + .thenApply(org.apache.ignite.internal.metastorage.Entry::value), + threadPoolsManager.commonScheduler() ); metricManager.configure( diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java index 81a1bed2158..45e416252a9 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java @@ -220,7 +220,8 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest { RaftGroupOptionsConfigurer.EMPTY, view -> new LocalLogStorageFactory(), ForkJoinPool.commonPool(), - replicaGrpId -> nullCompletedFuture() + replicaGrpId -> nullCompletedFuture(), + ForkJoinPool.commonPool() ); assertThat(replicaManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index f53682bb5f6..aac8a847da2 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -500,7 +500,8 @@ public class ItTxTestCluster { partitionRaftConfigurer, new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")), ForkJoinPool.commonPool(), - replicaGrpId -> nullCompletedFuture() + replicaGrpId -> nullCompletedFuture(), + ForkJoinPool.commonPool() ); assertThat(replicaMgr.startAsync(new ComponentContext()), willCompleteSuccessfully());
