This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 208800a809630b27325ce039cf5312b69eac7bf9
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Jul 22 02:15:02 2022 +0800

    RATIS-1465. Use seperate channel for group heartbeat. (#561)
    
    (cherry picked from commit 09d55fc5293a2cbce9f0c841993075c45bd05a09)
---
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 10 ++++
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 63 +++++++++++++++++-----
 .../grpc/server/GrpcServerProtocolClient.java      | 48 +++++++++++++----
 .../grpc/server/GrpcServerProtocolService.java     |  7 ++-
 .../org/apache/ratis/grpc/server/GrpcService.java  |  8 +--
 .../apache/ratis/server/impl/RaftServerImpl.java   |  3 +-
 .../ratis/server/leader/LogAppenderBase.java       | 14 +++--
 .../segmented/SegmentedRaftLogOutputStream.java    |  8 ++-
 .../ratis/server/util/ServerStringUtils.java       |  3 +-
 .../ratis/server/impl/GroupManagementBaseTest.java |  7 ++-
 .../server/impl/RaftReconfigurationBaseTest.java   |  4 +-
 .../ratis/grpc/TestLeaderInstallSnapshot.java      | 18 ++++++-
 .../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 13 +++++
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 17 ++++--
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java    | 14 +++++
 15 files changed, 193 insertions(+), 44 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 4e95c1a3..c265ab2d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -194,6 +194,16 @@ public interface GrpcConfigKeys {
     static void setLeaderOutstandingAppendsMax(RaftProperties properties, int 
maxAppend) {
       setInt(properties::setInt, LEADER_OUTSTANDING_APPENDS_MAX_KEY, 
maxAppend);
     }
+
+    String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel";
+    boolean HEARTBEAT_CHANNEL_DEFAULT = true;
+    static boolean heartbeatChannel(RaftProperties properties) {
+      return getBoolean(properties::getBoolean, HEARTBEAT_CHANNEL_KEY,
+              HEARTBEAT_CHANNEL_DEFAULT, getDefaultLog());
+    }
+    static void setHeartbeatChannel(RaftProperties properties, boolean 
useCached) {
+      setBoolean(properties::setBoolean, HEARTBEAT_CHANNEL_KEY, useCached);
+    }
   }
 
   String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 2eab1132..4b5751fd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -64,7 +64,8 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final TimeDuration requestTimeoutDuration;
   private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
-  private volatile StreamObserver<AppendEntriesRequestProto> 
appendLogRequestObserver;
+  private volatile StreamObservers appendLogRequestObserver;
+  private final boolean useSeparateHBChannel;
 
   private final GrpcServerMetrics grpcServerMetrics;
 
@@ -80,6 +81,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     this.maxPendingRequestsNum = 
GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
     this.requestTimeoutDuration = 
RaftServerConfigKeys.Rpc.requestTimeout(properties);
     this.installSnapshotEnabled = 
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
+    this.useSeparateHBChannel = 
GrpcConfigKeys.Server.heartbeatChannel(properties);
 
     grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
     grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), 
pendingRequests::logRequestsSize);
@@ -152,7 +154,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       getLeaderState().checkHealth(getFollower());
     }
 
-    
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
+    
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted);
   }
 
   public long getWaitTimeMs() {
@@ -193,6 +195,11 @@ public class GrpcLogAppender extends LogAppenderBase {
     return appendLogRequestObserver == null || super.shouldSendAppendEntries();
   }
 
+  @Override
+  public boolean hasPendingDataRequests() {
+    return pendingRequests.logRequestsSize() > 0;
+  }
+
   /**
    * @return true iff not received first response or queue is full.
    */
@@ -204,6 +211,29 @@ public class GrpcLogAppender extends LogAppenderBase {
     return !firstResponseReceived || size >= maxPendingRequestsNum;
   }
 
+  static class StreamObservers {
+    private final StreamObserver<AppendEntriesRequestProto> appendLog;
+    private final StreamObserver<AppendEntriesRequestProto> heartbeat;
+
+    StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler 
handler, boolean separateHeartbeat) {
+      this.appendLog = client.appendEntries(handler, false);
+      this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): 
null;
+    }
+
+    void onNext(AppendEntriesRequestProto proto) {
+      if (heartbeat != null && proto.getEntriesCount() == 0) {
+        heartbeat.onNext(proto);
+      } else {
+        appendLog.onNext(proto);
+      }
+    }
+
+    void onCompleted() {
+      appendLog.onCompleted();
+      Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted);
+    }
+  }
+
   private void appendLog(boolean excludeLogEntries) throws IOException {
     final AppendEntriesRequestProto pending;
     final AppendEntriesRequest request;
@@ -220,26 +250,32 @@ public class GrpcLogAppender extends LogAppenderBase {
       pendingRequests.put(request);
       increaseNextIndex(pending);
       if (appendLogRequestObserver == null) {
-        appendLogRequestObserver = getClient().appendEntries(new 
AppendLogResponseHandler());
+        appendLogRequestObserver = new StreamObservers(
+            getClient(), new AppendLogResponseHandler(), useSeparateHBChannel);
       }
-      s = appendLogRequestObserver;
     }
 
     if (isRunning()) {
-      sendRequest(request, pending, s);
+      sendRequest(request, pending);
     }
   }
 
-  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto,
-        StreamObserver<AppendEntriesRequestProto> s) {
+  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto) {
     CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
         getServer().getId(), null, proto);
     request.startRequestTimer();
-    s.onNext(proto);
-    scheduler.onTimeout(requestTimeoutDuration,
-        () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
-        LOG, () -> "Timeout check failed for append entry request: " + 
request);
-    getFollower().updateLastRpcSendTime(request.isHeartbeat());
+    boolean sent = Optional.ofNullable(appendLogRequestObserver).map(observer 
-> {
+        observer.onNext(proto);
+        return true;}).isPresent();
+
+    if (sent) {
+      scheduler.onTimeout(requestTimeoutDuration,
+          () -> timeoutAppendRequest(request.getCallId(), 
request.isHeartbeat()),
+          LOG, () -> "Timeout check failed for append entry request: " + 
request);
+      getFollower().updateLastRpcSendTime(request.isHeartbeat());
+    } else {
+      request.stopRequestTimer();
+    }
   }
 
   private void timeoutAppendRequest(long cid, boolean heartbeat) {
@@ -247,6 +283,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     if (pending != null) {
       LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ? 
"HEARTBEAT" : "", pending);
       grpcServerMetrics.onRequestTimeout(getFollowerId().toString(), 
heartbeat);
+      pending.stopRequestTimer();
     }
   }
 
@@ -334,7 +371,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     @Override
     public void onError(Throwable t) {
       if (!isRunning()) {
-        LOG.info("{} is stopped", GrpcLogAppender.this);
+        LOG.info("{} is already stopped", GrpcLogAppender.this);
         return;
       }
       GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index fcf8126a..6f5d06c2 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -36,23 +36,44 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This is a RaftClient implementation that supports streaming data to the raft
  * ring. The stream implementation utilizes gRPC.
  */
 public class GrpcServerProtocolClient implements Closeable {
+  // Common channel
   private final ManagedChannel channel;
-  private final TimeDuration requestTimeoutDuration;
-  private final RaftServerProtocolServiceBlockingStub blockingStub;
+  // Channel and stub for heartbeat
+  private ManagedChannel hbChannel;
+  private RaftServerProtocolServiceStub hbAsyncStub;
   private final RaftServerProtocolServiceStub asyncStub;
+  private final RaftServerProtocolServiceBlockingStub blockingStub;
+  private final boolean useSeparateHBChannel;
+
+  private final TimeDuration requestTimeoutDuration;
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcServerProtocolClient.class);
   //visible for using in log / error messages AND to use in instrumented tests
   private final RaftPeerId raftPeerId;
 
   public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
-      TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) {
+      TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean 
separateHBChannel) {
     raftPeerId = target.getId();
+    LOG.info("Build channel for {}", raftPeerId);
+    useSeparateHBChannel = separateHBChannel;
+    channel = buildChannel(target, flowControlWindow, tlsConfig);
+    blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
+    asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
+    if (useSeparateHBChannel) {
+      hbChannel = buildChannel(target, flowControlWindow, tlsConfig);
+      hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel);
+    }
+    requestTimeoutDuration = requestTimeout;
+  }
+
+  private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow,
+      GrpcTlsConfig tlsConfig) {
     NettyChannelBuilder channelBuilder =
         NettyChannelBuilder.forTarget(target.getAddress());
 
@@ -81,14 +102,16 @@ public class GrpcServerProtocolClient implements Closeable 
{
     } else {
       channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
     }
-    channel = channelBuilder.flowControlWindow(flowControlWindow).build();
-    blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
-    asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
-    this.requestTimeoutDuration = requestTimeoutDuration;
+    return channelBuilder.flowControlWindow(flowControlWindow).build();
   }
 
   @Override
   public void close() {
+    LOG.info("{} Close channels", raftPeerId);
+    CompletableFuture<Void> future1;
+    if (useSeparateHBChannel) {
+      GrpcUtil.shutdownManagedChannel(hbChannel);
+    }
     GrpcUtil.shutdownManagedChannel(channel);
   }
 
@@ -108,8 +131,12 @@ public class GrpcServerProtocolClient implements Closeable 
{
   }
 
   StreamObserver<AppendEntriesRequestProto> appendEntries(
-      StreamObserver<AppendEntriesReplyProto> responseHandler) {
-    return asyncStub.appendEntries(responseHandler);
+      StreamObserver<AppendEntriesReplyProto> responseHandler, boolean 
isHeartbeat) {
+    if (isHeartbeat && useSeparateHBChannel) {
+      return hbAsyncStub.appendEntries(responseHandler);
+    } else {
+      return asyncStub.appendEntries(responseHandler);
+    }
   }
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
@@ -120,6 +147,9 @@ public class GrpcServerProtocolClient implements Closeable {
 
   // short-circuit the backoff timer and make them reconnect immediately.
   public void resetConnectBackoff() {
+    if (useSeparateHBChannel) {
+      hbChannel.resetConnectBackoff();
+    }
     channel.resetConnectBackoff();
   }
 }
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 7f6e521f..5ad3909d 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -123,7 +123,12 @@ class GrpcServerProtocolService extends 
RaftServerProtocolServiceImplBase {
           .map(PendingServerRequest::getFuture)
           .orElse(CompletableFuture.completedFuture(null));
       try {
-        process(request).thenCombine(previousFuture, (reply, v) -> {
+        process(request).exceptionally(e -> {
+          // Handle cases, such as RaftServer is paused
+          handleError(e, request);
+          current.getFuture().completeExceptionally(e);
+          return null;
+        }).thenCombine(previousFuture, (reply, v) -> {
           handleReply(reply);
           current.getFuture().complete(null);
           return null;
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 877ed85e..9d65cba3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -126,7 +126,8 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
         GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
         
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
         GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
-        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
+        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
+        GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()));
   }
 
   @SuppressWarnings("checkstyle:ParameterNumber") // private constructor
@@ -135,10 +136,11 @@ public final class GrpcService extends 
RaftServerRpcWithProxy<GrpcServerProtocol
       int clientPort, GrpcTlsConfig clientTlsConfig,
       int serverPort, GrpcTlsConfig serverTlsConfig,
       SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
-      SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration) {
+      SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
+      boolean useSeparateHBChannel) {
     super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
         p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
-            requestTimeoutDuration, serverTlsConfig)));
+            requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
     if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
       throw new IllegalArgumentException("Illegal configuration: "
           + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + 
appenderBufferSize
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1954e33d..afab42cf 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1347,7 +1347,8 @@ class RaftServerImpl implements RaftServer.Division,
     logAppendEntries(isHeartbeat,
         () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + 
leaderTerm + ", "
             + previous + ", " + leaderCommit + ", " + initializing
-            + ", commits" + ProtoUtils.toString(commitInfos)
+            + ", commits:" + ProtoUtils.toString(commitInfos)
+            + ", cId:" + callId
             + ", entries: " + LogProtoUtils.toLogEntriesString(entries));
 
     final long currentTerm;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 190f45bc..50f9887f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -109,6 +109,10 @@ public abstract class LogAppenderBase implements 
LogAppender {
     return leaderState;
   }
 
+  public boolean hasPendingDataRequests() {
+    return false;
+  }
+
   private TermIndex getPrevious(long nextIndex) {
     if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
       return null;
@@ -132,17 +136,19 @@ public abstract class LogAppenderBase implements 
LogAppender {
   }
 
   @Override
-  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, 
boolean heartbeat) throws RaftLogIOException {
-    final TermIndex previous = getPrevious(follower.getNextIndex());
-    final long snapshotIndex = follower.getSnapshotIndex();
+  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, 
boolean heartbeat)
+      throws RaftLogIOException {
     final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
+    final TermIndex previous = getPrevious(follower.getNextIndex());
     if (heartbeatWaitTimeMs <= 0L || heartbeat) {
       // heartbeat
-      return leaderState.newAppendEntriesRequestProto(follower, 
Collections.emptyList(), previous, callId);
+      return leaderState.newAppendEntriesRequestProto(follower, 
Collections.emptyList(),
+          hasPendingDataRequests()? null : previous, callId);
     }
 
     Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + 
buffer.getNumElements() + " elements.");
 
+    final long snapshotIndex = follower.getSnapshotIndex();
     final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
     final long halfMs = heartbeatWaitTimeMs/2;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 3992638c..22eebac9 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -119,7 +119,9 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
     try {
       out.flush();
     } catch (IOException ioe) {
-      throw new IOException("Failed to flush " + this, ioe);
+      String msg = "Failed to flush " + this;
+      LOG.error(msg, ioe);
+      throw new IOException(msg, ioe);
     }
   }
 
@@ -127,7 +129,9 @@ public class SegmentedRaftLogOutputStream implements 
Closeable {
     try {
       return out.asyncFlush(executor);
     } catch (IOException ioe) {
-      throw new IOException("Failed to flush " + this, ioe);
+      String msg = "Failed to asyncFlush " + this;
+      LOG.error(msg, ioe);
+      throw new IOException(msg, ioe);
     }
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
index 1a08e442..25223c0f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
@@ -58,7 +58,8 @@ public final class ServerStringUtils {
         + "-t" + reply.getTerm()
         + "," + reply.getResult()
         + ",nextIndex=" + reply.getNextIndex()
-        + ",followerCommit=" + reply.getFollowerCommit();
+        + ",followerCommit=" + reply.getFollowerCommit()
+        + ",matchIndex=" + reply.getMatchIndex();
   }
 
   public static String 
toInstallSnapshotRequestString(InstallSnapshotRequestProto request) {
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 86481af2..2c00892f 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.api.GroupManagementApi;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
@@ -66,8 +67,10 @@ public abstract class GroupManagementBaseTest extends 
BaseTest {
 
   static {
     // avoid flaky behaviour in CI environment
-    RaftServerConfigKeys.Rpc.setTimeoutMin(prop, TimeDuration.valueOf(300, 
TimeUnit.MILLISECONDS));
-    RaftServerConfigKeys.Rpc.setTimeoutMax(prop, TimeDuration.valueOf(600, 
TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Rpc.setTimeoutMin(prop, TimeDuration.valueOf(1500, 
TimeUnit.MILLISECONDS));
+    RaftServerConfigKeys.Rpc.setTimeoutMax(prop, TimeDuration.valueOf(2000, 
TimeUnit.MILLISECONDS));
+    // it takes 5s+ to finish the blocking group add call
+    RaftClientConfigKeys.Rpc.setRequestTimeout(prop, TimeDuration.valueOf(12, 
TimeUnit.SECONDS));
   }
 
   public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> 
getClusterFactory();
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index b440b26d..e09ca19d 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -330,8 +330,6 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER 
extends MiniRaftCluste
 
   void runTestReconfTimeout(CLUSTER cluster) throws Exception {
     final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
-    RaftServerTestUtil.LogCapturer logCapture =
-        RaftServerTestUtil.LogCapturer.captureLogs(RaftServer.Division.LOG);
 
     try (final RaftClient client = cluster.createClient(leaderId)) {
       PeerChanges c1 = cluster.addNewPeers(2, false);
@@ -345,7 +343,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER 
extends MiniRaftCluste
           client.getId(), leaderId, c1.allPeersInNewConf);
       try {
         RaftClientReply reply = sender.sendRequest(request);
-        Assert.fail("did not get expected exception " + reply.toString() + " 
[" + logCapture.getOutput() + "]");
+        Assert.fail("did not get expected exception " + reply.toString());
       } catch (IOException e) {
         Assert.assertTrue("Got exception " + e,
             e instanceof ReconfigurationTimeoutException);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
index ee646941..5f7a40f0 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
@@ -18,7 +18,23 @@
 package org.apache.ratis.grpc;
 
 import org.apache.ratis.InstallSnapshotFromLeaderTests;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
 public class TestLeaderInstallSnapshot
 extends InstallSnapshotFromLeaderTests<MiniRaftClusterWithGrpc>
-implements MiniRaftClusterWithGrpc.FactoryGet {}
+implements MiniRaftClusterWithGrpc.FactoryGet {
+
+    public TestLeaderInstallSnapshot(Boolean separateHeartbeat) {
+        GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), 
separateHeartbeat);
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, 
{Boolean.TRUE}}));
+    }
+}
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 36667eea..c3ad3848 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -35,14 +35,18 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
+@RunWith(Parameterized.class)
 public class TestLogAppenderWithGrpc
     extends LogAppenderTests<MiniRaftClusterWithGrpc>
     implements MiniRaftClusterWithGrpc.FactoryGet {
@@ -50,6 +54,15 @@ public class TestLogAppenderWithGrpc
     Log4jUtils.setLogLevel(FollowerInfo.LOG, Level.DEBUG);
   }
 
+  public TestLogAppenderWithGrpc(Boolean separateHeartbeat) {
+    GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), 
separateHeartbeat);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> data() {
+    return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}}));
+  }
+
   @Test
   public void testPendingLimits() throws IOException, InterruptedException {
     int maxAppends = 10;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 54c65175..bcf70f16 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -62,13 +62,12 @@ import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.nio.channels.OverlappingFileLockException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedMap;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -76,11 +75,21 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+@RunWith(Parameterized.class)
 public class TestRaftServerWithGrpc extends BaseTest implements 
MiniRaftClusterWithGrpc.FactoryGet {
   {
     Log4jUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL);
   }
 
+  public TestRaftServerWithGrpc(Boolean separateHeartbeat) {
+    GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), 
separateHeartbeat);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> data() {
+    return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}}));
+  }
+
   @Before
   public void setup() {
     final RaftProperties p = getProperties();
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index d5dd0c36..86f8f376 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -32,12 +32,17 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
+@RunWith(Parameterized.class)
 public class TestRaftWithGrpc
     extends RaftBasicTests<MiniRaftClusterWithGrpc>
     implements MiniRaftClusterWithGrpc.FactoryGet {
@@ -47,6 +52,15 @@ public class TestRaftWithGrpc
         SimpleStateMachine4Testing.class, StateMachine.class);
   }
 
+  public TestRaftWithGrpc(Boolean separateHeartbeat) {
+    GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), 
separateHeartbeat);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Boolean[]> data() {
+    return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}}));
+  }
+
   @Override
   @Test
   public void testWithLoad() throws Exception {

Reply via email to