This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fed7c2305 IGNITE-22933 Replace ThreadPoolExecutors with a corePoolSize equal to 0 (#4190)
3fed7c2305 is described below

commit 3fed7c2305d5badb2d4bcdf8d29e524b7a8842ba
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Aug 7 11:46:52 2024 +0400

    IGNITE-22933 Replace ThreadPoolExecutors with a corePoolSize equal to 0 (#4190)
---
 .../apache/ignite/internal/hlc/ClockWaiter.java    | 18 ++++++++++------
 .../network/file/FileTransferServiceImpl.java      | 25 ++++++++++++++--------
 .../internal/network/netty/ConnectionManager.java  |  9 +++++---
 .../ItPlacementDriverReplicaSideTest.java          |  9 +++-----
 .../internal/replicator/ReplicaManagerTest.java    |  9 +++-----
 .../PersistentPageMemoryStorageEngine.java         |  7 ++++--
 .../VolatilePageMemoryStorageEngine.java           |  7 ++++--
 .../ignite/distributed/ReplicaUnavailableTest.java |  9 +++-----
 .../outgoing/OutgoingSnapshotsManager.java         | 13 ++++++-----
 .../table/distributed/TableManagerTest.java        |  9 +++-----
 .../apache/ignite/distributed/ItTxTestCluster.java |  9 +++-----
 11 files changed, 66 insertions(+), 58 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java 
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java
index b43b30b2a0..7c6075b56f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.hlc;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.concurrent.CancellationException;
@@ -80,14 +81,17 @@ public class ClockWaiter implements IgniteComponent {
         this.nodeName = nodeName;
         this.clock = clock;
 
-        futureExecutor = new ThreadPoolExecutor(
-                0,
-                4,
-                1,
-                TimeUnit.MINUTES,
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                2,
+                2,
+                10,
+                SECONDS,
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, 
"clock-waiter-future-executor", log)
         );
+        executor.allowCoreThreadTimeOut(true);
+
+        futureExecutor = executor;
     }
 
     @Override
@@ -118,10 +122,10 @@ public class ClockWaiter implements IgniteComponent {
         // so it's simpler to just use shutdownNow().
         scheduler.shutdownNow();
 
-        IgniteUtils.shutdownAndAwaitTermination(futureExecutor, 10, 
TimeUnit.SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(futureExecutor, 10, SECONDS);
 
         try {
-            scheduler.awaitTermination(10, TimeUnit.SECONDS);
+            scheduler.awaitTermination(10, SECONDS);
         } catch (InterruptedException e) {
             return failedFuture(e);
         }
diff --git 
a/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
 
b/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
index 15a310a2d4..761dec728e 100644
--- 
a/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
+++ 
b/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.network.file;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.CompletableFuture.runAsync;
 import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.network.file.Channel.FILE_TRANSFER_CHANNEL;
 import static 
org.apache.ignite.internal.network.file.messages.FileHeader.fromPaths;
 import static 
org.apache.ignite.internal.network.file.messages.FileTransferError.fromThrowable;
@@ -38,7 +39,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -146,16 +146,23 @@ public class FileTransferServiceImpl implements 
FileTransferService {
                 messagingService,
                 configuration,
                 transferDirectory,
-                new ThreadPoolExecutor(
-                        0,
-                        configuration.value().threadPoolSize(),
-                        0L, TimeUnit.MILLISECONDS,
-                        new LinkedBlockingQueue<>(),
-                        NamedThreadFactory.create(nodeName, "file-transfer", 
LOG)
-                )
+                createExecutor(nodeName, configuration)
         );
     }
 
+    private static ExecutorService createExecutor(String nodeName, 
FileTransferConfiguration configuration) {
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                configuration.value().threadPoolSize(),
+                configuration.value().threadPoolSize(),
+                10, SECONDS,
+                new LinkedBlockingQueue<>(),
+                NamedThreadFactory.create(nodeName, "file-transfer", LOG)
+        );
+        executor.allowCoreThreadTimeOut(true);
+
+        return executor;
+    }
+
     /**
      * Constructor.
      *
@@ -249,7 +256,7 @@ public class FileTransferServiceImpl implements 
FileTransferService {
 
     @Override
     public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
-        IgniteUtils.shutdownAndAwaitTermination(executorService, 10, 
TimeUnit.SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(executorService, 10, SECONDS);
 
         return nullCompletedFuture();
     }
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 1f48b85e22..df4670980c 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -218,16 +218,19 @@ public class ConnectionManager implements 
ChannelCreationListener {
 
         this.clientBootstrap = bootstrapFactory.createClientBootstrap();
 
-        // We don't just use Executors#newSingleThreadExecutor() here because 
it defines corePoolSize=1, so the maintenance thread will
+        // We don't just use Executors#newSingleThreadExecutor() here because 
the maintenance thread will
         // be kept alive forever, and we only need it from time to time, so it 
seems a waste to keep the thread alive.
-        connectionMaintenanceExecutor = new ThreadPoolExecutor(
-                0,
+        ThreadPoolExecutor maintenanceExecutor = new ThreadPoolExecutor(
+                1,
                 1,
                 1,
                 SECONDS,
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(consistentId, 
"connection-maintenance", LOG)
         );
+        maintenanceExecutor.allowCoreThreadTimeOut(true);
+
+        connectionMaintenanceExecutor = maintenanceExecutor;
     }
 
     /**
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 5b88e751bd..f35873ec1e 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -50,9 +50,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -152,10 +151,8 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
 
     @BeforeEach
     public void beforeTest(TestInfo testInfo) {
-        partitionOperationsExecutor = new ThreadPoolExecutor(
-                0, 20,
-                0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
+        partitionOperationsExecutor = Executors.newFixedThreadPool(
+                5,
                 NamedThreadFactory.create("test", "partition-operations", log)
         );
 
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 380234b89d..53713a837b 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -38,9 +38,8 @@ import static org.mockito.Mockito.when;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.event.EventListener;
@@ -109,10 +108,8 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
 
         var clock = new HybridClockImpl();
 
-        requestsExecutor = new ThreadPoolExecutor(
-                0, 5,
-                0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
+        requestsExecutor = Executors.newFixedThreadPool(
+                5,
                 NamedThreadFactory.create(nodeName, "partition-operations", 
log)
         );
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index c4edd357fa..96df62fd0e 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -196,14 +196,17 @@ public class PersistentPageMemoryStorageEngine extends 
AbstractPageMemoryStorage
         });
 
         // TODO: remove this executor, see 
https://issues.apache.org/jira/browse/IGNITE-21683
-        destructionExecutor = new ThreadPoolExecutor(
-                0,
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                Runtime.getRuntime().availableProcessors(),
                 Runtime.getRuntime().availableProcessors(),
                 100,
                 TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(igniteInstanceName, 
"persistent-mv-partition-destruction", LOG)
         );
+        executor.allowCoreThreadTimeOut(true);
+
+        destructionExecutor = executor;
     }
 
     @Override
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index e5cea6df02..172ab000af 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -106,14 +106,17 @@ public class VolatilePageMemoryStorageEngine extends 
AbstractPageMemoryStorageEn
         });
 
         // TODO: remove this executor, see 
https://issues.apache.org/jira/browse/IGNITE-21683
-        destructionExecutor = new ThreadPoolExecutor(
-                0,
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(
+                Runtime.getRuntime().availableProcessors(),
                 Runtime.getRuntime().availableProcessors(),
                 100,
                 TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(igniteInstanceName, 
"volatile-mv-partition-destruction", LOG)
         );
+        executor.allowCoreThreadTimeOut(true);
+
+        destructionExecutor = executor;
     }
 
     @Override
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index aae2ca7d88..1fb38d7b4a 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -46,9 +46,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -176,10 +175,8 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
         raftClient = mock(TopologyAwareRaftGroupService.class);
         when(raftManager.startRaftGroupService(any(), any(), any(), 
any())).thenReturn(completedFuture(raftClient));
 
-        requestsExecutor = new ThreadPoolExecutor(
-                0, 5,
-                0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
+        requestsExecutor = Executors.newFixedThreadPool(
+                5,
                 NamedThreadFactory.create(NODE_NAME, "partition-operations", 
log)
         );
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
index 1ce4081dca..8bab387a8a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
 
 import static java.util.Collections.unmodifiableList;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -105,11 +104,15 @@ public class OutgoingSnapshotsManager implements 
PartitionsSnapshots, IgniteComp
 
     @Override
     public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
-        executor = new ThreadPoolExecutor(
-                0, 4, 0L, MILLISECONDS,
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
+                4, 4,
+                10, SECONDS,
                 new LinkedBlockingQueue<>(),
                 IgniteThreadFactory.create(nodeName, "outgoing-snapshots", 
LOG, STORAGE_READ)
         );
+        threadPoolExecutor.allowCoreThreadTimeOut(true);
+
+        executor = threadPoolExecutor;
 
         
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class, 
this::handleMessage);
 
@@ -120,7 +123,7 @@ public class OutgoingSnapshotsManager implements 
PartitionsSnapshots, IgniteComp
     public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
         // At this moment, all RAFT groups should already be stopped, so all 
snapshots are already closed and finished.
 
-        IgniteUtils.shutdownAndAwaitTermination(executor, 10, 
TimeUnit.SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, SECONDS);
 
         return nullCompletedFuture();
     }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7dd883acdd..6aa866e20a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -64,11 +64,10 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -310,10 +309,8 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         mockMetastore();
 
-        partitionOperationsExecutor = new ThreadPoolExecutor(
-                0, 5,
-                0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
+        partitionOperationsExecutor = Executors.newFixedThreadPool(
+                5,
                 IgniteThreadFactory.create("test", "partition-operations", 
log, STORAGE_READ, STORAGE_WRITE)
         );
     }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 9ae2390a08..941e99e32c 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -54,10 +54,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -371,10 +370,8 @@ public class ItTxTestCluster {
         executor = new ScheduledThreadPoolExecutor(20,
                 new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
 
-        partitionOperationsExecutor = new ThreadPoolExecutor(
-                0, 20,
-                0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(),
+        partitionOperationsExecutor = Executors.newFixedThreadPool(
+                5,
                 NamedThreadFactory.create("test", "partition-operations", LOG)
         );
 

Reply via email to