Repository: incubator-ratis
Updated Branches:
  refs/heads/master d022b687f -> de25e8130


RATIS-183.  In gRPC, Follower should not wait for log sync to process next 
appendEntries.  Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/de25e813
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/de25e813
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/de25e813

Branch: refs/heads/master
Commit: de25e8130176d8d347acc82ef9fc16881837f80c
Parents: d022b68
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Mon Jan 8 17:52:56 2018 +0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Mon Jan 8 17:52:56 2018 +0800

----------------------------------------------------------------------
 .../grpc/server/RaftServerProtocolService.java  |  9 +++---
 .../org/apache/ratis/server/RaftServer.java     |  4 ++-
 .../ratis/server/impl/RaftServerImpl.java       | 30 ++++++++++++-------
 .../ratis/server/impl/RaftServerProxy.java      |  6 ++++
 .../RaftServerAsynchronousProtocol.java         | 31 ++++++++++++++++++++
 5 files changed, 64 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
index a95926a..a7a6990 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
@@ -19,7 +19,7 @@ package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import 
org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
@@ -33,9 +33,9 @@ public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase
   public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerProtocolService.class);
 
   private final Supplier<RaftPeerId> idSupplier;
-  private final RaftServerProtocol server;
+  private final RaftServer server;
 
-  public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, 
RaftServerProtocol server) {
+  public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer 
server) {
     this.idSupplier = idSupplier;
     this.server = server;
   }
@@ -65,8 +65,7 @@ public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase
       @Override
       public void onNext(AppendEntriesRequestProto request) {
         try {
-          final AppendEntriesReplyProto reply = server.appendEntries(request);
-          responseObserver.onNext(reply);
+          
server.appendEntriesAsync(request).thenAccept(responseObserver::onNext);
         } catch (Throwable e) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("{} got exception when appendEntries {}: {}",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index a3d9d91..085f2d1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.statemachine.StateMachine;
 
@@ -31,7 +32,8 @@ import java.io.IOException;
 import java.util.Objects;
 
 /** Raft server interface */
-public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
+public interface RaftServer extends Closeable, RpcType.Get,
+    RaftServerProtocol, RaftServerAsynchronousProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol,
     AdminProtocol, AdminAsynchronousProtocol {
   /** @return the server ID. */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d039e6c..082fe3c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.FileInfo;
@@ -40,6 +41,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
@@ -52,7 +54,7 @@ import static 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBod
 import static 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
 import static org.apache.ratis.util.LifeCycle.State.*;
 
-public class RaftServerImpl implements RaftServerProtocol,
+public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronousProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol {
   public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerImpl.class);
 
@@ -686,13 +688,23 @@ public class RaftServerImpl implements RaftServerProtocol,
   @Override
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
       throws IOException {
+    try {
+      return appendEntriesAsync(r).join();
+    } catch (CompletionException e) {
+      throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e));
+    }
+  }
+
+  @Override
+  public CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto r)
+      throws IOException {
     // TODO avoid converting list to array
     final RaftRpcRequestProto request = r.getServerRequest();
     final LogEntryProto[] entries = r.getEntriesList()
         .toArray(new LogEntryProto[r.getEntriesCount()]);
     final TermIndex previous = r.hasPreviousLog() ?
         ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
-    return appendEntries(RaftPeerId.valueOf(request.getRequestorId()),
+    return appendEntriesAsync(RaftPeerId.valueOf(request.getRequestorId()),
         ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
         r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
         entries);
@@ -710,7 +722,7 @@ public class RaftServerImpl implements RaftServerProtocol,
     }
   }
 
-  private AppendEntriesReplyProto appendEntries(
+  private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
       TermIndex previous, long leaderCommit, boolean initializing,
       LogEntryProto... entries) throws IOException {
@@ -745,7 +757,7 @@ public class RaftServerImpl implements RaftServerProtocol,
           LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} 
reply: {}",
               getId(), leaderId, leaderTerm, state, 
ProtoUtils.toString(reply));
         }
-        return reply;
+        return CompletableFuture.completedFuture(reply);
       }
       changeToFollower(leaderTerm, true);
       state.setLeader(leaderId, "appendEntries");
@@ -771,7 +783,7 @@ public class RaftServerImpl implements RaftServerProtocol,
           LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
               getId(), previous, ServerProtoUtils.toString(reply));
         }
-        return reply;
+        return CompletableFuture.completedFuture(reply);
       }
 
       futures = state.getLog().append(entries);
@@ -781,10 +793,6 @@ public class RaftServerImpl implements RaftServerProtocol,
     }
     if (entries.length > 0) {
       CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
-      for (CompletableFuture future : futures) {
-        future.join();
-      }
-
       nextIndex = entries[entries.length - 1].getIndex() + 1;
     }
     synchronized (this) {
@@ -800,7 +808,9 @@ public class RaftServerImpl implements RaftServerProtocol,
     logAppendEntries(isHeartbeat,
         () -> getId() + ": succeeded to handle AppendEntries. Reply: "
             + ServerProtoUtils.toString(reply));
-    return reply;
+    return CompletableFuture
+        .allOf(futures.toArray(new CompletableFuture[futures.size()]))
+        .thenApply(v -> reply);
   }
 
   private boolean containPrevious(TermIndex previous) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 27483ec..071f65c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -241,6 +241,12 @@ public class RaftServerProxy implements RaftServer {
   }
 
   @Override
+  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
+      AppendEntriesRequestProto r) throws IOException {
+    return getImpl().appendEntriesAsync(r);
+  }
+
+  @Override
   public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
       throws IOException {
     return getImpl().appendEntries(r);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
new file mode 100644
index 0000000..d9461c7
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.protocol;
+
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public interface RaftServerAsynchronousProtocol {
+
+  CompletableFuture<AppendEntriesReplyProto> 
appendEntriesAsync(AppendEntriesRequestProto request)
+      throws IOException;
+}

Reply via email to