This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 09d55fc5 RATIS-1465. Use seperate channel for group heartbeat. (#561)
09d55fc5 is described below
commit 09d55fc5293a2cbce9f0c841993075c45bd05a09
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Jul 22 02:15:02 2022 +0800
RATIS-1465. Use seperate channel for group heartbeat. (#561)
---
.../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 10 ++++
.../apache/ratis/grpc/server/GrpcLogAppender.java | 63 +++++++++++++++++-----
.../grpc/server/GrpcServerProtocolClient.java | 48 +++++++++++++----
.../grpc/server/GrpcServerProtocolService.java | 7 ++-
.../org/apache/ratis/grpc/server/GrpcService.java | 8 +--
.../apache/ratis/server/impl/RaftServerImpl.java | 3 +-
.../ratis/server/leader/LogAppenderBase.java | 14 +++--
.../segmented/SegmentedRaftLogOutputStream.java | 8 ++-
.../ratis/server/util/ServerStringUtils.java | 3 +-
.../ratis/server/impl/GroupManagementBaseTest.java | 7 ++-
.../server/impl/RaftReconfigurationBaseTest.java | 4 +-
.../ratis/grpc/TestLeaderInstallSnapshot.java | 18 ++++++-
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 13 +++++
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 17 ++++--
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 14 +++++
15 files changed, 193 insertions(+), 44 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 4e95c1a3..c265ab2d 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -194,6 +194,16 @@ public interface GrpcConfigKeys {
static void setLeaderOutstandingAppendsMax(RaftProperties properties, int
maxAppend) {
setInt(properties::setInt, LEADER_OUTSTANDING_APPENDS_MAX_KEY,
maxAppend);
}
+
+ String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel";
+ boolean HEARTBEAT_CHANNEL_DEFAULT = true;
+ static boolean heartbeatChannel(RaftProperties properties) {
+ return getBoolean(properties::getBoolean, HEARTBEAT_CHANNEL_KEY,
+ HEARTBEAT_CHANNEL_DEFAULT, getDefaultLog());
+ }
+ static void setHeartbeatChannel(RaftProperties properties, boolean
useCached) {
+ setBoolean(properties::setBoolean, HEARTBEAT_CHANNEL_KEY, useCached);
+ }
}
String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 2eab1132..4b5751fd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -64,7 +64,8 @@ public class GrpcLogAppender extends LogAppenderBase {
private final TimeDuration requestTimeoutDuration;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
- private volatile StreamObserver<AppendEntriesRequestProto>
appendLogRequestObserver;
+ private volatile StreamObservers appendLogRequestObserver;
+ private final boolean useSeparateHBChannel;
private final GrpcServerMetrics grpcServerMetrics;
@@ -80,6 +81,7 @@ public class GrpcLogAppender extends LogAppenderBase {
this.maxPendingRequestsNum =
GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
this.requestTimeoutDuration =
RaftServerConfigKeys.Rpc.requestTimeout(properties);
this.installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
+ this.useSeparateHBChannel =
GrpcConfigKeys.Server.heartbeatChannel(properties);
grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(),
pendingRequests::logRequestsSize);
@@ -152,7 +154,7 @@ public class GrpcLogAppender extends LogAppenderBase {
getLeaderState().checkHealth(getFollower());
}
-
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
+
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted);
}
public long getWaitTimeMs() {
@@ -193,6 +195,11 @@ public class GrpcLogAppender extends LogAppenderBase {
return appendLogRequestObserver == null || super.shouldSendAppendEntries();
}
+ @Override
+ public boolean hasPendingDataRequests() {
+ return pendingRequests.logRequestsSize() > 0;
+ }
+
/**
* @return true iff not received first response or queue is full.
*/
@@ -204,6 +211,29 @@ public class GrpcLogAppender extends LogAppenderBase {
return !firstResponseReceived || size >= maxPendingRequestsNum;
}
+ static class StreamObservers {
+ private final StreamObserver<AppendEntriesRequestProto> appendLog;
+ private final StreamObserver<AppendEntriesRequestProto> heartbeat;
+
+ StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler
handler, boolean separateHeartbeat) {
+ this.appendLog = client.appendEntries(handler, false);
+ this.heartbeat = separateHeartbeat? client.appendEntries(handler, true):
null;
+ }
+
+ void onNext(AppendEntriesRequestProto proto) {
+ if (heartbeat != null && proto.getEntriesCount() == 0) {
+ heartbeat.onNext(proto);
+ } else {
+ appendLog.onNext(proto);
+ }
+ }
+
+ void onCompleted() {
+ appendLog.onCompleted();
+ Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted);
+ }
+ }
+
private void appendLog(boolean excludeLogEntries) throws IOException {
final AppendEntriesRequestProto pending;
final AppendEntriesRequest request;
@@ -220,26 +250,32 @@ public class GrpcLogAppender extends LogAppenderBase {
pendingRequests.put(request);
increaseNextIndex(pending);
if (appendLogRequestObserver == null) {
- appendLogRequestObserver = getClient().appendEntries(new
AppendLogResponseHandler());
+ appendLogRequestObserver = new StreamObservers(
+ getClient(), new AppendLogResponseHandler(), useSeparateHBChannel);
}
- s = appendLogRequestObserver;
}
if (isRunning()) {
- sendRequest(request, pending, s);
+ sendRequest(request, pending);
}
}
- private void sendRequest(AppendEntriesRequest request,
AppendEntriesRequestProto proto,
- StreamObserver<AppendEntriesRequestProto> s) {
+ private void sendRequest(AppendEntriesRequest request,
AppendEntriesRequestProto proto) {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
getServer().getId(), null, proto);
request.startRequestTimer();
- s.onNext(proto);
- scheduler.onTimeout(requestTimeoutDuration,
- () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
- LOG, () -> "Timeout check failed for append entry request: " +
request);
- getFollower().updateLastRpcSendTime(request.isHeartbeat());
+ boolean sent = Optional.ofNullable(appendLogRequestObserver).map(observer
-> {
+ observer.onNext(proto);
+ return true;}).isPresent();
+
+ if (sent) {
+ scheduler.onTimeout(requestTimeoutDuration,
+ () -> timeoutAppendRequest(request.getCallId(),
request.isHeartbeat()),
+ LOG, () -> "Timeout check failed for append entry request: " +
request);
+ getFollower().updateLastRpcSendTime(request.isHeartbeat());
+ } else {
+ request.stopRequestTimer();
+ }
}
private void timeoutAppendRequest(long cid, boolean heartbeat) {
@@ -247,6 +283,7 @@ public class GrpcLogAppender extends LogAppenderBase {
if (pending != null) {
LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ?
"HEARTBEAT" : "", pending);
grpcServerMetrics.onRequestTimeout(getFollowerId().toString(),
heartbeat);
+ pending.stopRequestTimer();
}
}
@@ -334,7 +371,7 @@ public class GrpcLogAppender extends LogAppenderBase {
@Override
public void onError(Throwable t) {
if (!isRunning()) {
- LOG.info("{} is stopped", GrpcLogAppender.this);
+ LOG.info("{} is already stopped", GrpcLogAppender.this);
return;
}
GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index fcf8126a..6f5d06c2 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -36,23 +36,44 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
/**
* This is a RaftClient implementation that supports streaming data to the raft
* ring. The stream implementation utilizes gRPC.
*/
public class GrpcServerProtocolClient implements Closeable {
+ // Common channel
private final ManagedChannel channel;
- private final TimeDuration requestTimeoutDuration;
- private final RaftServerProtocolServiceBlockingStub blockingStub;
+ // Channel and stub for heartbeat
+ private ManagedChannel hbChannel;
+ private RaftServerProtocolServiceStub hbAsyncStub;
private final RaftServerProtocolServiceStub asyncStub;
+ private final RaftServerProtocolServiceBlockingStub blockingStub;
+ private final boolean useSeparateHBChannel;
+
+ private final TimeDuration requestTimeoutDuration;
private static final Logger LOG =
LoggerFactory.getLogger(GrpcServerProtocolClient.class);
//visible for using in log / error messages AND to use in instrumented tests
private final RaftPeerId raftPeerId;
public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
- TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) {
+ TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean
separateHBChannel) {
raftPeerId = target.getId();
+ LOG.info("Build channel for {}", raftPeerId);
+ useSeparateHBChannel = separateHBChannel;
+ channel = buildChannel(target, flowControlWindow, tlsConfig);
+ blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
+ asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
+ if (useSeparateHBChannel) {
+ hbChannel = buildChannel(target, flowControlWindow, tlsConfig);
+ hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel);
+ }
+ requestTimeoutDuration = requestTimeout;
+ }
+
+ private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow,
+ GrpcTlsConfig tlsConfig) {
NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(target.getAddress());
@@ -81,14 +102,16 @@ public class GrpcServerProtocolClient implements Closeable
{
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
- channel = channelBuilder.flowControlWindow(flowControlWindow).build();
- blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
- asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
- this.requestTimeoutDuration = requestTimeoutDuration;
+ return channelBuilder.flowControlWindow(flowControlWindow).build();
}
@Override
public void close() {
+ LOG.info("{} Close channels", raftPeerId);
+ CompletableFuture<Void> future1;
+ if (useSeparateHBChannel) {
+ GrpcUtil.shutdownManagedChannel(hbChannel);
+ }
GrpcUtil.shutdownManagedChannel(channel);
}
@@ -108,8 +131,12 @@ public class GrpcServerProtocolClient implements Closeable
{
}
StreamObserver<AppendEntriesRequestProto> appendEntries(
- StreamObserver<AppendEntriesReplyProto> responseHandler) {
- return asyncStub.appendEntries(responseHandler);
+ StreamObserver<AppendEntriesReplyProto> responseHandler, boolean
isHeartbeat) {
+ if (isHeartbeat && useSeparateHBChannel) {
+ return hbAsyncStub.appendEntries(responseHandler);
+ } else {
+ return asyncStub.appendEntries(responseHandler);
+ }
}
StreamObserver<InstallSnapshotRequestProto> installSnapshot(
@@ -120,6 +147,9 @@ public class GrpcServerProtocolClient implements Closeable {
// short-circuit the backoff timer and make them reconnect immediately.
public void resetConnectBackoff() {
+ if (useSeparateHBChannel) {
+ hbChannel.resetConnectBackoff();
+ }
channel.resetConnectBackoff();
}
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 7f6e521f..5ad3909d 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -123,7 +123,12 @@ class GrpcServerProtocolService extends
RaftServerProtocolServiceImplBase {
.map(PendingServerRequest::getFuture)
.orElse(CompletableFuture.completedFuture(null));
try {
- process(request).thenCombine(previousFuture, (reply, v) -> {
+ process(request).exceptionally(e -> {
+ // Handle cases, such as RaftServer is paused
+ handleError(e, request);
+ current.getFuture().completeExceptionally(e);
+ return null;
+ }).thenCombine(previousFuture, (reply, v) -> {
handleReply(reply);
current.getFuture().complete(null);
return null;
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 877ed85e..9d65cba3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -126,7 +126,8 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()),
GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
- RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
+ RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()),
+ GrpcConfigKeys.Server.heartbeatChannel(server.getProperties()));
}
@SuppressWarnings("checkstyle:ParameterNumber") // private constructor
@@ -135,10 +136,11 @@ public final class GrpcService extends
RaftServerRpcWithProxy<GrpcServerProtocol
int clientPort, GrpcTlsConfig clientTlsConfig,
int serverPort, GrpcTlsConfig serverTlsConfig,
SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
- SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration) {
+ SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration,
+ boolean useSeparateHBChannel) {
super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(),
- requestTimeoutDuration, serverTlsConfig)));
+ requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel)));
if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
throw new IllegalArgumentException("Illegal configuration: "
+ RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " +
appenderBufferSize
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1954e33d..afab42cf 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1347,7 +1347,8 @@ class RaftServerImpl implements RaftServer.Division,
logAppendEntries(isHeartbeat,
() -> getMemberId() + ": receive appendEntries(" + leaderId + ", " +
leaderTerm + ", "
+ previous + ", " + leaderCommit + ", " + initializing
- + ", commits" + ProtoUtils.toString(commitInfos)
+ + ", commits:" + ProtoUtils.toString(commitInfos)
+ + ", cId:" + callId
+ ", entries: " + LogProtoUtils.toLogEntriesString(entries));
final long currentTerm;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 190f45bc..50f9887f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -109,6 +109,10 @@ public abstract class LogAppenderBase implements
LogAppender {
return leaderState;
}
+ public boolean hasPendingDataRequests() {
+ return false;
+ }
+
private TermIndex getPrevious(long nextIndex) {
if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
return null;
@@ -132,17 +136,19 @@ public abstract class LogAppenderBase implements
LogAppender {
}
@Override
- public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat) throws RaftLogIOException {
- final TermIndex previous = getPrevious(follower.getNextIndex());
- final long snapshotIndex = follower.getSnapshotIndex();
+ public AppendEntriesRequestProto newAppendEntriesRequest(long callId,
boolean heartbeat)
+ throws RaftLogIOException {
final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
+ final TermIndex previous = getPrevious(follower.getNextIndex());
if (heartbeatWaitTimeMs <= 0L || heartbeat) {
// heartbeat
- return leaderState.newAppendEntriesRequestProto(follower,
Collections.emptyList(), previous, callId);
+ return leaderState.newAppendEntriesRequestProto(follower,
Collections.emptyList(),
+ hasPendingDataRequests()? null : previous, callId);
}
Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " +
buffer.getNumElements() + " elements.");
+ final long snapshotIndex = follower.getSnapshotIndex();
final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
final long halfMs = heartbeatWaitTimeMs/2;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
index 3992638c..22eebac9 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java
@@ -119,7 +119,9 @@ public class SegmentedRaftLogOutputStream implements
Closeable {
try {
out.flush();
} catch (IOException ioe) {
- throw new IOException("Failed to flush " + this, ioe);
+ String msg = "Failed to flush " + this;
+ LOG.error(msg, ioe);
+ throw new IOException(msg, ioe);
}
}
@@ -127,7 +129,9 @@ public class SegmentedRaftLogOutputStream implements
Closeable {
try {
return out.asyncFlush(executor);
} catch (IOException ioe) {
- throw new IOException("Failed to flush " + this, ioe);
+ String msg = "Failed to asyncFlush " + this;
+ LOG.error(msg, ioe);
+ throw new IOException(msg, ioe);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
index 1a08e442..25223c0f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
@@ -58,7 +58,8 @@ public final class ServerStringUtils {
+ "-t" + reply.getTerm()
+ "," + reply.getResult()
+ ",nextIndex=" + reply.getNextIndex()
- + ",followerCommit=" + reply.getFollowerCommit();
+ + ",followerCommit=" + reply.getFollowerCommit()
+ + ",matchIndex=" + reply.getMatchIndex();
}
public static String
toInstallSnapshotRequestString(InstallSnapshotRequestProto request) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 86481af2..2c00892f 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
@@ -66,8 +67,10 @@ public abstract class GroupManagementBaseTest extends
BaseTest {
static {
// avoid flaky behaviour in CI environment
- RaftServerConfigKeys.Rpc.setTimeoutMin(prop, TimeDuration.valueOf(300,
TimeUnit.MILLISECONDS));
- RaftServerConfigKeys.Rpc.setTimeoutMax(prop, TimeDuration.valueOf(600,
TimeUnit.MILLISECONDS));
+ RaftServerConfigKeys.Rpc.setTimeoutMin(prop, TimeDuration.valueOf(1500,
TimeUnit.MILLISECONDS));
+ RaftServerConfigKeys.Rpc.setTimeoutMax(prop, TimeDuration.valueOf(2000,
TimeUnit.MILLISECONDS));
+ // it takes 5s+ to finish the blocking group add call
+ RaftClientConfigKeys.Rpc.setRequestTimeout(prop, TimeDuration.valueOf(12,
TimeUnit.SECONDS));
}
public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster>
getClusterFactory();
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index b440b26d..e09ca19d 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -330,8 +330,6 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
void runTestReconfTimeout(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
- RaftServerTestUtil.LogCapturer logCapture =
- RaftServerTestUtil.LogCapturer.captureLogs(RaftServer.Division.LOG);
try (final RaftClient client = cluster.createClient(leaderId)) {
PeerChanges c1 = cluster.addNewPeers(2, false);
@@ -345,7 +343,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
client.getId(), leaderId, c1.allPeersInNewConf);
try {
RaftClientReply reply = sender.sendRequest(request);
- Assert.fail("did not get expected exception " + reply.toString() + "
[" + logCapture.getOutput() + "]");
+ Assert.fail("did not get expected exception " + reply.toString());
} catch (IOException e) {
Assert.assertTrue("Got exception " + e,
e instanceof ReconfigurationTimeoutException);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
index ee646941..5f7a40f0 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java
@@ -18,7 +18,23 @@
package org.apache.ratis.grpc;
import org.apache.ratis.InstallSnapshotFromLeaderTests;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
public class TestLeaderInstallSnapshot
extends InstallSnapshotFromLeaderTests<MiniRaftClusterWithGrpc>
-implements MiniRaftClusterWithGrpc.FactoryGet {}
+implements MiniRaftClusterWithGrpc.FactoryGet {
+
+ public TestLeaderInstallSnapshot(Boolean separateHeartbeat) {
+ GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(),
separateHeartbeat);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList((new Boolean[][] {{Boolean.FALSE},
{Boolean.TRUE}}));
+ }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 36667eea..c3ad3848 100644
---
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -35,14 +35,18 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
+@RunWith(Parameterized.class)
public class TestLogAppenderWithGrpc
extends LogAppenderTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
@@ -50,6 +54,15 @@ public class TestLogAppenderWithGrpc
Log4jUtils.setLogLevel(FollowerInfo.LOG, Level.DEBUG);
}
+ public TestLogAppenderWithGrpc(Boolean separateHeartbeat) {
+ GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(),
separateHeartbeat);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}}));
+ }
+
@Test
public void testPendingLimits() throws IOException, InterruptedException {
int maxAppends = 10;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 54c65175..bcf70f16 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -62,13 +62,12 @@ import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.channels.OverlappingFileLockException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.SortedMap;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -76,11 +75,21 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+@RunWith(Parameterized.class)
public class TestRaftServerWithGrpc extends BaseTest implements
MiniRaftClusterWithGrpc.FactoryGet {
{
Log4jUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL);
}
+ public TestRaftServerWithGrpc(Boolean separateHeartbeat) {
+ GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(),
separateHeartbeat);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}}));
+ }
+
@Before
public void setup() {
final RaftProperties p = getProperties();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index d5dd0c36..86f8f376 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -32,12 +32,17 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
+@RunWith(Parameterized.class)
public class TestRaftWithGrpc
extends RaftBasicTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
@@ -47,6 +52,15 @@ public class TestRaftWithGrpc
SimpleStateMachine4Testing.class, StateMachine.class);
}
+ public TestRaftWithGrpc(Boolean separateHeartbeat) {
+ GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(),
separateHeartbeat);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}}));
+ }
+
@Override
@Test
public void testWithLoad() throws Exception {