Move RequestDispatcher code to RaftServerImpl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a38e2f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a38e2f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a38e2f71
Branch: refs/heads/master
Commit: a38e2f71ef0e84cb0af9a394544274e0dd56bcd9
Parents: 673a282
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Tue Jan 3 02:05:43 2017 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Tue Jan 3 02:05:43 2017 +0800
----------------------------------------------------------------------
 .../RaftClientAsynchronousProtocol.java | 30 ++++
 .../org/apache/raft/grpc/RaftGRpcService.java | 6 +-
 .../grpc/client/RaftClientProtocolService.java | 14 +-
 .../grpc/server/RaftServerProtocolService.java | 15 +-
 .../raft/hadooprpc/server/HadoopRpcService.java | 18 +--
 .../raft/netty/server/NettyRpcService.java | 15 +-
 .../java/org/apache/raft/server/RaftServer.java | 5 +-
 .../apache/raft/server/impl/RaftServerImpl.java | 72 +++++++++-
 .../raft/server/impl/RequestDispatcher.java | 140 -------------------
 .../server/simulation/SimulatedServerRpc.java | 23 ++-
 10 files changed, 142 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java b/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
new file mode 100644
index 0000000..3572b7e
--- /dev/null
+++ b/raft-common/src/main/java/org/apache/raft/protocol/RaftClientAsynchronousProtocol.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one +
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.protocol; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** Asynchronous version of {@link RaftClientProtocol}. */ +public interface RaftClientAsynchronousProtocol { + CompletableFuture<RaftClientReply> submitClientRequestAsync( + RaftClientRequest request) throws IOException; + + CompletableFuture<RaftClientReply> setConfigurationAsync( + SetConfigurationRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java index d0c98c3..1184e2e 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java @@ -25,7 +25,6 @@ import org.apache.raft.grpc.server.RaftServerProtocolService; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.RaftServer; import org.apache.raft.server.RaftServerRpc; -import 
org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.grpc.Server; import org.apache.raft.shaded.io.grpc.ServerBuilder; import org.apache.raft.shaded.io.grpc.netty.NettyServerBuilder; @@ -61,11 +60,10 @@ public class RaftGRpcService implements RaftServerRpc { RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY, RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT); ServerBuilder serverBuilder = ServerBuilder.forPort(port); - final RequestDispatcher dispatcher = new RequestDispatcher(raftServer); selfId = raftServer.getId(); server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize) - .addService(new RaftServerProtocolService(selfId, dispatcher)) - .addService(new RaftClientProtocolService(selfId, dispatcher)) + .addService(new RaftServerProtocolService(selfId, raftServer)) + .addService(new RaftClientProtocolService(selfId, raftServer)) .build(); // start service to determine the port (in case port is configured as 0) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java index 32dbac7..8f41bdc 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/client/RaftClientProtocolService.java @@ -20,8 +20,8 @@ package org.apache.raft.grpc.client; import com.google.common.base.Preconditions; import org.apache.raft.client.impl.ClientProtoUtils; import org.apache.raft.grpc.RaftGrpcUtil; +import org.apache.raft.protocol.RaftClientAsynchronousProtocol; import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.server.impl.RequestDispatcher; import 
org.apache.raft.shaded.io.grpc.stub.StreamObserver; import org.apache.raft.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.raft.shaded.proto.RaftProtos.RaftClientRequestProto; @@ -65,18 +65,18 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE); private final String id; - private final RequestDispatcher dispatcher; + private final RaftClientAsynchronousProtocol client; - public RaftClientProtocolService(String id, RequestDispatcher dispatcher) { + public RaftClientProtocolService(String id, RaftClientAsynchronousProtocol client) { this.id = id; - this.dispatcher = dispatcher; + this.client = client; } @Override public void setConfiguration(SetConfigurationRequestProto request, StreamObserver<RaftClientReplyProto> responseObserver) { try { - CompletableFuture<RaftClientReply> future = dispatcher.setConfigurationAsync( + CompletableFuture<RaftClientReply> future = client.setConfigurationAsync( ClientProtoUtils.toSetConfigurationRequest(request)); future.whenCompleteAsync((reply, exception) -> { if (exception != null) { @@ -114,8 +114,8 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase pendingList.add(p); } - CompletableFuture<RaftClientReply> future = dispatcher - .handleClientRequest(ClientProtoUtils.toRaftClientRequest(request)); + CompletableFuture<RaftClientReply> future = client.submitClientRequestAsync( + ClientProtoUtils.toRaftClientRequest(request)); future.whenCompleteAsync((reply, exception) -> { if (exception != null) { // TODO: the exception may be from either raft or state machine. 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java index 2f06c59..53dbb6a 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java @@ -18,7 +18,7 @@ package org.apache.raft.grpc.server; import org.apache.raft.grpc.RaftGrpcUtil; -import org.apache.raft.server.impl.RequestDispatcher; +import org.apache.raft.server.protocol.RaftServerProtocol; import org.apache.raft.shaded.io.grpc.stub.StreamObserver; import org.apache.raft.shaded.proto.RaftProtos.*; import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; @@ -29,18 +29,18 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class); private final String id; - private final RequestDispatcher dispatcher; + private final RaftServerProtocol server; - public RaftServerProtocolService(String id, RequestDispatcher dispatcher) { + public RaftServerProtocolService(String id, RaftServerProtocol server) { this.id = id; - this.dispatcher = dispatcher; + this.server = server; } @Override public void requestVote(RequestVoteRequestProto request, StreamObserver<RequestVoteReplyProto> responseObserver) { try { - final RequestVoteReplyProto reply = dispatcher.requestVote(request); + final RequestVoteReplyProto reply = server.requestVote(request); responseObserver.onNext(reply); responseObserver.onCompleted(); } catch (Throwable e) { @@ -57,7 +57,7 @@ public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase @Override public void onNext(AppendEntriesRequestProto request) { try { - final AppendEntriesReplyProto reply = dispatcher.appendEntries(request); + final AppendEntriesReplyProto reply = server.appendEntries(request); responseObserver.onNext(reply); } catch (Throwable e) { LOG.info("{} got exception when handling appendEntries {}: {}", @@ -87,8 +87,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase @Override public void onNext(InstallSnapshotRequestProto request) { try { - final InstallSnapshotReplyProto reply = - dispatcher.installSnapshot(request); + final InstallSnapshotReplyProto reply = server.installSnapshot(request); responseObserver.onNext(reply); } catch (Throwable e) { LOG.info("{} got exception when handling installSnapshot {}: {}", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java index b73deca..24e1d2c 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java @@ -24,11 +24,12 @@ import org.apache.hadoop.ipc.RPC; import org.apache.raft.hadooprpc.Proxy; import org.apache.raft.hadooprpc.client.RaftClientProtocolPB; import org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; +import org.apache.raft.protocol.RaftClientProtocol; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.RaftServer; import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.server.RaftServerRpc; -import org.apache.raft.server.impl.RequestDispatcher; +import 
org.apache.raft.server.protocol.RaftServerProtocol; import org.apache.raft.shaded.com.google.protobuf.BlockingService; import org.apache.raft.shaded.com.google.protobuf.ServiceException; import org.apache.raft.shaded.proto.RaftProtos.*; @@ -49,7 +50,6 @@ public class HadoopRpcService implements RaftServerRpc { static final String CLASS_NAME = HadoopRpcService.class.getSimpleName(); public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; - private final RequestDispatcher raftService; private final String id; private final RPC.Server ipcServer; private final InetSocketAddress ipcServerAddress; @@ -60,12 +60,11 @@ public class HadoopRpcService implements RaftServerRpc { throws IOException { this.proxies = new PeerProxyMap<>( p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf)); - this.raftService = new RequestDispatcher(server); this.id = server.getId(); - this.ipcServer = newRpcServer(conf); + this.ipcServer = newRpcServer(server, conf); this.ipcServerAddress = ipcServer.getListenerAddress(); - addRaftClientProtocol(conf); + addRaftClientProtocol(server, conf); LOG.info(getClass().getSimpleName() + " created RPC.Server at " + ipcServerAddress); @@ -76,7 +75,8 @@ public class HadoopRpcService implements RaftServerRpc { return ipcServerAddress; } - private RPC.Server newRpcServer(final Configuration conf) throws IOException { + private RPC.Server newRpcServer(RaftServerProtocol serverProtocol, final Configuration conf) + throws IOException { final RaftServerConfigKeys.Get get = new RaftServerConfigKeys.Get() { @Override protected int getInt(String key, int defaultValue) { @@ -94,7 +94,7 @@ public class HadoopRpcService implements RaftServerRpc { final BlockingService service = RaftServerProtocolService.newReflectiveBlockingService( - new RaftServerProtocolServerSideTranslatorPB(raftService)); + new RaftServerProtocolServerSideTranslatorPB(serverProtocol)); RPC.setProtocolEngine(conf, RaftServerProtocolPB.class, 
ProtobufRpcEngineShaded.class); return new RPC.Builder(conf) .setProtocol(RaftServerProtocolPB.class) @@ -106,13 +106,13 @@ public class HadoopRpcService implements RaftServerRpc { .build(); } - private void addRaftClientProtocol(Configuration conf) { + private void addRaftClientProtocol(RaftClientProtocol clientProtocol, Configuration conf) { final Class<?> protocol = RaftClientProtocolPB.class; RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class); final BlockingService service = RaftClientProtocolService.newReflectiveBlockingService( - new RaftClientProtocolServerSideTranslatorPB(raftService)); + new RaftClientProtocolServerSideTranslatorPB(clientProtocol)); ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java index 08e379a..50833fb 100644 --- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java +++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java @@ -31,7 +31,6 @@ import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.server.RaftServer; import org.apache.raft.server.RaftServerRpc; -import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; @@ -56,7 +55,7 @@ public final class NettyRpcService implements RaftServerRpc { public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; 
private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); - private final RequestDispatcher raftService; + private final RaftServer server; private final String id; private final EventLoopGroup bossGroup = new NioEventLoopGroup(); @@ -76,7 +75,7 @@ public final class NettyRpcService implements RaftServerRpc { /** Constructs a netty server with the given port. */ public NettyRpcService(int port, RaftServer server) { - this.raftService = new RequestDispatcher(server); + this.server = server; this.id = server.getId(); final ChannelInitializer<SocketChannel> initializer @@ -134,7 +133,7 @@ public final class NettyRpcService implements RaftServerRpc { case REQUESTVOTEREQUEST: { final RequestVoteRequestProto request = proto.getRequestVoteRequest(); rpcRequest = request.getServerRequest(); - final RequestVoteReplyProto reply = raftService.requestVote(request); + final RequestVoteReplyProto reply = server.requestVote(request); return RaftNettyServerReplyProto.newBuilder() .setRequestVoteReply(reply) .build(); @@ -142,7 +141,7 @@ public final class NettyRpcService implements RaftServerRpc { case APPENDENTRIESREQUEST: { final AppendEntriesRequestProto request = proto.getAppendEntriesRequest(); rpcRequest = request.getServerRequest(); - final AppendEntriesReplyProto reply = raftService.appendEntries(request); + final AppendEntriesReplyProto reply = server.appendEntries(request); return RaftNettyServerReplyProto.newBuilder() .setAppendEntriesReply(reply) .build(); @@ -150,7 +149,7 @@ public final class NettyRpcService implements RaftServerRpc { case INSTALLSNAPSHOTREQUEST: { final InstallSnapshotRequestProto request = proto.getInstallSnapshotRequest(); rpcRequest = request.getServerRequest(); - final InstallSnapshotReplyProto reply = raftService.installSnapshot(request); + final InstallSnapshotReplyProto reply = server.installSnapshot(request); return RaftNettyServerReplyProto.newBuilder() .setInstallSnapshotReply(reply) .build(); @@ -158,7 +157,7 @@ 
public final class NettyRpcService implements RaftServerRpc { case RAFTCLIENTREQUEST: { final RaftClientRequestProto request = proto.getRaftClientRequest(); rpcRequest = request.getRpcRequest(); - final RaftClientReply reply = raftService.submitClientRequest( + final RaftClientReply reply = server.submitClientRequest( ClientProtoUtils.toRaftClientRequest(request)); return RaftNettyServerReplyProto.newBuilder() .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) @@ -167,7 +166,7 @@ public final class NettyRpcService implements RaftServerRpc { case SETCONFIGURATIONREQUEST: { final SetConfigurationRequestProto request = proto.getSetConfigurationRequest(); rpcRequest = request.getRpcRequest(); - final RaftClientReply reply = raftService.setConfiguration( + final RaftClientReply reply = server.setConfiguration( ClientProtoUtils.toSetConfigurationRequest(request)); return RaftNettyServerReplyProto.newBuilder() .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java index 7141eca..aa4dfbf 100644 --- a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java +++ b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java @@ -17,12 +17,15 @@ */ package org.apache.raft.server; +import org.apache.raft.protocol.RaftClientAsynchronousProtocol; +import org.apache.raft.protocol.RaftClientProtocol; import org.apache.raft.server.protocol.RaftServerProtocol; import java.io.Closeable; /** Raft server interface */ -public interface RaftServer extends RaftServerProtocol, Closeable { +public interface RaftServer extends Closeable, RaftServerProtocol, + RaftClientProtocol, 
RaftClientAsynchronousProtocol { /** @return the server ID. */ String getId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java index 3026afa..1ea40f6 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java @@ -44,6 +44,7 @@ import java.util.Collection; import java.util.List; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; import static org.apache.raft.util.LifeCycle.State.*; @@ -146,6 +147,7 @@ public class RaftServerImpl implements RaftServer { return serverRpc; } + @Override public void start() { lifeCycle.transition(STARTING); state.start(); @@ -186,11 +188,12 @@ public class RaftServerImpl implements RaftServer { return this.state; } + @Override public String getId() { return getState().getSelfId(); } - public RaftConfiguration getRaftConf() { + RaftConfiguration getRaftConf() { return getState().getRaftConf(); } @@ -323,7 +326,7 @@ public class RaftServerImpl implements RaftServer { /** * @return null if the server is in leader state. */ - CompletableFuture<RaftClientReply> checkLeaderState( + private CompletableFuture<RaftClientReply> checkLeaderState( RaftClientRequest request) { if (!isLeader()) { NotLeaderException exception = generateNotLeaderException(); @@ -355,7 +358,7 @@ public class RaftServerImpl implements RaftServer { /** * Handle a normal update request from client. 
*/ - public CompletableFuture<RaftClientReply> appendTransaction( + private CompletableFuture<RaftClientReply> appendTransaction( RaftClientRequest request, TransactionContext entry) throws RaftException { LOG.debug("{}: receive client request({})", getId(), request); @@ -384,10 +387,71 @@ public class RaftServerImpl implements RaftServer { return pending.getFuture(); } + @Override + public CompletableFuture<RaftClientReply> submitClientRequestAsync( + RaftClientRequest request) throws IOException { + // first check the server's leader state + CompletableFuture<RaftClientReply> reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + // let the state machine handle read-only request from client + if (request.isReadOnly()) { + // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper, + // section 8 (last part) + return stateMachine.query(request); + } + + // TODO: this client request will not be added to pending requests + // until later which means that any failure in between will leave partial state in the + // state machine. 
We should call cancelTransaction() for failed requests + TransactionContext entry = stateMachine.startTransaction(request); + if (entry.getException().isPresent()) { + throw RaftUtils.asIOException(entry.getException().get()); + } + + return appendTransaction(request, entry); + } + + @Override + public RaftClientReply submitClientRequest(RaftClientRequest request) + throws IOException { + return waitForReply(getId(), request, submitClientRequestAsync(request)); + } + + private static RaftClientReply waitForReply(String id, RaftClientRequest request, + CompletableFuture<RaftClientReply> future) throws IOException { + try { + return future.get(); + } catch (InterruptedException e) { + final String s = id + ": Interrupted when waiting for reply, request=" + request; + LOG.info(s, e); + throw RaftUtils.toInterruptedIOException(s, e); + } catch (ExecutionException e) { + final Throwable cause = e.getCause(); + if (cause == null) { + throw new IOException(e); + } + if (cause instanceof NotLeaderException) { + return new RaftClientReply(request, (NotLeaderException)cause); + } else { + throw RaftUtils.asIOException(cause); + } + } + } + + @Override + public RaftClientReply setConfiguration(SetConfigurationRequest request) + throws IOException { + return waitForReply(getId(), request, setConfigurationAsync(request)); + } + /** * Handle a raft configuration change request from client. 
*/ - public CompletableFuture<RaftClientReply> setConfiguration( + @Override + public CompletableFuture<RaftClientReply> setConfigurationAsync( SetConfigurationRequest request) throws IOException { LOG.debug("{}: receive setConfiguration({})", getId(), request); lifeCycle.assertCurrentState(RUNNING); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java deleted file mode 100644 index 39a4ac8..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.server.impl; - -import com.google.common.base.Preconditions; -import org.apache.raft.protocol.*; -import org.apache.raft.server.RaftServer; -import org.apache.raft.server.protocol.RaftServerProtocol; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.statemachine.StateMachine; -import org.apache.raft.statemachine.TransactionContext; -import org.apache.raft.util.RaftUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -/** - * Each RPC request is first handled by the RequestDispatcher: - * 1. A request from another RaftPeer is to be handled by RaftServer. - * - * If the raft peer is the leader, then: - * - * 2. A read-only request from client is to be handled by the state machine. - * 3. A write request from client is first validated by the state machine. The - * state machine returns the content of the raft log entry, which is then passed - * to the RaftServer for replication. 
- */ -public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol { - static final Logger LOG = LoggerFactory.getLogger(RequestDispatcher.class); - - private final RaftServerImpl server; - private final StateMachine stateMachine; - - public RequestDispatcher(RaftServer server) { - Preconditions.checkArgument(server instanceof RaftServerImpl); - this.server = (RaftServerImpl)server; - this.stateMachine = this.server.getStateMachine(); - } - - public CompletableFuture<RaftClientReply> handleClientRequest( - RaftClientRequest request) throws IOException { - // first check the server's leader state - CompletableFuture<RaftClientReply> reply = server.checkLeaderState(request); - if (reply != null) { - return reply; - } - - // let the state machine handle read-only request from client - if (request.isReadOnly()) { - // TODO: We might not be the leader anymore by the time this completes. See the RAFT paper, - // section 8 (last part) - return stateMachine.query(request); - } - - // TODO: this client request will not be added to pending requests - // until later which means that any failure in between will leave partial state in the - // state machine. 
We should call cancelTransaction() for failed requests - TransactionContext entry = stateMachine.startTransaction(request); - if (entry.getException().isPresent()) { - throw RaftUtils.asIOException(entry.getException().get()); - } - - return server.appendTransaction(request, entry); - } - - @Override - public RaftClientReply submitClientRequest(RaftClientRequest request) - throws IOException { - return waitForReply(server.getId(), request, handleClientRequest(request)); - } - - public CompletableFuture<RaftClientReply> setConfigurationAsync( - SetConfigurationRequest request) throws IOException { - return server.setConfiguration(request); - } - - @Override - public RaftClientReply setConfiguration(SetConfigurationRequest request) - throws IOException { - return waitForReply(server.getId(), request, setConfigurationAsync(request)); - } - - private static RaftClientReply waitForReply(String serverId, - RaftClientRequest request, CompletableFuture<RaftClientReply> future) - throws IOException { - try { - return future.get(); - } catch (InterruptedException e) { - final String s = serverId + ": Interrupted when waiting for reply, request=" + request; - LOG.info(s, e); - throw RaftUtils.toInterruptedIOException(s, e); - } catch (ExecutionException e) { - final Throwable cause = e.getCause(); - if (cause == null) { - throw new IOException(e); - } - if (cause instanceof NotLeaderException) { - return new RaftClientReply(request, (NotLeaderException)cause); - } else { - throw RaftUtils.asIOException(cause); - } - } - } - - @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) - throws IOException { - return server.requestVote(request); - } - - @Override - public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) - throws IOException { - return server.appendEntries(request); - } - - @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - return 
server.installSnapshot(request); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a38e2f71/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java index 8a7e752..799ee65 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java @@ -18,13 +18,10 @@ package org.apache.raft.server.simulation; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.raft.protocol.RaftClientReply; -import org.apache.raft.protocol.RaftClientRequest; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.protocol.SetConfigurationRequest; -import org.apache.raft.server.impl.RaftServerImpl; +import org.apache.raft.protocol.*; import org.apache.raft.server.RaftServerRpc; -import org.apache.raft.server.impl.RequestDispatcher; +import org.apache.raft.server.impl.RaftServerImpl; +import org.apache.raft.server.protocol.RaftServerProtocol; import org.apache.raft.shaded.proto.RaftProtos.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +37,6 @@ class SimulatedServerRpc implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class); private final RaftServerImpl server; - private final RequestDispatcher dispatcher; private final RequestHandler<RaftServerRequest, RaftServerReply> serverHandler; private final RequestHandler<RaftClientRequest, RaftClientReply> clientHandler; private final ExecutorService executor = Executors.newFixedThreadPool(3, @@ -50,7 +46,6 @@ class SimulatedServerRpc implements RaftServerRpc { SimulatedRequestReply<RaftServerRequest, RaftServerReply> 
serverRequestReply, SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { this.server = server; - this.dispatcher = new RequestDispatcher(server); this.serverHandler = new RequestHandler<>(server.getId(), "serverHandler", serverRequestReply, serverHandlerImpl, 3); this.clientHandler = new RequestHandler<>(server.getId(), @@ -125,13 +120,11 @@ class SimulatedServerRpc implements RaftServerRpc { public RaftServerReply handleRequest(RaftServerRequest r) throws IOException { if (r.isAppendEntries()) { - return new RaftServerReply( - dispatcher.appendEntries(r.getAppendEntries())); + return new RaftServerReply(server.appendEntries(r.getAppendEntries())); } else if (r.isRequestVote()) { - return new RaftServerReply(dispatcher.requestVote(r.getRequestVote())); + return new RaftServerReply(server.requestVote(r.getRequestVote())); } else if (r.isInstallSnapshot()) { - return new RaftServerReply( - dispatcher.installSnapshot(r.getInstallSnapshot())); + return new RaftServerReply(server.installSnapshot(r.getInstallSnapshot())); } else { throw new IllegalStateException("unexpected state"); } @@ -150,9 +143,9 @@ class SimulatedServerRpc implements RaftServerRpc { throws IOException { final CompletableFuture<RaftClientReply> future; if (request instanceof SetConfigurationRequest) { - future = dispatcher.setConfigurationAsync((SetConfigurationRequest) request); + future = server.setConfigurationAsync((SetConfigurationRequest) request); } else { - future = dispatcher.handleClientRequest(request); + future = server.submitClientRequestAsync(request); } future.whenCompleteAsync((reply, exception) -> {