This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 660fe53ee RATIS-2325. Create GrpcStubPool for GrpcServerProtocolClient 
(#1306)
660fe53ee is described below

commit 660fe53ee677636dfc3181cd1487e5b9151fcde4
Author: Symious <[email protected]>
AuthorDate: Tue Nov 18 00:42:09 2025 +0800

    RATIS-2325. Create GrpcStubPool for GrpcServerProtocolClient (#1306)
---
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java |   9 ++
 .../grpc/server/GrpcServerProtocolClient.java      |  51 ++++++++-
 .../apache/ratis/grpc/server/GrpcServicesImpl.java |   4 +-
 .../org/apache/ratis/grpc/server/GrpcStubPool.java | 121 +++++++++++++++++++++
 4 files changed, 181 insertions(+), 4 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index f21a9b99f..f31794ac3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -282,6 +282,15 @@ public interface GrpcConfigKeys {
     static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
       parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
     }
+
+    String STUB_POOL_SIZE_KEY = PREFIX + ".stub.pool.size";
+    int STUB_POOL_SIZE_DEFAULT = 1;
+    static int stubPoolSize(RaftProperties properties) {
+      return get(properties::getInt, STUB_POOL_SIZE_KEY, 
STUB_POOL_SIZE_DEFAULT, getDefaultLog());
+    }
+    static void setStubPoolSize(RaftProperties properties, int size) {
+      setInt(properties::setInt, STUB_POOL_SIZE_KEY, size);
+    }
   }
 
   String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 1e40a75ad..d2748c7be 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -45,6 +45,7 @@ import java.io.Closeable;
 class GrpcServerProtocolClient implements Closeable {
   // Common channel
   private final ManagedChannel channel;
+  private final GrpcStubPool<RaftServerProtocolServiceStub> pool;
   // Channel and stub for heartbeat
   private ManagedChannel hbChannel;
   private RaftServerProtocolServiceStub hbAsyncStub;
@@ -57,7 +58,7 @@ class GrpcServerProtocolClient implements Closeable {
   //visible for using in log / error messages AND to use in instrumented tests
   private final RaftPeerId raftPeerId;
 
-  GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
+  GrpcServerProtocolClient(RaftPeer target, int connections, int 
flowControlWindow,
       TimeDuration requestTimeout, SslContext sslContext, boolean 
separateHBChannel) {
     raftPeerId = target.getId();
     LOG.info("Build channel for {}", target);
@@ -70,6 +71,11 @@ class GrpcServerProtocolClient implements Closeable {
       hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel);
     }
     requestTimeoutDuration = requestTimeout;
+    this.pool = connections == 1? null : newGrpcStubPool(target.getAddress(), 
sslContext, connections);
+  }
+
+  GrpcStubPool<RaftServerProtocolServiceStub> newGrpcStubPool(String address, 
SslContext sslContext, int connections) {
+    return new GrpcStubPool<>(connections, address, sslContext, 
RaftServerProtocolServiceGrpc::newStub, 16);
   }
 
   private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, 
SslContext sslContext) {
@@ -94,6 +100,9 @@ class GrpcServerProtocolClient implements Closeable {
       GrpcUtil.shutdownManagedChannel(hbChannel);
     }
     GrpcUtil.shutdownManagedChannel(channel);
+    if (pool != null) {
+      pool.close();
+    }
   }
 
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
@@ -112,8 +121,44 @@ class GrpcServerProtocolClient implements Closeable {
   }
 
   void readIndex(ReadIndexRequestProto request, 
StreamObserver<ReadIndexReplyProto> s) {
-    asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), 
requestTimeoutDuration.getUnit())
-        .readIndex(request, s);
+    if (pool == null) {
+      asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), 
requestTimeoutDuration.getUnit())
+          .readIndex(request, s);
+    } else {
+      GrpcStubPool.Stub<RaftServerProtocolServiceStub> p;
+      try {
+        p = pool.acquire();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        s.onError(e);
+        return;
+      }
+      p.getStub().withDeadlineAfter(requestTimeoutDuration.getDuration(), 
requestTimeoutDuration.getUnit())
+          .readIndex(request, new StreamObserver<ReadIndexReplyProto>() {
+            @Override
+            public void onNext(ReadIndexReplyProto v) {
+              s.onNext(v);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+              try {
+                s.onError(t);
+              } finally {
+                p.release();
+              }
+            }
+
+            @Override
+            public void onCompleted() {
+              try {
+                s.onCompleted();
+              } finally {
+                p.release();
+              }
+            }
+          });
+    }
   }
 
   CallStreamObserver<AppendEntriesRequestProto> appendEntries(
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
index 8200aa3ef..b1af0960d 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServicesImpl.java
@@ -108,6 +108,7 @@ public final class GrpcServicesImpl
     private int serverPort;
     private SslContext serverSslContextForServer;
     private SslContext serverSslContextForClient;
+    private int serverStubPoolSize;
 
     private SizeInBytes messageSizeMax;
     private SizeInBytes flowControlWindow;
@@ -130,6 +131,7 @@ public final class GrpcServicesImpl
       this.flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, 
LOG::info);
       this.requestTimeoutDuration = 
RaftServerConfigKeys.Rpc.requestTimeout(properties);
       this.separateHeartbeatChannel = 
GrpcConfigKeys.Server.heartbeatChannel(properties);
+      this.serverStubPoolSize = GrpcConfigKeys.Server.stubPoolSize(properties);
 
       final SizeInBytes appenderBufferSize = 
RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
       final SizeInBytes gap = SizeInBytes.ONE_MB;
@@ -150,7 +152,7 @@ public final class GrpcServicesImpl
     }
 
     private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer 
target) {
-      return new GrpcServerProtocolClient(target, 
flowControlWindow.getSizeInt(),
+      return new GrpcServerProtocolClient(target, serverStubPoolSize, 
flowControlWindow.getSizeInt(),
           requestTimeoutDuration, serverSslContextForClient, 
separateHeartbeatChannel);
     }
 
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
new file mode 100644
index 000000000..fd27ac996
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcStubPool.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.AbstractStub;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
+import org.apache.ratis.thirdparty.io.netty.channel.WriteBufferWaterMark;
+import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+final class GrpcStubPool<S extends AbstractStub<S>> {
+  public static final Logger LOG = LoggerFactory.getLogger(GrpcStubPool.class);
+
+  static ManagedChannel buildManagedChannel(String address, SslContext 
sslContext) {
+    NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(address)
+        .keepAliveTime(10, TimeUnit.MINUTES)
+        .keepAliveWithoutCalls(false)
+        .idleTimeout(30, TimeUnit.MINUTES)
+        .withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new 
WriteBufferWaterMark(64 << 10, 128 << 10));
+    if (sslContext != null) {
+      LOG.debug("Setting TLS for {}", address);
+      channelBuilder.useTransportSecurity().sslContext(sslContext);
+    } else {
+      channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
+    }
+    ManagedChannel ch = channelBuilder.build();
+    ch.getState(true);
+    return ch;
+  }
+
+  static final class Stub<S extends AbstractStub<S>> {
+    private final ManagedChannel ch;
+    private final S stub;
+    private final Semaphore permits;
+
+    Stub(String address, SslContext sslContext, Function<ManagedChannel, S> 
stubFactory, int maxInflight) {
+      this.ch = buildManagedChannel(address, sslContext);
+      this.stub = stubFactory.apply(ch);
+      this.permits = new Semaphore(maxInflight);
+    }
+
+    S getStub() {
+      return stub;
+    }
+
+    void release() {
+      permits.release();
+    }
+
+    void shutdown() {
+      ch.shutdown();
+    }
+  }
+
+  private final List<MemoizedSupplier<Stub<S>>> pool;
+
+  GrpcStubPool(int connections, String address, SslContext sslContext, 
Function<ManagedChannel, S> stubFactory,
+               int maxInflightPerConn) {
+    Preconditions.assertTrue(connections > 1, "connections must be > 1");
+    final List<MemoizedSupplier<Stub<S>>> tmpPool = new 
ArrayList<>(connections);
+    for (int i = 0; i < connections; i++) {
+      tmpPool.add(MemoizedSupplier.valueOf(() -> new Stub<>(address, 
sslContext, stubFactory, maxInflightPerConn)));
+    }
+    this.pool = Collections.unmodifiableList(tmpPool);
+  }
+
+  Stub<S> getStub(int i) {
+    return pool.get(i).get();
+  }
+
+  Stub<S> acquire() throws InterruptedException {
+    final int size = pool.size();
+    final int start = ThreadLocalRandom.current().nextInt(size);
+    for (int k = 0; k < size; k++) {
+      Stub<S> p = getStub((start + k) % size);
+      if (p.permits.tryAcquire()) {
+        return p;
+      }
+    }
+    final Stub<S> p = getStub(start);
+    p.permits.acquire();
+    return p;
+  }
+
+  public void close() {
+    for (MemoizedSupplier<Stub<S>> p : pool) {
+      if (p.isInitialized()) {
+        p.get().shutdown();
+      }
+    }
+  }
+}

Reply via email to