Add a new RaftServer interface.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/56e9b719 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/56e9b719 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/56e9b719 Branch: refs/heads/master Commit: 56e9b719003e7d7024b05d9edec5a95937b3d3b3 Parents: 22f3ee0 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Mon Jan 2 21:10:11 2017 +0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Mon Jan 2 21:10:11 2017 +0800 ---------------------------------------------------------------------- .../org/apache/raft/grpc/RaftGRpcService.java | 6 +-- .../raft/hadooprpc/server/HadoopRpcService.java | 8 ++-- .../raft/netty/server/NettyRpcService.java | 6 +-- .../java/org/apache/raft/server/RaftServer.java | 34 +++++++++++++++ .../org/apache/raft/server/RaftServerRpc.java | 44 ++++++++++++++++++++ .../apache/raft/server/impl/RaftServerImpl.java | 7 ++-- .../apache/raft/server/impl/RaftServerRpc.java | 44 -------------------- .../raft/server/impl/RequestDispatcher.java | 9 ++-- .../java/org/apache/raft/MiniRaftCluster.java | 2 +- .../server/simulation/SimulatedServerRpc.java | 6 +-- 10 files changed, 102 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java index d465724..f3d894a 100644 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java +++ b/raft-grpc/src/main/java/org/apache/raft/grpc/RaftGRpcService.java @@ -23,8 +23,8 @@ import org.apache.raft.grpc.client.RaftClientProtocolService; import org.apache.raft.grpc.server.RaftServerProtocolClient; import org.apache.raft.grpc.server.RaftServerProtocolService; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.RaftServer; +import org.apache.raft.server.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.grpc.Server; import org.apache.raft.shaded.io.grpc.ServerBuilder; @@ -54,7 +54,7 @@ public class RaftGRpcService implements RaftServerRpc { Collections.synchronizedMap(new HashMap<>()); private final String selfId; - public RaftGRpcService(RaftServerImpl raftServer, RaftProperties properties) { + public RaftGRpcService(RaftServer raftServer, RaftProperties properties) { int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY, RAFT_GRPC_SERVER_PORT_DEFAULT); int maxMessageSize = properties.getInt( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java index eb92372..3330d78 100644 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java +++ b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/server/HadoopRpcService.java @@ -25,9 +25,9 @@ import org.apache.raft.hadooprpc.Proxy; import org.apache.raft.hadooprpc.client.RaftClientProtocolPB; import org.apache.raft.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.*; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.RaftServer; +import org.apache.raft.server.RaftServerConfigKeys; +import org.apache.raft.server.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.com.google.protobuf.BlockingService; import org.apache.raft.shaded.com.google.protobuf.ServiceException; @@ -56,7 +56,7 @@ public class HadoopRpcService implements RaftServerRpc { private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies; - public HadoopRpcService(RaftServerImpl server, final Configuration conf) + public HadoopRpcService(RaftServer server, final Configuration conf) throws IOException { this.proxies = new PeerProxyMap<>( p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java index b5b8550..19f5979 100644 --- a/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java +++ b/raft-netty/src/main/java/org/apache/raft/netty/server/NettyRpcService.java @@ -29,8 +29,8 @@ import org.apache.raft.client.impl.ClientProtoUtils; import org.apache.raft.netty.NettyRpcProxy; import org.apache.raft.protocol.RaftClientReply; import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.RaftServer; +import org.apache.raft.server.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; import org.apache.raft.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; @@ -75,7 +75,7 @@ public final class NettyRpcService implements RaftServerRpc { } /** Constructs a netty server with the given port. */ - public NettyRpcService(int port, RaftServerImpl server) { + public NettyRpcService(int port, RaftServer server) { this.raftService = new RequestDispatcher(server); this.id = server.getId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-server/src/main/java/org/apache/raft/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServer.java b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java new file mode 100644 index 0000000..7141eca --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/RaftServer.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server; + +import org.apache.raft.server.protocol.RaftServerProtocol; + +import java.io.Closeable; + +/** Raft server interface */ +public interface RaftServer extends RaftServerProtocol, Closeable { + /** @return the server ID. */ + String getId(); + + /** Set server RPC service. */ + void setServerRpc(RaftServerRpc serverRpc); + + /** Start this server. */ + void start(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java new file mode 100644 index 0000000..de81ec2 --- /dev/null +++ b/raft-server/src/main/java/org/apache/raft/server/RaftServerRpc.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.raft.server; + +import org.apache.raft.protocol.RaftPeer; +import org.apache.raft.shaded.proto.RaftProtos.*; + +import java.io.IOException; +import java.net.InetSocketAddress; + +public interface RaftServerRpc { + void start(); + + void shutdown(); + + InetSocketAddress getInetSocketAddress(); + + AppendEntriesReplyProto sendAppendEntries( + AppendEntriesRequestProto request) throws IOException; + + InstallSnapshotReplyProto sendInstallSnapshot( + InstallSnapshotRequestProto request) throws IOException; + + RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request) + throws IOException; + + /** add information of the given peers */ + void addPeers(Iterable<RaftPeer> peers); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java index 6778683..131e002 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerImpl.java @@ -21,8 +21,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.raft.conf.RaftProperties; import org.apache.raft.protocol.*; +import org.apache.raft.server.RaftServer; import org.apache.raft.server.RaftServerConfigKeys; -import org.apache.raft.server.protocol.RaftServerProtocol; +import org.apache.raft.server.RaftServerRpc; import org.apache.raft.server.protocol.TermIndex; import org.apache.raft.server.storage.FileInfo; import org.apache.raft.shaded.proto.RaftProtos.*; @@ -36,7 +37,6 @@ import org.apache.raft.util.RaftUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; @@ -48,7 +48,7 @@ import java.util.concurrent.CompletableFuture; import static org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; import static org.apache.raft.util.LifeCycle.State.*; -public class RaftServerImpl implements RaftServerProtocol, Closeable { +public class RaftServerImpl implements RaftServer { public static final Logger LOG = LoggerFactory.getLogger(RaftServerImpl.class); private static final String CLASS_NAME = RaftServerImpl.class.getSimpleName(); @@ -209,6 +209,7 @@ public class RaftServerImpl implements RaftServerProtocol, Closeable { }); } + @VisibleForTesting public boolean isAlive() { return !lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java b/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java deleted file mode 100644 index fb74a0e..0000000 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RaftServerRpc.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.server.impl; - -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.shaded.proto.RaftProtos.*; - -import java.io.IOException; -import java.net.InetSocketAddress; - -public interface RaftServerRpc { - void start(); - - void shutdown(); - - InetSocketAddress getInetSocketAddress(); - - AppendEntriesReplyProto sendAppendEntries( - AppendEntriesRequestProto request) throws IOException; - - InstallSnapshotReplyProto sendInstallSnapshot( - InstallSnapshotRequestProto request) throws IOException; - - RequestVoteReplyProto sendRequestVote(RequestVoteRequestProto request) - throws IOException; - - /** add information of the given peers */ - void addPeers(Iterable<RaftPeer> peers); -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java ---------------------------------------------------------------------- diff --git a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java index b897afd..39a4ac8 100644 --- a/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java +++ b/raft-server/src/main/java/org/apache/raft/server/impl/RequestDispatcher.java @@ -17,7 +17,9 @@ */ package org.apache.raft.server.impl; +import com.google.common.base.Preconditions; import org.apache.raft.protocol.*; +import org.apache.raft.server.RaftServer; import org.apache.raft.server.protocol.RaftServerProtocol; import org.apache.raft.shaded.proto.RaftProtos.*; import org.apache.raft.statemachine.StateMachine; @@ -47,9 +49,10 @@ public class RequestDispatcher implements RaftClientProtocol, RaftServerProtocol private final RaftServerImpl server; private final StateMachine stateMachine; - public RequestDispatcher(RaftServerImpl server) { - this.server = server; - this.stateMachine = server.getStateMachine(); + public RequestDispatcher(RaftServer server) { + Preconditions.checkArgument(server instanceof RaftServerImpl); + this.server = (RaftServerImpl)server; + this.stateMachine = this.server.getStateMachine(); } public CompletableFuture<RaftClientReply> handleClientRequest( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java index 45cec15..c66ef8f 100644 --- a/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java +++ b/raft-server/src/test/java/org/apache/raft/MiniRaftCluster.java @@ -27,7 +27,7 @@ import org.apache.raft.server.RaftServerConfigKeys; import org.apache.raft.server.impl.DelayLocalExecutionInjection; import org.apache.raft.server.impl.RaftConfiguration; import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.RaftServerRpc; import org.apache.raft.server.storage.MemoryRaftLog; import org.apache.raft.server.storage.RaftLog; import org.apache.raft.statemachine.BaseStateMachine; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/56e9b719/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java index 93e3f5c..cc3fb35 100644 --- a/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java +++ b/raft-server/src/test/java/org/apache/raft/server/simulation/SimulatedServerRpc.java @@ -23,7 +23,7 @@ import org.apache.raft.protocol.RaftClientRequest; import org.apache.raft.protocol.RaftPeer; import org.apache.raft.protocol.SetConfigurationRequest; import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.impl.RaftServerRpc; +import org.apache.raft.server.RaftServerRpc; import org.apache.raft.server.impl.RequestDispatcher; import org.apache.raft.shaded.proto.RaftProtos.*; import org.slf4j.Logger; @@ -36,7 +36,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -public class SimulatedServerRpc implements RaftServerRpc { +class SimulatedServerRpc implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class); private final RaftServerImpl server; @@ -46,7 +46,7 @@ public class SimulatedServerRpc implements RaftServerRpc { private final ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder().setDaemon(true).build()); - public SimulatedServerRpc(RaftServerImpl server, + SimulatedServerRpc(RaftServerImpl server, SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { this.server = server;