This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f479974 RATIS-1497. Avoid using ForkJoinPool.commonPool() in
GrpcClientProtocolService. (#587)
f479974 is described below
commit f47997461e2faee04ec68b76cc49eaaf72017f38
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Jan 24 01:05:20 2022 +0800
RATIS-1497. Avoid using ForkJoinPool.commonPool() in
GrpcClientProtocolService. (#587)
---
.../org/apache/ratis/util/ConcurrentUtils.java | 31 ++++++++++++++++++++++
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 21 +++++++++++++++
.../GrpcClientProtocolService.java | 18 ++++++++-----
.../org/apache/ratis/grpc/server/GrpcService.java | 12 +++++++--
.../ratis/netty/server/DataStreamManagement.java | 19 ++++---------
.../org/apache/ratis/OutputStreamBaseTest.java | 1 -
.../apache/ratis/grpc/TestGrpcOutputStream.java | 7 +++--
.../ratis/grpc/TestRaftOutputStreamWithGrpc.java | 7 -----
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 2 --
.../ratis/grpc/TestWatchRequestWithGrpc.java | 2 --
10 files changed, 83 insertions(+), 37 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
index a929595..f602c63 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.util;
import org.apache.ratis.util.function.CheckedFunction;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -83,4 +84,34 @@ public interface ConcurrentUtils {
return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), threadFactory);
}
+
+ /**
+ * Create a new {@link ExecutorService} with a maximum pool size.
+ * If it is cached, this method is similar to {@link
#newCachedThreadPool(int, ThreadFactory)}.
+ * Otherwise, this method is similar to {@link
java.util.concurrent.Executors#newFixedThreadPool(int)}.
+ *
+ * @param cached Use cached thread pool? If not, use a fixed thread pool.
+ * @param maximumPoolSize the maximum number of threads to allow in the pool.
+ * @param namePrefix the prefix used in the name of the threads created.
+ * @return a new {@link ExecutorService}.
+ */
+ static ExecutorService newThreadPoolWithMax(boolean cached, int
maximumPoolSize, String namePrefix) {
+ final ThreadFactory f = newThreadFactory(namePrefix);
+ return cached ? newCachedThreadPool(maximumPoolSize, f)
+ : Executors.newFixedThreadPool(maximumPoolSize, f);
+ }
+
+ /**
+ * Shutdown the given executor and wait for its termination.
+ *
+ * @param executor The executor to be shut down.
+ */
+ static void shutdownAndWait(ExecutorService executor) {
+ try {
+ executor.shutdown();
+ Preconditions.assertTrue(executor.awaitTermination(1, TimeUnit.DAYS));
+ } catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index eca51ad..a9dddbb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -157,6 +157,27 @@ public interface GrpcConfigKeys {
setInt(properties::setInt, PORT_KEY, port);
}
+ String ASYNC_REQUEST_THREAD_POOL_CACHED_KEY = PREFIX +
".async.request.thread.pool.cached";
+ boolean ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT = true;
+ static boolean asyncRequestThreadPoolCached(RaftProperties properties) {
+ return getBoolean(properties::getBoolean,
ASYNC_REQUEST_THREAD_POOL_CACHED_KEY,
+ ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT, getDefaultLog());
+ }
+ static void setAsyncRequestThreadPoolCached(RaftProperties properties,
boolean useCached) {
+ setBoolean(properties::setBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY,
useCached);
+ }
+
+ String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX +
".async.request.thread.pool.size";
+ int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32;
+ static int asyncRequestThreadPoolSize(RaftProperties properties) {
+ return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY,
+ ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
+ requireMin(0), requireMax(65536));
+ }
+ static void setAsyncRequestThreadPoolSize(RaftProperties properties, int
port) {
+ setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, port);
+ }
+
String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
static GrpcTlsConfig tlsConf(Parameters parameters) {
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
similarity index 95%
rename from
ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
rename to
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 7cd8c08..9c19684 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.grpc.client;
+package org.apache.ratis.grpc.server;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.grpc.GrpcUtil;
@@ -41,14 +41,15 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
-public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase {
- public static final Logger LOG =
LoggerFactory.getLogger(GrpcClientProtocolService.class);
+class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(GrpcClientProtocolService.class);
private static class PendingOrderedRequest implements
SlidingWindow.ServerSideRequest<RaftClientReply> {
private final RaftClientRequest request;
@@ -131,12 +132,15 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
private final Supplier<RaftPeerId> idSupplier;
private final RaftClientAsynchronousProtocol protocol;
+ private final ExecutorService executor;
private final OrderedStreamObservers orderedStreamObservers = new
OrderedStreamObservers();
- public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier,
RaftClientAsynchronousProtocol protocol) {
+ GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier,
RaftClientAsynchronousProtocol protocol,
+ ExecutorService executor) {
this.idSupplier = idSupplier;
this.protocol = protocol;
+ this.executor = executor;
}
RaftPeerId getId() {
@@ -150,7 +154,7 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
return so;
}
- public void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
+ void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
LOG.debug("{}: closeAllOrderedRequestStreamObservers", getId());
orderedStreamObservers.closeAllExisting(groupId);
}
@@ -218,9 +222,9 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
CompletableFuture<Void> processClientRequest(RaftClientRequest request,
Consumer<RaftClientReply> replyHandler) {
try {
- String errMsg = LOG.isDebugEnabled() ? "processClientRequest for " +
request.toString() : "";
+ final String errMsg = LOG.isDebugEnabled() ? "processClientRequest for
" + request : "";
return protocol.submitClientRequestAsync(request
- ).thenAcceptAsync(replyHandler
+ ).thenAcceptAsync(replyHandler, executor
).exceptionally(exception -> {
// TODO: the exception may be from either raft or state machine.
// Currently we skip all the following responses when getting an
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 0fbdbfc..3ce4fc5 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -17,9 +17,9 @@
*/
package org.apache.ratis.grpc.server;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
@@ -44,6 +44,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import static
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
@@ -107,6 +108,7 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
private final Supplier<InetSocketAddress> clientServerAddressSupplier;
private final Supplier<InetSocketAddress> adminServerAddressSupplier;
+ private final ExecutorService executor;
private final GrpcClientProtocolService clientProtocolService;
private final MetricServerInterceptor serverInterceptor;
@@ -143,7 +145,12 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
+ " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " +
grpcMessageSizeMax);
}
- this.clientProtocolService = new GrpcClientProtocolService(idSupplier,
raftServer);
+ final RaftProperties properties = raftServer.getProperties();
+ this.executor = ConcurrentUtils.newThreadPoolWithMax(
+ GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties),
+ GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
+ getId() + "-request-");
+ this.clientProtocolService = new GrpcClientProtocolService(idSupplier,
raftServer, executor);
this.serverInterceptor = new MetricServerInterceptor(
idSupplier,
@@ -272,6 +279,7 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
}
serverInterceptor.close();
+ ConcurrentUtils.shutdownAndWait(executor);
}
@Override
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 31634bb..37cf90e 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -68,7 +68,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -219,21 +218,13 @@ public class DataStreamManagement {
this.name = server.getId() + "-" +
JavaUtils.getClassSimpleName(getClass());
final RaftProperties properties = server.getProperties();
- Boolean useCachedThreadPool =
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
- if(useCachedThreadPool) {
- this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
+ final boolean useCachedThreadPool =
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
+ this.requestExecutor =
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
- ConcurrentUtils.newThreadFactory(name + "-request-"));
- this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
+ name + "-request-");
+ this.writeExecutor =
ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
- ConcurrentUtils.newThreadFactory(name + "-write-"));
- } else {
- this.requestExecutor = Executors.newFixedThreadPool(
-
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
- this.writeExecutor = Executors.newFixedThreadPool(
-
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
- }
-
+ name + "-write-");
}
private CompletableFuture<DataStream>
computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index aa69fa4..8861e2a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -108,7 +108,6 @@ public abstract class OutputStreamBaseTest<CLUSTER extends
MiniRaftCluster>
final String message = "log " + entry + " " + log.getLogEntryBodyCase()
+ " " + StringUtils.bytes2HexString(logData)
+ ", expected=" + StringUtils.bytes2HexString(expected);
- LOG.info(message);
Assert.assertArrayEquals(message, expected, logData);
count++;
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
index 77c311c..87cf758 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
@@ -34,8 +34,11 @@ import java.io.OutputStream;
public class TestGrpcOutputStream
extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
- static {
- Log4jUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
+
+ {
+ final RaftProperties p = getProperties();
+ GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(p, false);
+ GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(p, 8);
}
@Override
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
index 4cc82ee..fc5c91d 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java
@@ -17,18 +17,11 @@
*/
package org.apache.ratis.grpc;
-import org.apache.log4j.Level;
import org.apache.ratis.OutputStreamBaseTest;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
-import org.apache.ratis.util.Log4jUtils;
public class TestRaftOutputStreamWithGrpc
extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
- {
- Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.TRACE);
- }
-
@Override
public int getGlobalTimeoutSeconds() {
return 30;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 230c488..8dcff75 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -39,7 +39,6 @@ import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
@@ -78,7 +77,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class TestRaftServerWithGrpc extends BaseTest implements
MiniRaftClusterWithGrpc.FactoryGet {
{
- Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
Log4jUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL);
}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
index 2861102..65c6fab 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
@@ -21,7 +21,6 @@ import org.apache.log4j.Level;
import org.apache.ratis.WatchRequestTests;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.UnorderedAsync;
-import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.grpc.client.GrpcClientRpc;
import org.apache.ratis.util.Log4jUtils;
@@ -29,7 +28,6 @@ public class TestWatchRequestWithGrpc
extends WatchRequestTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
{
- Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
Log4jUtils.setLogLevel(GrpcClientRpc.LOG, Level.ALL);
Log4jUtils.setLogLevel(UnorderedAsync.LOG, Level.ALL);
Log4jUtils.setLogLevel(RaftClient.LOG, Level.ALL);