Repository: incubator-ratis Updated Branches: refs/heads/master 2a781c220 -> 7edcd52b0
RATIS-121. In RaftServer.Builder, allow serverId and group to be initialized automatically. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7edcd52b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7edcd52b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7edcd52b Branch: refs/heads/master Commit: 7edcd52b0c90a2af9b9e25e84f71adf0d2c42506 Parents: 2a781c2 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Tue Oct 24 13:12:33 2017 -0700 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Tue Oct 24 14:09:04 2017 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/protocol/RaftGroup.java | 6 ++ .../org/apache/ratis/protocol/RaftGroupId.java | 5 ++ .../test/java/org/apache/ratis/BaseTest.java | 6 +- .../arithmetic/expression/TestExpression.java | 4 +- .../org/apache/ratis/grpc/RaftGRpcService.java | 17 +++-- .../grpc/client/RaftClientProtocolService.java | 15 ++-- .../ratis/grpc/server/AdminProtocolService.java | 7 +- .../grpc/server/RaftServerProtocolService.java | 33 ++++----- .../hadooprpc/server/HadoopRpcService.java | 27 +++---- .../ratis/netty/server/NettyRpcService.java | 45 +++++------- .../org/apache/ratis/server/RaftServer.java | 6 +- .../ratis/server/impl/RaftServerProxy.java | 19 +++-- .../server/impl/RaftServerRpcWithProxy.java | 76 ++++++++++++++++++++ .../ratis/server/impl/ServerImplUtils.java | 10 ++- .../org/apache/ratis/protocol/TestRaftId.java | 4 +- .../ratis/server/simulation/RequestHandler.java | 17 +++-- .../server/simulation/SimulatedServerRpc.java | 10 +-- 17 files changed, 197 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java index 1096518..f4a16c4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java @@ -26,6 +26,12 @@ import java.util.*; * peers. */ public class RaftGroup { + private static RaftGroup EMPTY_GROUP = new RaftGroup(RaftGroupId.emptyGroupId(), Collections.emptyList()); + + public static RaftGroup emptyGroup() { + return EMPTY_GROUP; + } + /** UTF-8 string as id */ private final RaftGroupId groupId; /** The group of raft peers */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java index 0ddc7dc..e873ab8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroupId.java @@ -22,6 +22,11 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString; import java.util.UUID; public class RaftGroupId extends RaftId { + private static final RaftGroupId EMPTY_GROUP_ID = new RaftGroupId(new UUID(0L, 0L)); + + public static RaftGroupId emptyGroupId() { + return EMPTY_GROUP_ID; + } public static RaftGroupId randomId() { return new RaftGroupId(UUID.randomUUID()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java index 2b308b5..1c27420 100644 --- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java +++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java @@ -43,10 +43,10 @@ public abstract class BaseTest { } @Rule - public final Timeout globalTimeout = new Timeout(getGlobalTimeoutMs()); + public final Timeout globalTimeout = new Timeout(getGlobalTimeoutSeconds() * 1000); - public int getGlobalTimeoutMs() { - return 100_000; + public int getGlobalTimeoutSeconds() { + return 100; } private static final Supplier<File> rootTestDir = JavaUtils.memoize( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java index a21c11f..6996fe9 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java @@ -27,8 +27,8 @@ import java.util.concurrent.ThreadLocalRandom; public class TestExpression extends BaseTest { @Override - public int getGlobalTimeoutMs() { - return 1000; + public int getGlobalTimeoutSeconds() { + return 1; } @Test http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 96a7a45..853830e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Supplier; /** A grpc implementation of {@link RaftServerRpc}. */ public class RaftGRpcService implements RaftServerRpc { @@ -70,7 +71,7 @@ public class RaftGRpcService implements RaftServerRpc { private final InetSocketAddress address; private final Map<RaftPeerId, RaftServerProtocolClient> peers = Collections.synchronizedMap(new HashMap<>()); - private final RaftPeerId selfId; + private final Supplier<RaftPeerId> idSupplier; private RaftGRpcService(RaftServer server) { this(server, @@ -79,11 +80,11 @@ public class RaftGRpcService implements RaftServerRpc { } private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) { ServerBuilder serverBuilder = ServerBuilder.forPort(port); - selfId = raftServer.getId(); + idSupplier = raftServer::getId; server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize) - .addService(new RaftServerProtocolService(selfId, raftServer)) - .addService(new RaftClientProtocolService(selfId, raftServer)) - .addService(new AdminProtocolService(selfId, raftServer)) + .addService(new RaftServerProtocolService(idSupplier, raftServer)) + .addService(new RaftClientProtocolService(idSupplier, raftServer)) + .addService(new AdminProtocolService(raftServer)) .build(); // start service to determine the port (in case port is configured as 0) @@ -92,6 +93,10 @@ public class RaftGRpcService implements RaftServerRpc { LOG.info("Server started, listening on " + address.getPort()); } + RaftPeerId getId() { + return idSupplier.get(); + } + @Override public SupportedRpcType getRpcType() { return SupportedRpcType.GRPC; @@ -145,7 +150,7 @@ public class RaftGRpcService implements RaftServerRpc { @Override public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { - CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId, + CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), null, request); RaftServerProtocolClient target = Objects.requireNonNull( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java index 1cd913a..5d68d42 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase { static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class); @@ -66,14 +67,18 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase } private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE); - private final RaftPeerId id; + private final Supplier<RaftPeerId> idSupplier; private final RaftClientAsynchronousProtocol protocol; - public RaftClientProtocolService(RaftPeerId id, RaftClientAsynchronousProtocol protocol) { - this.id = id; + public RaftClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) { + this.idSupplier = idSupplier; this.protocol = protocol; } + RaftPeerId getId() { + return idSupplier.get(); + } + @Override public void setConfiguration(SetConfigurationRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) { @@ -145,7 +150,7 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase }); } catch (Throwable e) { LOG.info("{} got exception when handling client append request {}: {}", - id, request.getRpcRequest(), e); + getId(), request.getRpcRequest(), e); responseObserver.onError(RaftGrpcUtil.wrapException(e)); } } @@ -165,7 +170,7 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase @Override public void onError(Throwable t) { // for now we just log a msg - LOG.warn("{} onError: client Append cancelled", id, t); + LOG.warn("{} onError: client Append cancelled", getId(), t); synchronized (pendingList) { pendingList.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java index d2aae53..e4d169a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java @@ -20,22 +20,19 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.AdminAsynchronousProtocol; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.ReinitializeRequest; import org.apache.ratis.protocol.ServerInformatonRequest; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto; import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase; public class AdminProtocolService extends AdminProtocolServiceImplBase { - private final RaftPeerId id; private final AdminAsynchronousProtocol protocol; - public AdminProtocolService(RaftPeerId id, AdminAsynchronousProtocol protocol) { - this.id = id; + public AdminProtocolService(AdminAsynchronousProtocol protocol) { this.protocol = protocol; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java index 3e5ae0d..8c2f31f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java @@ -21,27 +21,28 @@ import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.Supplier; + public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase { public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class); - private final RaftPeerId id; + private final Supplier<RaftPeerId> idSupplier; private final RaftServerProtocol server; - public RaftServerProtocolService(RaftPeerId id, RaftServerProtocol server) { - this.id = id; + public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServerProtocol server) { + this.idSupplier = idSupplier; this.server = server; } + RaftPeerId getId() { + return idSupplier.get(); + } + @Override public void requestVote(RequestVoteRequestProto request, StreamObserver<RequestVoteReplyProto> responseObserver) { @@ -51,7 +52,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase responseObserver.onCompleted(); } catch (Throwable e) { LOG.info("{} got exception when handling requestVote {}: {}", - id, request.getServerRequest(), e); + getId(), request.getServerRequest(), e); responseObserver.onError(RaftGrpcUtil.wrapException(e)); } } @@ -67,7 +68,7 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase responseObserver.onNext(reply); } catch (Throwable e) { LOG.info("{} got exception when handling appendEntries {}: {}", - id, request.getServerRequest(), e); + getId(), request.getServerRequest(), e); responseObserver.onError(RaftGrpcUtil.wrapException(e)); } } @@ -75,12 +76,12 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase @Override public void onError(Throwable t) { // for now we just log a msg - LOG.info("{}: appendEntries on error. Exception: {}", id, t); + LOG.info("{}: appendEntries on error. Exception: {}", getId(), t); } @Override public void onCompleted() { - LOG.info("{}: appendEntries completed", id); + LOG.info("{}: appendEntries completed", getId()); responseObserver.onCompleted(); } }; @@ -97,19 +98,19 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase responseObserver.onNext(reply); } catch (Throwable e) { LOG.info("{} got exception when handling installSnapshot {}: {}", - id, request.getServerRequest(), e); + getId(), request.getServerRequest(), e); responseObserver.onError(RaftGrpcUtil.wrapException(e)); } } @Override public void onError(Throwable t) { - LOG.info("{}: installSnapshot on error. Exception: {}", id, t); + LOG.info("{}: installSnapshot on error. Exception: {}", getId(), t); } @Override public void onCompleted() { - LOG.info("{}: installSnapshot completed", id); + LOG.info("{}: installSnapshot completed", getId()); responseObserver.onCompleted(); } }; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index 1fea9ff..5e571d4 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -24,11 +24,11 @@ import org.apache.ratis.hadooprpc.HadoopConfigKeys; import org.apache.ratis.hadooprpc.Proxy; import org.apache.ratis.hadooprpc.client.CombinedClientProtocolPB; import org.apache.ratis.hadooprpc.client.CombinedClientProtocolServerSideTranslatorPB; -import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.RaftServerRpcWithProxy; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.shaded.com.google.protobuf.BlockingService; import org.apache.ratis.shaded.com.google.protobuf.ByteString; @@ -47,7 +47,7 @@ import java.io.IOException; import java.net.InetSocketAddress; /** Server side Hadoop RPC service. */ -public class HadoopRpcService implements RaftServerRpc { +public class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftServerProtocolPB>, PeerProxyMap<Proxy<RaftServerProtocolPB>>> { public static final Logger LOG = LoggerFactory.getLogger(HadoopRpcService.class); static final String CLASS_NAME = HadoopRpcService.class.getSimpleName(); public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; @@ -84,16 +84,13 @@ public class HadoopRpcService implements RaftServerRpc { return new Builder(); } - private final RaftPeerId id; private final RPC.Server ipcServer; private final InetSocketAddress ipcServerAddress; - private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies; - private HadoopRpcService(RaftServer server, final Configuration conf) { - this.id = server.getId(); - this.proxies = new PeerProxyMap<>(id.toString(), - p -> new Proxy<>(RaftServerProtocolPB.class, p.getAddress(), conf)); + super(server::getId, + id -> new PeerProxyMap<>(id.toString(), + p -> new Proxy<>(RaftServerProtocolPB.class, p.getAddress(), conf))); try { this.ipcServer = newRpcServer(server, conf); } catch (IOException e) { @@ -147,13 +144,14 @@ public class HadoopRpcService implements RaftServerRpc { } @Override - public void start() { + public void startImpl() { ipcServer.start(); } @Override - public void close() { + public void closeImpl() { ipcServer.stop(); + super.closeImpl(); } @Override @@ -181,9 +179,9 @@ public class HadoopRpcService implements RaftServerRpc { REQUEST request, ByteString replyId, CheckedFunction<RaftServerProtocolPB, REPLY, ServiceException> f) throws IOException { - CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request); - final RaftServerProtocolPB proxy = proxies.getProxy( + final RaftServerProtocolPB proxy = getProxies().getProxy( RaftPeerId.valueOf(replyId)).getProtocol(); try { return f.apply(proxy); @@ -191,9 +189,4 @@ public class HadoopRpcService implements RaftServerRpc { throw ProtoUtils.toIOException(se); } } - - @Override - public void addPeers(Iterable<RaftPeer> peers) { - proxies.addPeers(peers); - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 8089821..cb337b5 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -27,6 +27,7 @@ import org.apache.ratis.protocol.ServerInformationReply; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.RaftServerRpcWithProxy; import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap; import org.apache.ratis.shaded.io.netty.channel.*; import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup; @@ -43,6 +44,7 @@ import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyExceptionReplyPr import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto; import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.ProtoUtils; @@ -50,11 +52,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Objects; +import java.util.function.Supplier; /** * A netty server endpoint that acts as the communication layer. */ -public final class NettyRpcService implements RaftServerRpc { +public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy, NettyRpcProxy.PeerMap> { static final String CLASS_NAME = NettyRpcService.class.getSimpleName(); public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; @@ -76,16 +79,12 @@ public final class NettyRpcService implements RaftServerRpc { return new Builder(); } - private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); private final RaftServer server; - private final RaftPeerId id; private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workerGroup = new NioEventLoopGroup(); private final ChannelFuture channelFuture; - private final NettyRpcProxy.PeerMap proxies; - @ChannelHandler.Sharable class InboundHandler extends SimpleChannelInboundHandler<RaftNettyServerRequestProto> { @Override @@ -97,9 +96,8 @@ public final class NettyRpcService implements RaftServerRpc { /** Constructs a netty server with the given port. */ private NettyRpcService(RaftServer server) { + super(server::getId, id -> new NettyRpcProxy.PeerMap(id.toString())); this.server = server; - this.id = server.getId(); - this.proxies = new NettyRpcProxy.PeerMap(id.toString()); final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @@ -135,19 +133,17 @@ public final class NettyRpcService implements RaftServerRpc { } @Override - public void start() { - lifeCycle.startAndTransition(() -> channelFuture.syncUninterruptibly()); + public void startImpl() { + channelFuture.syncUninterruptibly(); } @Override - public void close() { - lifeCycle.checkStateAndClose(() -> { - bossGroup.shutdownGracefully(); - workerGroup.shutdownGracefully(); - final ChannelFuture f = getChannel().close(); - proxies.close(); - f.syncUninterruptibly(); - }); + public void closeImpl() { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + final ChannelFuture f = getChannel().close(); + super.closeImpl(); + f.syncUninterruptibly(); } @Override @@ -247,7 +243,7 @@ public final class NettyRpcService implements RaftServerRpc { @Override public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { - CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request); final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() .setRequestVoteRequest(request) @@ -258,7 +254,7 @@ public final class NettyRpcService implements RaftServerRpc { @Override public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException { - CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request); final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() .setAppendEntriesRequest(request) @@ -269,7 +265,7 @@ public final class NettyRpcService implements RaftServerRpc { @Override public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { - CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request); final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() .setInstallSnapshotRequest(request) @@ -282,17 +278,12 @@ public final class NettyRpcService implements RaftServerRpc { RaftRpcRequestProto request, RaftNettyServerRequestProto proto) throws IOException { final RaftPeerId id = RaftPeerId.valueOf(request.getReplyId()); - final NettyRpcProxy p = proxies.getProxy(id); + final NettyRpcProxy p = getProxies().getProxy(id); try { return p.send(request, proto); } catch (ClosedChannelException cce) { - proxies.resetProxy(id); + getProxies().resetProxy(id); throw cce; } } - - @Override - public void addPeers(Iterable<RaftPeer> peers) { - proxies.addPeers(peers); - } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index 622e75a..a3d9d91 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -61,15 +61,15 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, class Builder { private RaftPeerId serverId; private StateMachine stateMachine; - private RaftGroup group; + private RaftGroup group = RaftGroup.emptyGroup(); private RaftProperties properties; private Parameters parameters; /** @return a {@link RaftServer} object. */ public RaftServer build() throws IOException { return ServerImplUtils.newRaftServer( - Objects.requireNonNull(serverId, "The 'serverId' field is not initialized."), - Objects.requireNonNull(group, "The 'peers' field is not initialized."), + serverId, + group, Objects.requireNonNull(stateMachine, "The 'stateMachine' is not initialized."), Objects.requireNonNull(properties, "The 'properties' field is not initialized."), parameters); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 0a16954..7f7a2bf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -32,8 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; +import java.net.InetSocketAddress; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -54,14 +55,14 @@ public class RaftServerProxy implements RaftServer { RaftServerProxy(RaftPeerId id, StateMachine stateMachine, RaftGroup group, RaftProperties properties, Parameters parameters) throws IOException { - this.id = id; this.properties = properties; this.stateMachine = stateMachine; final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); this.factory = ServerFactory.cast(rpcType.newFactory(parameters)); - this.serverRpc = initRaftServerRpc(factory, this, group); + this.serverRpc = factory.newRaftServerRpc(this); + this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc)); this.impl = CompletableFuture.completedFuture(initImpl(group)); } @@ -69,14 +70,10 @@ public class RaftServerProxy implements RaftServer { return new RaftServerImpl(id, group, this, properties); } - private static RaftServerRpc initRaftServerRpc( - ServerFactory factory, RaftServer server, RaftGroup group) { - final RaftServerRpc rpc = factory.newRaftServerRpc(server); - // add peers into rpc service - if (group != null) { - rpc.addPeers(group.getPeers()); - } - return rpc; + private static String getIdStringFrom(RaftServerRpc rpc) { + final InetSocketAddress address = rpc.getInetSocketAddress(); + return address != null? address.getHostName() + "_" + address.getPort() + : rpc.getRpcType() + "-" + UUID.randomUUID(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java new file mode 100644 index 0000000..fe41859 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.PeerProxyMap; + +import java.io.Closeable; +import java.util.function.Function; +import java.util.function.Supplier; + +/** Implementing {@link RaftServerRpc} using a {@link PeerProxyMap}. */ +public abstract class RaftServerRpcWithProxy<PROXY extends Closeable, PROXIES extends PeerProxyMap<PROXY>> + implements RaftServerRpc { + private final Supplier<RaftPeerId> idSupplier; + private final Supplier<LifeCycle> lifeCycleSupplier; + private final Supplier<PROXIES> proxiesSupplier; + + public RaftServerRpcWithProxy(Supplier<RaftPeerId> idSupplier, Function<RaftPeerId, PROXIES> proxyCreater) { + this.idSupplier = idSupplier; + this.lifeCycleSupplier = JavaUtils.memoize(() -> new LifeCycle(getId())); + this.proxiesSupplier = JavaUtils.memoize(() -> proxyCreater.apply(getId())); + } + + public RaftPeerId getId() { + return idSupplier.get(); + } + + public LifeCycle getLifeCycle() { + return lifeCycleSupplier.get(); + } + + public PROXIES getProxies() { + return proxiesSupplier.get(); + } + + @Override + public void addPeers(Iterable<RaftPeer> peers) { + getProxies().addPeers(peers); + } + + @Override + public final void start() { + getLifeCycle().startAndTransition(() -> startImpl()); + } + + public abstract void startImpl(); + + @Override + public final void close() { + getLifeCycle().checkStateAndClose(() -> closeImpl()); + } + + public void closeImpl() { + getProxies().close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 544ed13..15ee155 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -33,17 +33,23 @@ public class ServerImplUtils { public static RaftServerProxy newRaftServer( RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftProperties properties, Parameters parameters) throws IOException { + final RaftServerProxy proxy; try { // attempt multiple times to avoid temporary bind exception - return JavaUtils.attempt( + proxy = JavaUtils.attempt( () -> new RaftServerProxy(id, stateMachine, group, properties, parameters), - 5, 500L, "newRaftServer", RaftServerImpl.LOG); + 5, 500L, "new RaftServerProxy", RaftServerProxy.LOG); } catch (InterruptedException e) { throw IOUtils.toInterruptedIOException( "Interrupted when creating RaftServer " + id + ", " + group, e); } catch (IOException e) { throw new IOException("Failed to create RaftServer " + id + ", " + group, e); } + // add peers into rpc service + if (!group.getPeers().isEmpty()) { + proxy.getServerRpc().addPeers(group.getPeers()); + } + return proxy; } public static TermIndex newTermIndex(long term, long index) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java index e83e32a..b454c31 100644 --- a/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java +++ b/ratis-server/src/test/java/org/apache/ratis/protocol/TestRaftId.java @@ -24,8 +24,8 @@ import org.junit.Test; public class TestRaftId extends BaseTest { @Override - public int getGlobalTimeoutMs() { - return 1000; + public int getGlobalTimeoutSeconds() { + return 1; } @Test http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java index bd60a3b..5c12ef4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RequestHandler.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; +import java.util.function.Supplier; public class RequestHandler<REQUEST extends RaftRpcMessage, REPLY extends RaftRpcMessage> { @@ -40,17 +41,17 @@ public class RequestHandler<REQUEST extends RaftRpcMessage, REPLY handleRequest(REQUEST r) throws IOException; } - private final String serverId; + private final Supplier<String> serverIdSupplier; private final String name; private final SimulatedRequestReply<REQUEST, REPLY> rpc; private final HandlerInterface<REQUEST, REPLY> handlerImpl; private final List<HandlerDaemon> daemons; - RequestHandler(String serverId, String name, + RequestHandler(Supplier<String> serverIdSupplier, String name, SimulatedRequestReply<REQUEST, REPLY> rpc, HandlerInterface<REQUEST, REPLY> handlerImpl, int numHandlers) { - this.serverId = serverId; + this.serverIdSupplier = serverIdSupplier; this.name = name; this.rpc = rpc; this.handlerImpl = handlerImpl; @@ -61,12 +62,16 @@ public class RequestHandler<REQUEST extends RaftRpcMessage, } } + private String getServerId() { + return serverIdSupplier.get(); + } + void startDaemon() { daemons.forEach(Thread::start); } void shutdown() { - rpc.shutdown(serverId); + rpc.shutdown(getServerId()); } void interruptAndJoinDaemon() throws InterruptedException { @@ -106,14 +111,14 @@ public class RequestHandler<REQUEST extends RaftRpcMessage, @Override public String toString() { - return serverId + "." + name + id; + return getServerId() + "." + name + id; } @Override public void run() { while (handlerImpl.isAlive()) { try { - handleRequest(rpc.takeRequest(serverId)); + handleRequest(rpc.takeRequest(getServerId())); } catch (InterruptedIOException e) { LOG.info(this + " is interrupted by " + e); LOG.trace("TRACE", e); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7edcd52b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java index 91b7ad5..67e1c25 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java @@ -21,7 +21,6 @@ import org.apache.ratis.RaftTestUtil; import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.Daemon; @@ -35,6 +34,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; class SimulatedServerRpc implements RaftServerRpc { static final Logger LOG = LoggerFactory.getLogger(SimulatedServerRpc.class); @@ -48,10 +48,10 @@ class SimulatedServerRpc implements RaftServerRpc { SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply, SimulatedRequestReply<RaftClientRequest, RaftClientReply> clientRequestReply) { this.server = (RaftServerProxy)server; - this.serverHandler = new RequestHandler<>(server.getId().toString(), - "serverHandler", serverRequestReply, serverHandlerImpl, 3); - this.clientHandler = new RequestHandler<>(server.getId().toString(), - "clientHandler", clientRequestReply, clientHandlerImpl, 3); + + final Supplier<String> id = () -> server.getId().toString(); + this.serverHandler = new RequestHandler<>(id, "serverHandler", serverRequestReply, serverHandlerImpl, 3); + this.clientHandler = new RequestHandler<>(id, "clientHandler", clientRequestReply, clientHandlerImpl, 3); } @Override
