Repository: incubator-ratis
Updated Branches:
  refs/heads/master b1c6d6503 -> 357ba8293


RATIS-323. Update grpc to not use the deprecated APIs. Contributed by Tsz Wo 
Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/357ba829
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/357ba829
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/357ba829

Branch: refs/heads/master
Commit: 357ba82937a757e70e32555423cc8b9edd69271b
Parents: b1c6d65
Author: Lokesh Jain <[email protected]>
Authored: Tue Sep 18 15:46:47 2018 +0530
Committer: Lokesh Jain <[email protected]>
Committed: Tue Sep 18 15:46:47 2018 +0530

----------------------------------------------------------------------
 .../java/org/apache/ratis/grpc/RaftGRpcService.java   |  5 ++---
 .../ratis/grpc/client/RaftClientProtocolClient.java   |  9 +++++----
 .../org/apache/ratis/grpc/server/GRpcLogAppender.java | 14 +++++++-------
 3 files changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/357ba829/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 929b564..d638a45 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -28,7 +28,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
 import org.apache.ratis.shaded.io.grpc.Server;
-import org.apache.ratis.shaded.io.grpc.ServerBuilder;
 import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.*;
@@ -85,8 +84,8 @@ public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolCl
           + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + 
grpcMessageSizeMax);
     }
 
-    server = ((NettyServerBuilder) ServerBuilder.forPort(port))
-        .maxMessageSize(grpcMessageSizeMax.getSizeInt())
+    server = NettyServerBuilder.forPort(port)
+        .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
         .flowControlWindow(flowControlWindow.getSizeInt())
         .addService(new RaftServerProtocolService(idSupplier, raftServer))
         .addService(new RaftClientProtocolService(idSupplier, raftServer))

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/357ba829/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index 7d444e6..9dd1a31 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.util.TimeoutScheduler;
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.netty.NegotiationType;
 import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
@@ -75,8 +76,9 @@ public class RaftClientProtocolClient implements Closeable {
     final SizeInBytes flowControlWindow = 
GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
     final SizeInBytes maxMessageSize = 
GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
     channel = NettyChannelBuilder.forTarget(target.getAddress())
-        .usePlaintext(true).flowControlWindow(flowControlWindow.getSizeInt())
-        .maxMessageSize(maxMessageSize.getSizeInt())
+        .negotiationType(NegotiationType.PLAINTEXT)
+        .flowControlWindow(flowControlWindow.getSizeInt())
+        .maxInboundMessageSize(maxMessageSize.getSizeInt())
         .build();
     blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
@@ -103,8 +105,7 @@ public class RaftClientProtocolClient implements Closeable {
         .groupManagement(request));
   }
 
-  ServerInformationReplyProto serverInformation(
-      ServerInformationRequestProto request) throws IOException {
+  ServerInformationReplyProto serverInformation(ServerInformationRequestProto 
request) {
     return adminBlockingStub
         .withDeadlineAfter(requestTimeoutDuration.getDuration(), 
requestTimeoutDuration.getUnit())
         .serverInformation(request);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/357ba829/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index f060c24..7dfe033 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -25,6 +25,7 @@ import org.apache.ratis.server.impl.FollowerInfo;
 import org.apache.ratis.server.impl.LeaderState;
 import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
@@ -32,6 +33,8 @@ import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
@@ -42,6 +45,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * A new log appender implementation using grpc bi-directional stream API.
  */
 public class GRpcLogAppender extends LogAppender {
+  public static final Logger LOG = 
LoggerFactory.getLogger(GRpcLogAppender.class);
+
   private final RaftGRpcService rpcService;
   private final Map<Long, AppendEntriesRequestProto> pendingRequests;
   private final int maxPendingRequestsNum;
@@ -176,8 +181,7 @@ public class GRpcLogAppender extends LogAppender {
   private void timeoutAppendRequest(AppendEntriesRequestProto request) {
     AppendEntriesRequestProto pendingRequest = 
pendingRequests.remove(request.getServerRequest().getCallId());
     if (pendingRequest != null) {
-      final String err = this + ": appendEntries Timeout, request=" + 
ProtoUtils.toString(pendingRequest.getServerRequest());
-      LOG.warn(err);
+      LOG.warn( "{}: appendEntries Timeout, request={}", this, 
ProtoUtils.toString(pendingRequest.getServerRequest()));
     }
   }
 
@@ -260,16 +264,12 @@ public class GRpcLogAppender extends LogAppender {
     AppendEntriesRequestProto request = 
pendingRequests.remove(reply.getServerReply().getCallId());
     if (request == null) {
       // If reply comes after timeout, the reply is ignored.
-      LOG.warn("Ignoring reply: " + reply);
+      LOG.warn("{}: Request not found, ignoring reply: {}", this, 
ServerProtoUtils.toString(reply));
       return;
     }
     updateCommitIndex(request.getLeaderCommit());
 
     final long replyNextIndex = reply.getNextIndex();
-    Objects.requireNonNull(request,
-        () -> "Got reply with next index " + replyNextIndex
-            + " but the pending queue is empty");
-
     final long lastIndex = replyNextIndex - 1;
     final boolean updateMatchIndex;
 

Reply via email to