Repository: incubator-ratis Updated Branches: refs/heads/master d022b687f -> de25e8130
RATIS-183. In gRPC, Follower should not wait for log sync to process next appendEntries. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/de25e813 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/de25e813 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/de25e813 Branch: refs/heads/master Commit: de25e8130176d8d347acc82ef9fc16881837f80c Parents: d022b68 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Mon Jan 8 17:52:56 2018 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Mon Jan 8 17:52:56 2018 +0800 ---------------------------------------------------------------------- .../grpc/server/RaftServerProtocolService.java | 9 +++--- .../org/apache/ratis/server/RaftServer.java | 4 ++- .../ratis/server/impl/RaftServerImpl.java | 30 ++++++++++++------- .../ratis/server/impl/RaftServerProxy.java | 6 ++++ .../RaftServerAsynchronousProtocol.java | 31 ++++++++++++++++++++ 5 files changed, 64 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java index a95926a..a7a6990 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java @@ -19,7 +19,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.protocol.RaftServerProtocol; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; @@ -33,9 +33,9 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class); private final Supplier<RaftPeerId> idSupplier; - private final RaftServerProtocol server; + private final RaftServer server; - public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServerProtocol server) { + public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server) { this.idSupplier = idSupplier; this.server = server; } @@ -65,8 +65,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase @Override public void onNext(AppendEntriesRequestProto request) { try { - final AppendEntriesReplyProto reply = server.appendEntries(request); - responseObserver.onNext(reply); + server.appendEntriesAsync(request).thenAccept(responseObserver::onNext); } catch (Throwable e) { if (LOG.isDebugEnabled()) { LOG.debug("{} got exception when appendEntries {}: {}", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index a3d9d91..085f2d1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -23,6 +23,7 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.impl.ServerFactory; import org.apache.ratis.server.impl.ServerImplUtils; +import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.statemachine.StateMachine; @@ -31,7 +32,8 @@ import java.io.IOException; import java.util.Objects; /** Raft server interface */ -public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, +public interface RaftServer extends Closeable, RpcType.Get, + RaftServerProtocol, RaftServerAsynchronousProtocol, RaftClientProtocol, RaftClientAsynchronousProtocol, AdminProtocol, AdminAsynchronousProtocol { /** @return the server ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d039e6c..082fe3c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -22,6 +22,7 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerMXBean; import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.FileInfo; @@ -40,6 +41,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; @@ -52,7 +54,7 @@ import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBod import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY; import static org.apache.ratis.util.LifeCycle.State.*; -public class RaftServerImpl implements RaftServerProtocol, +public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronousProtocol, RaftClientProtocol, RaftClientAsynchronousProtocol { public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class); @@ -686,13 +688,23 @@ public class RaftServerImpl implements RaftServerProtocol, @Override public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) throws IOException { + try { + return appendEntriesAsync(r).join(); + } catch (CompletionException e) { + throw IOUtils.asIOException(JavaUtils.unwrapCompletionException(e)); + } + } + + @Override + public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto r) + throws IOException { // TODO avoid converting list to array final RaftRpcRequestProto request = r.getServerRequest(); final LogEntryProto[] entries = r.getEntriesList() .toArray(new LogEntryProto[r.getEntriesCount()]); final TermIndex previous = r.hasPreviousLog() ? ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; - return appendEntries(RaftPeerId.valueOf(request.getRequestorId()), + return appendEntriesAsync(RaftPeerId.valueOf(request.getRequestorId()), ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), entries); @@ -710,7 +722,7 @@ public class RaftServerImpl implements RaftServerProtocol, } } - private AppendEntriesReplyProto appendEntries( + private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm, TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException { @@ -745,7 +757,7 @@ public class RaftServerImpl implements RaftServerProtocol, LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}", getId(), leaderId, leaderTerm, state, ProtoUtils.toString(reply)); } - return reply; + return CompletableFuture.completedFuture(reply); } changeToFollower(leaderTerm, true); state.setLeader(leaderId, "appendEntries"); @@ -771,7 +783,7 @@ public class RaftServerImpl implements RaftServerProtocol, LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}", getId(), previous, ServerProtoUtils.toString(reply)); } - return reply; + return CompletableFuture.completedFuture(reply); } futures = state.getLog().append(entries); @@ -781,10 +793,6 @@ public class RaftServerImpl implements RaftServerProtocol, } if (entries.length > 0) { CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null); - for (CompletableFuture future : futures) { - future.join(); - } - nextIndex = entries[entries.length - 1].getIndex() + 1; } synchronized (this) { @@ -800,7 +808,9 @@ public class RaftServerImpl implements RaftServerProtocol, logAppendEntries(isHeartbeat, () -> getId() + ": succeeded to handle AppendEntries. Reply: " + ServerProtoUtils.toString(reply)); - return reply; + return CompletableFuture + .allOf(futures.toArray(new CompletableFuture[futures.size()])) + .thenApply(v -> reply); } private boolean containPrevious(TermIndex previous) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 27483ec..071f65c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -241,6 +241,12 @@ public class RaftServerProxy implements RaftServer { } @Override + public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( + AppendEntriesRequestProto r) throws IOException { + return getImpl().appendEntriesAsync(r); + } + + @Override public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) throws IOException { return getImpl().appendEntries(r); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/de25e813/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java new file mode 100644 index 0000000..d9461c7 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.server.protocol; + +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +public interface RaftServerAsynchronousProtocol { + + CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) + throws IOException; +}
