Repository: incubator-ratis Updated Branches: refs/heads/master b1c6d6503 -> 357ba8293
RATIS-323. Update grpc to not using the deprecated APIs. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/357ba829 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/357ba829 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/357ba829 Branch: refs/heads/master Commit: 357ba82937a757e70e32555423cc8b9edd69271b Parents: b1c6d65 Author: Lokesh Jain <[email protected]> Authored: Tue Sep 18 15:46:47 2018 +0530 Committer: Lokesh Jain <[email protected]> Committed: Tue Sep 18 15:46:47 2018 +0530 ---------------------------------------------------------------------- .../java/org/apache/ratis/grpc/RaftGRpcService.java | 5 ++--- .../ratis/grpc/client/RaftClientProtocolClient.java | 9 +++++---- .../org/apache/ratis/grpc/server/GRpcLogAppender.java | 14 +++++++------- 3 files changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/357ba829/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 929b564..d638a45 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -28,7 +28,6 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.RaftServerRpcWithProxy; import org.apache.ratis.shaded.io.grpc.Server; -import org.apache.ratis.shaded.io.grpc.ServerBuilder; import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.*; @@ -85,8 +84,8 @@ public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolCl + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); } - server = ((NettyServerBuilder) ServerBuilder.forPort(port)) - .maxMessageSize(grpcMessageSizeMax.getSizeInt()) + server = NettyServerBuilder.forPort(port) + .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt()) .flowControlWindow(flowControlWindow.getSizeInt()) .addService(new RaftServerProtocolService(idSupplier, raftServer)) .addService(new RaftClientProtocolService(idSupplier, raftServer)) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/357ba829/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index 7d444e6..9dd1a31 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -26,6 +26,7 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.util.TimeoutScheduler; import org.apache.ratis.shaded.io.grpc.ManagedChannel; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; +import org.apache.ratis.shaded.io.grpc.netty.NegotiationType; import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.*; @@ -75,8 +76,9 @@ public class RaftClientProtocolClient implements Closeable { final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug); final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); channel = NettyChannelBuilder.forTarget(target.getAddress()) - .usePlaintext(true).flowControlWindow(flowControlWindow.getSizeInt()) - .maxMessageSize(maxMessageSize.getSizeInt()) + .negotiationType(NegotiationType.PLAINTEXT) + .flowControlWindow(flowControlWindow.getSizeInt()) + .maxInboundMessageSize(maxMessageSize.getSizeInt()) .build(); blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftClientProtocolServiceGrpc.newStub(channel); @@ -103,8 +105,7 @@ public class RaftClientProtocolClient implements Closeable { .groupManagement(request)); } - ServerInformationReplyProto serverInformation( - ServerInformationRequestProto request) throws IOException { + ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) { return adminBlockingStub .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) .serverInformation(request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/357ba829/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index f060c24..7dfe033 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -25,6 +25,7 @@ import org.apache.ratis.server.impl.FollowerInfo; import org.apache.ratis.server.impl.LeaderState; import org.apache.ratis.server.impl.LogAppender; import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; @@ -32,6 +33,8 @@ import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; @@ -42,6 +45,8 @@ import java.util.concurrent.atomic.AtomicBoolean; * A new log appender implementation using grpc bi-directional stream API. */ public class GRpcLogAppender extends LogAppender { + public static final Logger LOG = LoggerFactory.getLogger(GRpcLogAppender.class); + private final RaftGRpcService rpcService; private final Map<Long, AppendEntriesRequestProto> pendingRequests; private final int maxPendingRequestsNum; @@ -176,8 +181,7 @@ public class GRpcLogAppender extends LogAppender { private void timeoutAppendRequest(AppendEntriesRequestProto request) { AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId()); if (pendingRequest != null) { - final String err = this + ": appendEntries Timeout, request=" + ProtoUtils.toString(pendingRequest.getServerRequest()); - LOG.warn(err); + LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest())); } } @@ -260,16 +264,12 @@ public class GRpcLogAppender extends LogAppender { AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId()); if (request == null) { // If reply comes after timeout, the reply is ignored. - LOG.warn("Ignoring reply: " + reply); + LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply)); return; } updateCommitIndex(request.getLeaderCommit()); final long replyNextIndex = reply.getNextIndex(); - Objects.requireNonNull(request, - () -> "Got reply with next index " + replyNextIndex - + " but the pending queue is empty"); - final long lastIndex = replyNextIndex - 1; final boolean updateMatchIndex;
