This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3fed7c2305 IGNITE-22933 Replace ThreadPoolExecutors with a
corePoolSize equal to 0 (#4190)
3fed7c2305 is described below
commit 3fed7c2305d5badb2d4bcdf8d29e524b7a8842ba
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Aug 7 11:46:52 2024 +0400
IGNITE-22933 Replace ThreadPoolExecutors with a corePoolSize equal to 0
(#4190)
---
.../apache/ignite/internal/hlc/ClockWaiter.java | 18 ++++++++++------
.../network/file/FileTransferServiceImpl.java | 25 ++++++++++++++--------
.../internal/network/netty/ConnectionManager.java | 9 +++++---
.../ItPlacementDriverReplicaSideTest.java | 9 +++-----
.../internal/replicator/ReplicaManagerTest.java | 9 +++-----
.../PersistentPageMemoryStorageEngine.java | 7 ++++--
.../VolatilePageMemoryStorageEngine.java | 7 ++++--
.../ignite/distributed/ReplicaUnavailableTest.java | 9 +++-----
.../outgoing/OutgoingSnapshotsManager.java | 13 ++++++-----
.../table/distributed/TableManagerTest.java | 9 +++-----
.../apache/ignite/distributed/ItTxTestCluster.java | 9 +++-----
11 files changed, 66 insertions(+), 58 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java
index b43b30b2a0..7c6075b56f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockWaiter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.hlc;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.concurrent.CancellationException;
@@ -80,14 +81,17 @@ public class ClockWaiter implements IgniteComponent {
this.nodeName = nodeName;
this.clock = clock;
- futureExecutor = new ThreadPoolExecutor(
- 0,
- 4,
- 1,
- TimeUnit.MINUTES,
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ 2,
+ 2,
+ 10,
+ SECONDS,
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(nodeName,
"clock-waiter-future-executor", log)
);
+ executor.allowCoreThreadTimeOut(true);
+
+ futureExecutor = executor;
}
@Override
@@ -118,10 +122,10 @@ public class ClockWaiter implements IgniteComponent {
// so it's simpler to just use shutdownNow().
scheduler.shutdownNow();
- IgniteUtils.shutdownAndAwaitTermination(futureExecutor, 10,
TimeUnit.SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(futureExecutor, 10, SECONDS);
try {
- scheduler.awaitTermination(10, TimeUnit.SECONDS);
+ scheduler.awaitTermination(10, SECONDS);
} catch (InterruptedException e) {
return failedFuture(e);
}
diff --git
a/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
b/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
index 15a310a2d4..761dec728e 100644
---
a/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
+++
b/modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.network.file;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.network.file.Channel.FILE_TRANSFER_CHANNEL;
import static
org.apache.ignite.internal.network.file.messages.FileHeader.fromPaths;
import static
org.apache.ignite.internal.network.file.messages.FileTransferError.fromThrowable;
@@ -38,7 +39,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -146,16 +146,23 @@ public class FileTransferServiceImpl implements
FileTransferService {
messagingService,
configuration,
transferDirectory,
- new ThreadPoolExecutor(
- 0,
- configuration.value().threadPoolSize(),
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(),
- NamedThreadFactory.create(nodeName, "file-transfer",
LOG)
- )
+ createExecutor(nodeName, configuration)
);
}
+ private static ExecutorService createExecutor(String nodeName,
FileTransferConfiguration configuration) {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ configuration.value().threadPoolSize(),
+ configuration.value().threadPoolSize(),
+ 10, SECONDS,
+ new LinkedBlockingQueue<>(),
+ NamedThreadFactory.create(nodeName, "file-transfer", LOG)
+ );
+ executor.allowCoreThreadTimeOut(true);
+
+ return executor;
+ }
+
/**
* Constructor.
*
@@ -249,7 +256,7 @@ public class FileTransferServiceImpl implements
FileTransferService {
@Override
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
- IgniteUtils.shutdownAndAwaitTermination(executorService, 10,
TimeUnit.SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(executorService, 10, SECONDS);
return nullCompletedFuture();
}
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 1f48b85e22..df4670980c 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -218,16 +218,19 @@ public class ConnectionManager implements
ChannelCreationListener {
this.clientBootstrap = bootstrapFactory.createClientBootstrap();
- // We don't just use Executors#newSingleThreadExecutor() here because
it defines corePoolSize=1, so the maintenance thread will
+ // We don't just use Executors#newSingleThreadExecutor() here because
the maintenance thread will
// be kept alive forever, and we only need it from time to time, so it
seems a waste to keep the thread alive.
- connectionMaintenanceExecutor = new ThreadPoolExecutor(
- 0,
+ ThreadPoolExecutor maintenanceExecutor = new ThreadPoolExecutor(
+ 1,
1,
1,
SECONDS,
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(consistentId,
"connection-maintenance", LOG)
);
+ maintenanceExecutor.allowCoreThreadTimeOut(true);
+
+ connectionMaintenanceExecutor = maintenanceExecutor;
}
/**
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 5b88e751bd..f35873ec1e 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -50,9 +50,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
@@ -152,10 +151,8 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
@BeforeEach
public void beforeTest(TestInfo testInfo) {
- partitionOperationsExecutor = new ThreadPoolExecutor(
- 0, 20,
- 0, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ partitionOperationsExecutor = Executors.newFixedThreadPool(
+ 5,
NamedThreadFactory.create("test", "partition-operations", log)
);
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 380234b89d..53713a837b 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -38,9 +38,8 @@ import static org.mockito.Mockito.when;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.EventListener;
@@ -109,10 +108,8 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
var clock = new HybridClockImpl();
- requestsExecutor = new ThreadPoolExecutor(
- 0, 5,
- 0, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ requestsExecutor = Executors.newFixedThreadPool(
+ 5,
NamedThreadFactory.create(nodeName, "partition-operations",
log)
);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index c4edd357fa..96df62fd0e 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -196,14 +196,17 @@ public class PersistentPageMemoryStorageEngine extends
AbstractPageMemoryStorage
});
// TODO: remove this executor, see
https://issues.apache.org/jira/browse/IGNITE-21683
- destructionExecutor = new ThreadPoolExecutor(
- 0,
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
100,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(igniteInstanceName,
"persistent-mv-partition-destruction", LOG)
);
+ executor.allowCoreThreadTimeOut(true);
+
+ destructionExecutor = executor;
}
@Override
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
index e5cea6df02..172ab000af 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryStorageEngine.java
@@ -106,14 +106,17 @@ public class VolatilePageMemoryStorageEngine extends
AbstractPageMemoryStorageEn
});
// TODO: remove this executor, see
https://issues.apache.org/jira/browse/IGNITE-21683
- destructionExecutor = new ThreadPoolExecutor(
- 0,
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
100,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
NamedThreadFactory.create(igniteInstanceName,
"volatile-mv-partition-destruction", LOG)
);
+ executor.allowCoreThreadTimeOut(true);
+
+ destructionExecutor = executor;
}
@Override
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index aae2ca7d88..1fb38d7b4a 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -46,9 +46,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -176,10 +175,8 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
raftClient = mock(TopologyAwareRaftGroupService.class);
when(raftManager.startRaftGroupService(any(), any(), any(),
any())).thenReturn(completedFuture(raftClient));
- requestsExecutor = new ThreadPoolExecutor(
- 0, 5,
- 0, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ requestsExecutor = Executors.newFixedThreadPool(
+ 5,
NamedThreadFactory.create(NODE_NAME, "partition-operations",
log)
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
index 1ce4081dca..8bab387a8a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
import static java.util.Collections.unmodifiableList;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -105,11 +104,15 @@ public class OutgoingSnapshotsManager implements
PartitionsSnapshots, IgniteComp
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
- executor = new ThreadPoolExecutor(
- 0, 4, 0L, MILLISECONDS,
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
+ 4, 4,
+ 10, SECONDS,
new LinkedBlockingQueue<>(),
IgniteThreadFactory.create(nodeName, "outgoing-snapshots",
LOG, STORAGE_READ)
);
+ threadPoolExecutor.allowCoreThreadTimeOut(true);
+
+ executor = threadPoolExecutor;
messagingService.addMessageHandler(PartitionReplicationMessageGroup.class,
this::handleMessage);
@@ -120,7 +123,7 @@ public class OutgoingSnapshotsManager implements
PartitionsSnapshots, IgniteComp
public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
// At this moment, all RAFT groups should already be stopped, so all
snapshots are already closed and finished.
- IgniteUtils.shutdownAndAwaitTermination(executor, 10,
TimeUnit.SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(executor, 10, SECONDS);
return nullCompletedFuture();
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7dd883acdd..6aa866e20a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -64,11 +64,10 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
@@ -310,10 +309,8 @@ public class TableManagerTest extends IgniteAbstractTest {
mockMetastore();
- partitionOperationsExecutor = new ThreadPoolExecutor(
- 0, 5,
- 0, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ partitionOperationsExecutor = Executors.newFixedThreadPool(
+ 5,
IgniteThreadFactory.create("test", "partition-operations",
log, STORAGE_READ, STORAGE_WRITE)
);
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 9ae2390a08..941e99e32c 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -54,10 +54,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -371,10 +370,8 @@ public class ItTxTestCluster {
executor = new ScheduledThreadPoolExecutor(20,
new NamedThreadFactory(Loza.CLIENT_POOL_NAME, LOG));
- partitionOperationsExecutor = new ThreadPoolExecutor(
- 0, 20,
- 0, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ partitionOperationsExecutor = Executors.newFixedThreadPool(
+ 5,
NamedThreadFactory.create("test", "partition-operations", LOG)
);