This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26960
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit ab24c1d7388765338bb23ea278d29374392c7dfe
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 6 13:54:34 2025 +0300

    IGNITE-26960 wip
---
 .../rebalance/ItRebalanceDistributedTest.java       |  3 ++-
 .../partition/replicator/fixtures/Node.java         |  3 ++-
 .../PartitionReplicaLifecycleManagerTest.java       |  3 ++-
 .../ItPlacementDriverReplicaSideTest.java           |  3 ++-
 .../ignite/internal/replicator/ReplicaManager.java  | 21 +++------------------
 .../internal/replicator/ReplicaManagerTest.java     |  3 ++-
 .../runner/app/ItIgniteNodeRestartTest.java         |  3 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java  |  3 ++-
 .../ignite/distributed/ReplicaUnavailableTest.java  |  3 ++-
 .../apache/ignite/distributed/ItTxTestCluster.java  |  3 ++-
 10 files changed, 21 insertions(+), 27 deletions(-)

diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 80f853be282..ebd17bbd316 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1530,7 +1530,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
                     view -> new LocalLogStorageFactory(),
                     threadPoolsManager.tableIoExecutor(),
                     replicaGrpId -> metaStorageManager.get(pendingPartAssignmentsQueueKey((TablePartitionId) replicaGrpId))
-                            .thenApply(Entry::value)
+                            .thenApply(Entry::value),
+                    threadPoolsManager.commonScheduler()
             ));
 
             LongSupplier delayDurationMsSupplier = () -> 10L;
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index e8c616a44dd..558194ea650 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -653,7 +653,8 @@ public class Node {
                 volatileLogStorageFactoryCreator,
                 threadPoolsManager.tableIoExecutor(),
                 replicaGrpId -> metaStorageManager.get(pendingPartAssignmentsQueueKey((ZonePartitionId) replicaGrpId))
-                        .thenApply(Entry::value)
+                        .thenApply(Entry::value),
+                threadPoolsManager.commonScheduler()
         );
 
         LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 79d51a28f8f..660ca666dfd 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -260,7 +260,8 @@ class PartitionReplicaLifecycleManagerTest extends BaseIgniteAbstractTest {
                 RaftGroupOptionsConfigurer.EMPTY,
                 logStorageFactoryCreator,
                 executorService,
-                groupId -> nullCompletedFuture()
+                groupId -> nullCompletedFuture(),
+                executorService
         ));
 
         partitionReplicaLifecycleManager = new PartitionReplicaLifecycleManager(
diff --git a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index eec9d1389bb..b96704a9d7d 100644
--- a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -233,7 +233,8 @@ public class ItPlacementDriverReplicaSideTest extends IgniteAbstractTest {
                     partitionsConfigurer,
                     new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")),
                     ForkJoinPool.commonPool(),
-                    replicaGrpId -> nullCompletedFuture()
+                    replicaGrpId -> nullCompletedFuture(),
+                    ForkJoinPool.commonPool()
             );
 
             replicaManagers.put(nodeName, replicaManager);
diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index fc63dc59ba2..c2da4c5d411 100644
--- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -55,9 +55,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -153,10 +151,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
 
     private final Map<ReplicationGroupId, Integer> timeoutAttemptsCounters = new ConcurrentHashMap<>();
 
-    /** Executor for the throttled log. */
-    // TODO: IGNITE-20063 Maybe get rid of it
-    private final ThreadPoolExecutor throttledLogExecutor;
-
     private final IgniteThrottledLogger throttledLog;
 
     /** Busy lock to stop synchronously. */
@@ -249,6 +243,7 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
      *      volatile tables.
      * @param replicaStartStopExecutor Executor for asynchronous replicas lifecycle management.
      * @param getPendingAssignmentsSupplier The supplier of pending assignments for rebalance failover purposes.
+     * @param throttledLogExecutor Executor to clean up the throttled logger cache.
      */
     public ReplicaManager(
             String nodeName,
@@ -266,7 +261,8 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
             RaftGroupOptionsConfigurer partitionRaftConfigurer,
             LogStorageFactoryCreator volatileLogStorageFactoryCreator,
             Executor replicaStartStopExecutor,
-            Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier
+            Function<ReplicationGroupId, CompletableFuture<byte[]>> getPendingAssignmentsSupplier,
+            Executor throttledLogExecutor
     ) {
         this.clusterNetSvc = clusterNetSvc;
         this.cmgMgr = cmgMgr;
@@ -299,16 +295,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
                 IgniteThreadFactory.create(nodeName, "scheduled-idle-safe-time-sync-thread", LOG)
         );
 
-        throttledLogExecutor = new ThreadPoolExecutor(
-                1,
-                1,
-                30,
-                SECONDS,
-                new LinkedBlockingQueue<>(),
-                IgniteThreadFactory.create(nodeName, "throttled-log-replica-manager", LOG)
-        );
-        throttledLogExecutor.allowCoreThreadTimeOut(true);
-
         throttledLog = Loggers.toThrottledLogger(LOG, throttledLogExecutor);
     }
 
@@ -968,7 +954,6 @@ public class ReplicaManager extends AbstractEventProducer<LocalReplicaEvent, Loc
         int shutdownTimeoutSeconds = 10;
 
         shutdownAndAwaitTermination(scheduledIdleSafeTimeSyncExecutor, shutdownTimeoutSeconds, SECONDS);
-        shutdownAndAwaitTermination(throttledLogExecutor, shutdownTimeoutSeconds, SECONDS);
 
         // There we're closing replicas' futures that was created by requests and should be completed with NodeStoppingException.
         try {
diff --git a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index afd74543612..9a83d085ca8 100644
--- a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -137,7 +137,8 @@ public class ReplicaManagerTest extends BaseIgniteAbstractTest {
                 partitionsConfigurer,
                 volatileLogStorageFactoryCreator,
                 ForkJoinPool.commonPool(),
-                replicaGrpId -> nullCompletedFuture()
+                replicaGrpId -> nullCompletedFuture(),
+                ForkJoinPool.commonPool()
         );
 
         assertThat(replicaManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 6f3454d6d65..11eff870ba0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -614,7 +614,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
                 view -> new LocalLogStorageFactory(),
                 threadPoolsManager.tableIoExecutor(),
                 replicaGrpId -> metaStorageMgr.get(pendingPartAssignmentsQueueKey((TablePartitionId) replicaGrpId))
-                        .thenApply(Entry::value)
+                        .thenApply(Entry::value),
+                threadPoolsManager.commonScheduler()
         );
 
         TransactionInflights transactionInflights = new TransactionInflights(placementDriverManager.placementDriver(), clockService);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 3b71d8f799f..67adb33c135 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -927,7 +927,8 @@ public class IgniteImpl implements Ignite {
                 volatileLogStorageFactoryCreator,
                 threadPoolsManager.tableIoExecutor(),
                 replicaGrpId -> metaStorageMgr.get(pendingPartAssignmentsQueueKey((TablePartitionId) replicaGrpId))
-                        .thenApply(org.apache.ignite.internal.metastorage.Entry::value)
+                        .thenApply(org.apache.ignite.internal.metastorage.Entry::value),
+                threadPoolsManager.commonScheduler()
         );
 
         metricManager.configure(
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 81a1bed2158..45e416252a9 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -220,7 +220,8 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
                 RaftGroupOptionsConfigurer.EMPTY,
                 view -> new LocalLogStorageFactory(),
                 ForkJoinPool.commonPool(),
-                replicaGrpId -> nullCompletedFuture()
+                replicaGrpId -> nullCompletedFuture(),
+                ForkJoinPool.commonPool()
         );
 
         assertThat(replicaManager.startAsync(new ComponentContext()), willCompleteSuccessfully());
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index f53682bb5f6..aac8a847da2 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -500,7 +500,8 @@ public class ItTxTestCluster {
                     partitionRaftConfigurer,
                     new VolatileLogStorageFactoryCreator(nodeName, workDir.resolve("volatile-log-spillout")),
                     ForkJoinPool.commonPool(),
-                    replicaGrpId -> nullCompletedFuture()
+                    replicaGrpId -> nullCompletedFuture(),
+                    ForkJoinPool.commonPool()
             );
 
             assertThat(replicaMgr.startAsync(new ComponentContext()), willCompleteSuccessfully());

Reply via email to