This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 208800a809630b27325ce039cf5312b69eac7bf9 Author: Sammi Chen <[email protected]> AuthorDate: Fri Jul 22 02:15:02 2022 +0800 RATIS-1465. Use seperate channel for group heartbeat. (#561) (cherry picked from commit 09d55fc5293a2cbce9f0c841993075c45bd05a09) --- .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 10 ++++ .../apache/ratis/grpc/server/GrpcLogAppender.java | 63 +++++++++++++++++----- .../grpc/server/GrpcServerProtocolClient.java | 48 +++++++++++++---- .../grpc/server/GrpcServerProtocolService.java | 7 ++- .../org/apache/ratis/grpc/server/GrpcService.java | 8 +-- .../apache/ratis/server/impl/RaftServerImpl.java | 3 +- .../ratis/server/leader/LogAppenderBase.java | 14 +++-- .../segmented/SegmentedRaftLogOutputStream.java | 8 ++- .../ratis/server/util/ServerStringUtils.java | 3 +- .../ratis/server/impl/GroupManagementBaseTest.java | 7 ++- .../server/impl/RaftReconfigurationBaseTest.java | 4 +- .../ratis/grpc/TestLeaderInstallSnapshot.java | 18 ++++++- .../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 13 +++++ .../apache/ratis/grpc/TestRaftServerWithGrpc.java | 17 ++++-- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 14 +++++ 15 files changed, 193 insertions(+), 44 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index 4e95c1a3..c265ab2d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -194,6 +194,16 @@ public interface GrpcConfigKeys { static void setLeaderOutstandingAppendsMax(RaftProperties properties, int maxAppend) { setInt(properties::setInt, LEADER_OUTSTANDING_APPENDS_MAX_KEY, maxAppend); } + + String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel"; + boolean HEARTBEAT_CHANNEL_DEFAULT = true; + static boolean heartbeatChannel(RaftProperties properties) { + return getBoolean(properties::getBoolean, HEARTBEAT_CHANNEL_KEY, + HEARTBEAT_CHANNEL_DEFAULT, getDefaultLog()); + } + static void setHeartbeatChannel(RaftProperties properties, boolean useCached) { + setBoolean(properties::setBoolean, HEARTBEAT_CHANNEL_KEY, useCached); + } } String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max"; diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 2eab1132..4b5751fd 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -64,7 +64,8 @@ public class GrpcLogAppender extends LogAppenderBase { private final TimeDuration requestTimeoutDuration; private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); - private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; + private volatile StreamObservers appendLogRequestObserver; + private final boolean useSeparateHBChannel; private final GrpcServerMetrics grpcServerMetrics; @@ -80,6 +81,7 @@ public class GrpcLogAppender extends LogAppenderBase { this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties); this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties); this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties); + this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString()); grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize); @@ -152,7 +154,7 @@ public class GrpcLogAppender extends LogAppenderBase { getLeaderState().checkHealth(getFollower()); } - Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted); + Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted); } public long getWaitTimeMs() { @@ -193,6 +195,11 @@ public class GrpcLogAppender extends LogAppenderBase { return appendLogRequestObserver == null || super.shouldSendAppendEntries(); } + @Override + public boolean hasPendingDataRequests() { + return pendingRequests.logRequestsSize() > 0; + } + /** * @return true iff not received first response or queue is full. */ @@ -204,6 +211,29 @@ public class GrpcLogAppender extends LogAppenderBase { return !firstResponseReceived || size >= maxPendingRequestsNum; } + static class StreamObservers { + private final StreamObserver<AppendEntriesRequestProto> appendLog; + private final StreamObserver<AppendEntriesRequestProto> heartbeat; + + StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat) { + this.appendLog = client.appendEntries(handler, false); + this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null; + } + + void onNext(AppendEntriesRequestProto proto) { + if (heartbeat != null && proto.getEntriesCount() == 0) { + heartbeat.onNext(proto); + } else { + appendLog.onNext(proto); + } + } + + void onCompleted() { + appendLog.onCompleted(); + Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted); + } + } + private void appendLog(boolean excludeLogEntries) throws IOException { final AppendEntriesRequestProto pending; final AppendEntriesRequest request; @@ -220,26 +250,32 @@ public class GrpcLogAppender extends LogAppenderBase { pendingRequests.put(request); increaseNextIndex(pending); if (appendLogRequestObserver == null) { - appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler()); + appendLogRequestObserver = new StreamObservers( + getClient(), new AppendLogResponseHandler(), useSeparateHBChannel); } - s = appendLogRequestObserver; } if (isRunning()) { - sendRequest(request, pending, s); + sendRequest(request, pending); } } - private void sendRequest(AppendEntriesRequest request, AppendEntriesRequestProto proto, - StreamObserver<AppendEntriesRequestProto> s) { + private void sendRequest(AppendEntriesRequest request, AppendEntriesRequestProto proto) { CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, getServer().getId(), null, proto); request.startRequestTimer(); - s.onNext(proto); - scheduler.onTimeout(requestTimeoutDuration, - () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), - LOG, () -> "Timeout check failed for append entry request: " + request); - getFollower().updateLastRpcSendTime(request.isHeartbeat()); + boolean sent = Optional.ofNullable(appendLogRequestObserver).map(observer -> { + observer.onNext(proto); + return true;}).isPresent(); + + if (sent) { + scheduler.onTimeout(requestTimeoutDuration, + () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), + LOG, () -> "Timeout check failed for append entry request: " + request); + getFollower().updateLastRpcSendTime(request.isHeartbeat()); + } else { + request.stopRequestTimer(); + } } private void timeoutAppendRequest(long cid, boolean heartbeat) { @@ -247,6 +283,7 @@ public class GrpcLogAppender extends LogAppenderBase { if (pending != null) { LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ? "HEARTBEAT" : "", pending); grpcServerMetrics.onRequestTimeout(getFollowerId().toString(), heartbeat); + pending.stopRequestTimer(); } } @@ -334,7 +371,7 @@ public class GrpcLogAppender extends LogAppenderBase { @Override public void onError(Throwable t) { if (!isRunning()) { - LOG.info("{} is stopped", GrpcLogAppender.this); + LOG.info("{} is already stopped", GrpcLogAppender.this); return; } GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index fcf8126a..6f5d06c2 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -36,23 +36,44 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.concurrent.CompletableFuture; /** * This is a RaftClient implementation that supports streaming data to the raft * ring. The stream implementation utilizes gRPC. */ public class GrpcServerProtocolClient implements Closeable { + // Common channel private final ManagedChannel channel; - private final TimeDuration requestTimeoutDuration; - private final RaftServerProtocolServiceBlockingStub blockingStub; + // Channel and stub for heartbeat + private ManagedChannel hbChannel; + private RaftServerProtocolServiceStub hbAsyncStub; private final RaftServerProtocolServiceStub asyncStub; + private final RaftServerProtocolServiceBlockingStub blockingStub; + private final boolean useSeparateHBChannel; + + private final TimeDuration requestTimeoutDuration; private static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolClient.class); //visible for using in log / error messages AND to use in instrumented tests private final RaftPeerId raftPeerId; public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow, - TimeDuration requestTimeoutDuration, GrpcTlsConfig tlsConfig) { + TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean separateHBChannel) { raftPeerId = target.getId(); + LOG.info("Build channel for {}", raftPeerId); + useSeparateHBChannel = separateHBChannel; + channel = buildChannel(target, flowControlWindow, tlsConfig); + blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); + asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); + if (useSeparateHBChannel) { + hbChannel = buildChannel(target, flowControlWindow, tlsConfig); + hbAsyncStub = RaftServerProtocolServiceGrpc.newStub(hbChannel); + } + requestTimeoutDuration = requestTimeout; + } + + private ManagedChannel buildChannel(RaftPeer target, int flowControlWindow, + GrpcTlsConfig tlsConfig) { NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(target.getAddress()); @@ -81,14 +102,16 @@ public class GrpcServerProtocolClient implements Closeable { } else { channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } - channel = channelBuilder.flowControlWindow(flowControlWindow).build(); - blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); - asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); - this.requestTimeoutDuration = requestTimeoutDuration; + return channelBuilder.flowControlWindow(flowControlWindow).build(); } @Override public void close() { + LOG.info("{} Close channels", raftPeerId); + CompletableFuture<Void> future1; + if (useSeparateHBChannel) { + GrpcUtil.shutdownManagedChannel(hbChannel); + } GrpcUtil.shutdownManagedChannel(channel); } @@ -108,8 +131,12 @@ public class GrpcServerProtocolClient implements Closeable { } StreamObserver<AppendEntriesRequestProto> appendEntries( - StreamObserver<AppendEntriesReplyProto> responseHandler) { - return asyncStub.appendEntries(responseHandler); + StreamObserver<AppendEntriesReplyProto> responseHandler, boolean isHeartbeat) { + if (isHeartbeat && useSeparateHBChannel) { + return hbAsyncStub.appendEntries(responseHandler); + } else { + return asyncStub.appendEntries(responseHandler); + } } StreamObserver<InstallSnapshotRequestProto> installSnapshot( @@ -120,6 +147,9 @@ public class GrpcServerProtocolClient implements Closeable { // short-circuit the backoff timer and make them reconnect immediately. public void resetConnectBackoff() { + if (useSeparateHBChannel) { + hbChannel.resetConnectBackoff(); + } channel.resetConnectBackoff(); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java index 7f6e521f..5ad3909d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -123,7 +123,12 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { .map(PendingServerRequest::getFuture) .orElse(CompletableFuture.completedFuture(null)); try { - process(request).thenCombine(previousFuture, (reply, v) -> { + process(request).exceptionally(e -> { + // Handle cases, such as RaftServer is paused + handleError(e, request); + current.getFuture().completeExceptionally(e); + return null; + }).thenCombine(previousFuture, (reply, v) -> { handleReply(reply); current.getFuture().complete(null); return null; diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index 877ed85e..9d65cba3 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -126,7 +126,8 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info), RaftServerConfigKeys.Log.Appender.bufferByteLimit(server.getProperties()), GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info), - RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties())); + RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()), + GrpcConfigKeys.Server.heartbeatChannel(server.getProperties())); } @SuppressWarnings("checkstyle:ParameterNumber") // private constructor @@ -135,10 +136,11 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol int clientPort, GrpcTlsConfig clientTlsConfig, int serverPort, GrpcTlsConfig serverTlsConfig, SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, - SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration) { + SizeInBytes flowControlWindow,TimeDuration requestTimeoutDuration, + boolean useSeparateHBChannel) { super(idSupplier, id -> new PeerProxyMap<>(id.toString(), p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), - requestTimeoutDuration, serverTlsConfig))); + requestTimeoutDuration, serverTlsConfig, useSeparateHBChannel))); if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { throw new IllegalArgumentException("Illegal configuration: " + RaftServerConfigKeys.Log.Appender.BUFFER_BYTE_LIMIT_KEY + " = " + appenderBufferSize diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 1954e33d..afab42cf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1347,7 +1347,8 @@ class RaftServerImpl implements RaftServer.Division, logAppendEntries(isHeartbeat, () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing - + ", commits" + ProtoUtils.toString(commitInfos) + + ", commits:" + ProtoUtils.toString(commitInfos) + + ", cId:" + callId + ", entries: " + LogProtoUtils.toLogEntriesString(entries)); final long currentTerm; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index 190f45bc..50f9887f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -109,6 +109,10 @@ public abstract class LogAppenderBase implements LogAppender { return leaderState; } + public boolean hasPendingDataRequests() { + return false; + } + private TermIndex getPrevious(long nextIndex) { if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) { return null; @@ -132,17 +136,19 @@ public abstract class LogAppenderBase implements LogAppender { } @Override - public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException { - final TermIndex previous = getPrevious(follower.getNextIndex()); - final long snapshotIndex = follower.getSnapshotIndex(); + public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) + throws RaftLogIOException { final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs(); + final TermIndex previous = getPrevious(follower.getNextIndex()); if (heartbeatWaitTimeMs <= 0L || heartbeat) { // heartbeat - return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(), previous, callId); + return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(), + hasPendingDataRequests()? null : previous, callId); } Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements."); + final long snapshotIndex = follower.getSnapshotIndex(); final long leaderNext = getRaftLog().getNextIndex(); final long followerNext = follower.getNextIndex(); final long halfMs = heartbeatWaitTimeMs/2; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java index 3992638c..22eebac9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java @@ -119,7 +119,9 @@ public class SegmentedRaftLogOutputStream implements Closeable { try { out.flush(); } catch (IOException ioe) { - throw new IOException("Failed to flush " + this, ioe); + String msg = "Failed to flush " + this; + LOG.error(msg, ioe); + throw new IOException(msg, ioe); } } @@ -127,7 +129,9 @@ public class SegmentedRaftLogOutputStream implements Closeable { try { return out.asyncFlush(executor); } catch (IOException ioe) { - throw new IOException("Failed to flush " + this, ioe); + String msg = "Failed to asyncFlush " + this; + LOG.error(msg, ioe); + throw new IOException(msg, ioe); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java index 1a08e442..25223c0f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java @@ -58,7 +58,8 @@ public final class ServerStringUtils { + "-t" + reply.getTerm() + "," + reply.getResult() + ",nextIndex=" + reply.getNextIndex() - + ",followerCommit=" + reply.getFollowerCommit(); + + ",followerCommit=" + reply.getFollowerCommit() + + ",matchIndex=" + reply.getMatchIndex(); } public static String toInstallSnapshotRequestString(InstallSnapshotRequestProto request) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java index 86481af2..2c00892f 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java @@ -21,6 +21,7 @@ import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.api.GroupManagementApi; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.exceptions.AlreadyExistsException; @@ -66,8 +67,10 @@ public abstract class GroupManagementBaseTest extends BaseTest { static { // avoid flaky behaviour in CI environment - RaftServerConfigKeys.Rpc.setTimeoutMin(prop, TimeDuration.valueOf(300, TimeUnit.MILLISECONDS)); - RaftServerConfigKeys.Rpc.setTimeoutMax(prop, TimeDuration.valueOf(600, TimeUnit.MILLISECONDS)); + RaftServerConfigKeys.Rpc.setTimeoutMin(prop, TimeDuration.valueOf(1500, TimeUnit.MILLISECONDS)); + RaftServerConfigKeys.Rpc.setTimeoutMax(prop, TimeDuration.valueOf(2000, TimeUnit.MILLISECONDS)); + // it takes 5s+ to finish the blocking group add call + RaftClientConfigKeys.Rpc.setRequestTimeout(prop, TimeDuration.valueOf(12, TimeUnit.SECONDS)); } public abstract MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory(); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index b440b26d..e09ca19d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -330,8 +330,6 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste void runTestReconfTimeout(CLUSTER cluster) throws Exception { final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); - RaftServerTestUtil.LogCapturer logCapture = - RaftServerTestUtil.LogCapturer.captureLogs(RaftServer.Division.LOG); try (final RaftClient client = cluster.createClient(leaderId)) { PeerChanges c1 = cluster.addNewPeers(2, false); @@ -345,7 +343,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste client.getId(), leaderId, c1.allPeersInNewConf); try { RaftClientReply reply = sender.sendRequest(request); - Assert.fail("did not get expected exception " + reply.toString() + " [" + logCapture.getOutput() + "]"); + Assert.fail("did not get expected exception " + reply.toString()); } catch (IOException e) { Assert.assertTrue("Got exception " + e, e instanceof ReconfigurationTimeoutException); diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java index ee646941..5f7a40f0 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLeaderInstallSnapshot.java @@ -18,7 +18,23 @@ package org.apache.ratis.grpc; import org.apache.ratis.InstallSnapshotFromLeaderTests; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; + +@RunWith(Parameterized.class) public class TestLeaderInstallSnapshot extends InstallSnapshotFromLeaderTests<MiniRaftClusterWithGrpc> -implements MiniRaftClusterWithGrpc.FactoryGet {} +implements MiniRaftClusterWithGrpc.FactoryGet { + + public TestLeaderInstallSnapshot(Boolean separateHeartbeat) { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + } + + @Parameterized.Parameters + public static Collection<Boolean[]> data() { + return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}})); + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java index 36667eea..c3ad3848 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java @@ -35,14 +35,18 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Log4jUtils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.concurrent.CompletableFuture; import static org.apache.ratis.RaftTestUtil.waitForLeader; +@RunWith(Parameterized.class) public class TestLogAppenderWithGrpc extends LogAppenderTests<MiniRaftClusterWithGrpc> implements MiniRaftClusterWithGrpc.FactoryGet { @@ -50,6 +54,15 @@ public class TestLogAppenderWithGrpc Log4jUtils.setLogLevel(FollowerInfo.LOG, Level.DEBUG); } + public TestLogAppenderWithGrpc(Boolean separateHeartbeat) { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + } + + @Parameterized.Parameters + public static Collection<Boolean[]> data() { + return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}})); + } + @Test public void testPendingLimits() throws IOException, InterruptedException { int maxAppends = 10; diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index 54c65175..bcf70f16 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -62,13 +62,12 @@ import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; import java.nio.channels.OverlappingFileLockException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.SortedMap; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -76,11 +75,21 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +@RunWith(Parameterized.class) public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet { { Log4jUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL); } + public TestRaftServerWithGrpc(Boolean separateHeartbeat) { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + } + + @Parameterized.Parameters + public static Collection<Boolean[]> data() { + return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}})); + } + @Before public void setup() { final RaftProperties p = getProperties(); diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index d5dd0c36..86f8f376 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -32,12 +32,17 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.TimeDuration; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.apache.ratis.RaftTestUtil.waitForLeader; +@RunWith(Parameterized.class) public class TestRaftWithGrpc extends RaftBasicTests<MiniRaftClusterWithGrpc> implements MiniRaftClusterWithGrpc.FactoryGet { @@ -47,6 +52,15 @@ public class TestRaftWithGrpc SimpleStateMachine4Testing.class, StateMachine.class); } + public TestRaftWithGrpc(Boolean separateHeartbeat) { + GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat); + } + + @Parameterized.Parameters + public static Collection<Boolean[]> data() { + return Arrays.asList((new Boolean[][] {{Boolean.FALSE}, {Boolean.TRUE}})); + } + @Override @Test public void testWithLoad() throws Exception {
