This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 513d54d637 IGNITE-22837 Invocation of the local raft client happens in
a different pool (#4209)
513d54d637 is described below
commit 513d54d637b64215b7fc04e3992aeb80faee9f66
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Aug 9 21:43:14 2024 +0300
IGNITE-22837 Invocation of the local raft client happens in a different
pool (#4209)
---
.../ignite/internal/thread/ThreadOperation.java | 4 ++-
.../apache/ignite/internal/util/IgniteUtils.java | 37 ++++++++++++++++++++++
.../internal/raft/server/impl/JraftServerImpl.java | 11 +++++--
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 8 ++++-
.../ignite/internal/replicator/ReplicaManager.java | 25 ++-------------
.../ignite/internal/app/ThreadPoolsManager.java | 16 +++++++---
6 files changed, 69 insertions(+), 32 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
index e1bd09ec7a..9cef2d122e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/ThreadOperation.java
@@ -28,7 +28,9 @@ public enum ThreadOperation {
/** Access TX State storage. */
TX_STATE_STORAGE_ACCESS,
/** Make a blocking wait (involving taking a lock or waiting on a
conditional variable or waiting for time to pass. */
- WAIT;
+ WAIT,
+ /** Allows a thread to process RAFT action requests. */
+ PROCESS_RAFT_REQ;
/**
* Empty list of operations denoting that no potentially blocking/time
consuming operations are allowed
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 2f9eed2924..9871464625 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -75,6 +75,9 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.internal.thread.ThreadAttributes;
+import org.apache.ignite.internal.thread.ThreadOperation;
import org.apache.ignite.internal.util.worker.IgniteWorker;
import org.jetbrains.annotations.Nullable;
@@ -1260,4 +1263,38 @@ public class IgniteUtils {
public static CompletableFuture<Void> stopAsync(ComponentContext
componentContext, Collection<? extends IgniteComponent> components) {
return stopAsync(componentContext, components.stream());
}
+
+ /**
+ * Checks whether the current thread may perform all of the given operations, i.e. whether the caller may continue in the
+ * current thread or has to switch to a dedicated executor.
+ *
+ * <p>If the current thread implements {@link ThreadAttributes}, every required operation is checked with
+ * {@link ThreadAttributes#allows}; a switch is needed as soon as one of them is not allowed (so with no required operations
+ * such a thread never needs to switch). Otherwise, only a user thread executing a synchronous public API call may continue;
+ * any other thread (a user thread executing an asynchronous public API call, a JRE thread, or an Ignite thread not marked with
+ * {@link ThreadAttributes}) has to switch.
+ *
+ * @param requiredOperationPermissions Thread operations that the current thread has to allow for the caller to continue in it.
+ * @return {@code true} if the caller has to switch to a specific pool, {@code false} if it may continue in the current thread.
+ */
+ public static boolean shouldSwitchToRequestsExecutor(ThreadOperation... requiredOperationPermissions) {
+ if (Thread.currentThread() instanceof ThreadAttributes) {
+ ThreadAttributes thread = (ThreadAttributes) Thread.currentThread();
+
+ // A single missing permission is enough to require a switch.
+ for (ThreadOperation op : requiredOperationPermissions) {
+ if (!thread.allows(op)) {
+ return true;
+ }
+ }
+
+ return false;
+ } else {
+ if (PublicApiThreading.executingSyncPublicApi()) {
+ // It's a user thread executing a sync public API call, so it can do anything: no switch is needed.
+ return false;
+ }
+ if (PublicApiThreading.executingAsyncPublicApi()) {
+ // It's a user thread executing an async public API call, so it cannot do anything: a switch is needed.
+ // Kept explicit for readability; the result is the same as the fallback below.
+ return true;
+ }
+
+ // It's something else: either a JRE thread or an Ignite thread not marked with ThreadAttributes. As we are not sure,
+ // let's switch: staying in this thread when we should not (a false negative) can produce assertion errors.
+ return true;
+ }
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 2c0ee7a116..54a6c9982f 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.raft.server.impl;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -39,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.stream.IntStream;
@@ -65,6 +68,7 @@ import
org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
import
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Iterator;
import org.apache.ignite.raft.jraft.JRaftUtils;
@@ -264,7 +268,10 @@ public class JraftServerImpl implements RaftServer {
opts.setSnapshotTimer(JRaftUtils.createTimer(opts,
"JRaft-SnapshotTimer"));
}
- requestExecutor = JRaftUtils.createRequestExecutor(opts);
+ requestExecutor = Executors.newFixedThreadPool(
+ opts.getRaftRpcThreadPoolSize(),
+ IgniteThreadFactory.create(opts.getServerName(),
"JRaft-Request-Processor", LOG, PROCESS_RAFT_REQ)
+ );
rpcServer = new IgniteRpcServer(
service,
@@ -397,7 +404,7 @@ public class JraftServerImpl implements RaftServer {
ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
}
- ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor);
+ IgniteUtils.shutdownAndAwaitTermination(requestExecutor, 10, SECONDS);
return nullCompletedFuture();
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 6a676223d0..f5b397deea 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -16,6 +16,8 @@
*/
package org.apache.ignite.raft.jraft.rpc.impl;
+import static
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
+import static
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -175,7 +177,11 @@ public class IgniteRpcServer implements RpcServer<Void> {
RpcProcessor<NetworkMessage> finalPrc = prc;
try {
- executor.execute(() -> finalPrc.handleRequest(new
NetworkRpcContext(executor, sender, correlationId), message));
+ if (shouldSwitchToRequestsExecutor(PROCESS_RAFT_REQ)) {
+ executor.execute(() -> finalPrc.handleRequest(new
NetworkRpcContext(executor, sender, correlationId), message));
+ } else {
+ finalPrc.handleRequest(new NetworkRpcContext(executor,
sender, correlationId), message);
+ }
} catch (RejectedExecutionException e) {
// The rejection is ok if an executor has been stopped,
otherwise it shouldn't happen.
LOG.warn("A request execution was rejected [sender={} req={}
reason={}]", sender, S.toString(message), e.getMessage());
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index b54ed44ff1..9ca9bb1edd 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -34,6 +34,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSucc
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import java.io.IOException;
@@ -111,8 +112,6 @@ import
org.apache.ignite.internal.replicator.message.TimestampAware;
import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.thread.PublicApiThreading;
-import org.apache.ignite.internal.thread.ThreadAttributes;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.ClusterNode;
@@ -355,33 +354,13 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
// If the request actually came from the network, we are already in
the correct thread that has permissions to do storage reads
// and writes.
// But if this is a local call (in the same Ignite instance), we might
still be in a thread that does not have those permissions.
- if (shouldSwitchToRequestsExecutor()) {
+ if (shouldSwitchToRequestsExecutor(STORAGE_READ, STORAGE_WRITE,
TX_STATE_STORAGE_ACCESS)) {
requestsExecutor.execute(() -> handleReplicaRequest(request,
sender, correlationId));
} else {
handleReplicaRequest(request, sender, correlationId);
}
}
- private static boolean shouldSwitchToRequestsExecutor() {
- if (Thread.currentThread() instanceof ThreadAttributes) {
- ThreadAttributes thread = (ThreadAttributes)
Thread.currentThread();
- return !thread.allows(STORAGE_READ) ||
!thread.allows(STORAGE_WRITE) || !thread.allows(TX_STATE_STORAGE_ACCESS);
- } else {
- if (PublicApiThreading.executingSyncPublicApi()) {
- // It's a user thread, it executes a sync public API call, so
it can do anything, no switch is needed.
- return false;
- }
- if (PublicApiThreading.executingAsyncPublicApi()) {
- // It's a user thread, it executes an async public API call,
so it cannot do anything, a switch is needed.
- return true;
- }
-
- // It's something else: either a JRE thread or an Ignite thread
not marked with ThreadAttributes. As we are not sure,
- // let's switch: false negative can produce assertion errors.
- return true;
- }
- }
-
private void handleReplicaRequest(ReplicaRequest request, ClusterNode
sender, @Nullable Long correlationId) {
if (!busyLock.enterBusy()) {
if (LOG.isInfoEnabled()) {
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index 0831a23d3a..ce11befebe 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.app;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.thread.ThreadOperation.PROCESS_RAFT_REQ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE_ACCESS;
@@ -72,12 +73,17 @@ public class ThreadPoolsManager implements IgniteComponent {
IgniteThreadFactory.create(nodeName, "tableManager-io", LOG,
STORAGE_READ, STORAGE_WRITE));
int partitionsOperationsThreads = Math.min(cpus * 3, 25);
- partitionOperationsExecutor = new ThreadPoolExecutor(
+ partitionOperationsExecutor = Executors.newFixedThreadPool(
partitionsOperationsThreads,
- partitionsOperationsThreads,
- 0, SECONDS,
- new LinkedBlockingQueue<>(),
- IgniteThreadFactory.create(nodeName, "partition-operations",
LOG, STORAGE_READ, STORAGE_WRITE, TX_STATE_STORAGE_ACCESS)
+ IgniteThreadFactory.create(
+ nodeName,
+ "partition-operations",
+ LOG,
+ STORAGE_READ,
+ STORAGE_WRITE,
+ TX_STATE_STORAGE_ACCESS,
+ PROCESS_RAFT_REQ
+ )
);
commonScheduler =
Executors.newSingleThreadScheduledExecutor(NamedThreadFactory.create(nodeName,
"common-scheduler", LOG));