Repository: incubator-ratis Updated Branches: refs/heads/master 718fa9ea3 -> 8ac50a721
RATIS-22. Use a factory method to create RaftClientRequestSender. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8ac50a72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8ac50a72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8ac50a72 Branch: refs/heads/master Commit: 8ac50a721505076dc3a7f7dc5416ed27a3273e8a Parents: 718fa9e Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Tue Feb 28 14:39:22 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Tue Feb 28 14:39:22 2017 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/client/ClientFactory.java | 35 ++++++++++++++ .../org/apache/ratis/client/RaftClient.java | 21 +++------ .../ratis/client/impl/RaftClientImpl.java | 24 ++++------ .../org/apache/ratis/rpc/SupportedRpcType.java | 2 +- .../java/org/apache/ratis/util/RaftUtils.java | 17 +++++++ .../java/org/apache/ratis/grpc/GrpcFactory.java | 49 ++++++++++++++++++++ .../grpc/client/RaftClientSenderWithGrpc.java | 19 ++------ .../ratis/grpc/server/GrpcServerFactory.java | 42 ----------------- .../ratis/grpc/MiniRaftClusterWithGRpc.java | 7 --- .../apache/ratis/hadooprpc/HadoopFactory.java | 9 +++- .../client/HadoopClientRequestSender.java | 5 +- .../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 8 +--- .../org/apache/ratis/netty/NettyFactory.java | 9 +++- .../netty/client/NettyClientRequestSender.java | 16 ++----- .../ratis/netty/MiniRaftClusterWithNetty.java | 7 --- .../java/org/apache/ratis/MiniRaftCluster.java | 11 +++-- .../MiniRaftClusterWithSimulatedRpc.java | 9 +--- .../ratis/server/simulation/SimulatedRpc.java | 12 ++++- 18 files changed, 165 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java new file mode 100644 index 0000000..b775319 --- /dev/null +++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.client; + +import org.apache.ratis.rpc.RpcFactory; + +/** A factory interface for creating client components. */ +public interface ClientFactory extends RpcFactory { + static ClientFactory cast(RpcFactory rpcFactory) { + if (rpcFactory instanceof ClientFactory) { + return (ClientFactory)rpcFactory; + } + throw new ClassCastException("Cannot cast " + rpcFactory.getClass() + + " to " + ClientFactory.class + + "; rpc type is " + rpcFactory.getRpcType()); + } + + /** Create a {@link RaftClientRequestSender}. */ + RaftClientRequestSender newRaftClientRequestSender(); +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 41022e2..4f86d40 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -17,20 +17,16 @@ */ package org.apache.ratis.client; -import com.google.common.base.Preconditions; import org.apache.ratis.client.impl.ClientImplUtils; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; import java.util.Collection; +import java.util.Objects; /** A client who sends requests to a raft service. */ public interface RaftClient extends Closeable { @@ -74,22 +70,19 @@ public interface RaftClient extends Closeable { /** @return a {@link RaftClient} object. */ public RaftClient build() { - Preconditions.checkNotNull(requestSender); - Preconditions.checkNotNull(servers); - if (clientId == null) { clientId = ClientId.createId(); } - if (leaderId == null) { - leaderId = servers.iterator().next().getId(); //use the first peer - } if (properties != null) { retryInterval = properties.getInt( RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_KEY, RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT); } - return ClientImplUtils.newRaftClient(clientId, servers, leaderId, - requestSender, retryInterval); + return ClientImplUtils.newRaftClient(clientId, + Objects.requireNonNull(servers, "The 'server' field is not initialized."), + leaderId, + Objects.requireNonNull(requestSender, "The 'requestSender' field is not initialized."), + retryInterval); } /** Set {@link RaftClient} ID. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index d29eebe..ff49ab6 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -26,16 +26,13 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.Arrays; import java.util.Collection; -import java.util.Map; -import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; /** A client who sends requests to a raft service. */ final class RaftClientImpl implements RaftClient { private final ClientId clientId; private final RaftClientRequestSender requestSender; - private final Map<RaftPeerId, RaftPeer> peers; + private final Collection<RaftPeer> peers; private final int retryInterval; private volatile RaftPeerId leaderId; @@ -45,10 +42,11 @@ final class RaftClientImpl implements RaftClient { int retryInterval) { this.clientId = clientId; this.requestSender = requestSender; - this.peers = peers.stream().collect( - Collectors.toMap(RaftPeer::getId, Function.identity())); + this.peers = peers; this.leaderId = leaderId != null? leaderId : peers.iterator().next().getId(); this.retryInterval = retryInterval; + + requestSender.addServers(peers); } @Override @@ -122,20 +120,18 @@ final class RaftClientImpl implements RaftClient { private void handleNotLeaderException(RaftClientRequest request, NotLeaderException nle) { - refreshPeers(nle.getPeers()); + refreshPeers(Arrays.asList(nle.getPeers())); final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null : nle.getSuggestedLeader().getId(); handleIOException(request, nle, newLeader); } - private void refreshPeers(RaftPeer[] newPeers) { - if (newPeers != null && newPeers.length > 0) { + private void refreshPeers(Collection<RaftPeer> newPeers) { + if (newPeers != null && newPeers.size() > 0) { peers.clear(); - for (RaftPeer p : newPeers) { - peers.put(p.getId(), p); - } + peers.addAll(newPeers); // also refresh the rpc proxies for these peers - requestSender.addServers(Arrays.asList(newPeers)); + requestSender.addServers(newPeers); } } @@ -144,7 +140,7 @@ final class RaftClientImpl implements RaftClient { LOG.debug("{}: Failed with {}", clientId, ioe); final RaftPeerId oldLeader = request.getServerId(); if (newLeader == null && oldLeader.equals(leaderId)) { - newLeader = RaftUtils.next(oldLeader, peers.keySet()); + newLeader = RaftUtils.next(oldLeader, RaftUtils.as(peers, RaftPeer::getId)); } if (newLeader != null && oldLeader.equals(leaderId)) { LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java index d222495..dcba59b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java +++ b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java @@ -23,7 +23,7 @@ import org.apache.ratis.util.RaftUtils; /** The RPC types supported. */ public enum SupportedRpcType implements RpcType { NETTY("org.apache.ratis.netty.NettyFactory"), - GRPC("org.apache.ratis.grpc.server.GrpcServerFactory"), + GRPC("org.apache.ratis.grpc.GrpcFactory"), HADOOP("org.apache.ratis.hadooprpc.HadoopFactory"); /** Same as {@link #valueOf(String)} except that this method is case insensitive. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java index 5f62f47..7f5703d 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; import java.util.function.Supplier; public abstract class RaftUtils { @@ -305,4 +306,20 @@ public abstract class RaftUtils { } return first; } + + public static <INPUT, OUTPUT> Iterable<OUTPUT> as( + Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) { + final Iterator<INPUT> i = iteration.iterator(); + return () -> new Iterator<OUTPUT>() { + @Override + public boolean hasNext() { + return i.hasNext(); + } + + @Override + public OUTPUT next() { + return converter.apply(i.next()); + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java new file mode 100644 index 0000000..df69490 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc; +import org.apache.ratis.grpc.server.GRpcLogAppender; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.impl.*; + +public class GrpcFactory implements ServerFactory, ClientFactory { + @Override + public SupportedRpcType getRpcType() { + return SupportedRpcType.GRPC; + } + + @Override + public LogAppender newLogAppender(RaftServerImpl server, LeaderState state, + FollowerInfo f) { + return new GRpcLogAppender(server, state, f); + } + + @Override + public RaftGRpcService newRaftServerRpc(RaftServerImpl server) { + return RaftGRpcService.newBuilder() + .setServer(server) + .build(); + } + + @Override + public RaftClientSenderWithGrpc newRaftClientRequestSender() { + return new RaftClientSenderWithGrpc(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java index 50b05da..9a0eca3 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java @@ -17,41 +17,32 @@ */ package org.apache.ratis.grpc.client; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.RaftUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.ratis.client.impl.ClientProtoUtils.*; - import java.io.IOException; import java.io.InterruptedIOException; -import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import static org.apache.ratis.client.impl.ClientProtoUtils.*; + public class RaftClientSenderWithGrpc implements RaftClientRequestSender { public static final Logger LOG = LoggerFactory.getLogger(RaftClientSenderWithGrpc.class); private final PeerProxyMap<RaftClientProtocolClient> proxies = new PeerProxyMap<>(RaftClientProtocolClient::new); - public RaftClientSenderWithGrpc(Collection<RaftPeer> peers) { - addServers(peers); - } - @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java deleted file mode 100644 index 9c1d6f0..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.server; - -import org.apache.ratis.grpc.RaftGRpcService; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.server.impl.*; - -public class GrpcServerFactory implements ServerFactory { - @Override - public SupportedRpcType getRpcType() { - return SupportedRpcType.GRPC; - } - - @Override - public LogAppender newLogAppender(RaftServerImpl server, LeaderState state, - FollowerInfo f) { - return new GRpcLogAppender(server, state, f); - } - - @Override - public RaftGRpcService newRaftServerRpc(RaftServerImpl server) { - return RaftGRpcService.newBuilder() - .setServer(server) - .build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java index 186cac6..4c7d74d 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java @@ -20,9 +20,7 @@ package org.apache.ratis.grpc; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; @@ -60,11 +58,6 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { } @Override - public RaftClientRequestSender getRaftClientRequestSender() { - return new RaftClientSenderWithGrpc(getPeers()); - } - - @Override protected Collection<RaftPeer> addNewPeers( Collection<RaftServerImpl> newServers, boolean startService) { final Collection<RaftPeer> peers = toRaftPeers(newServers); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java index 063858a..9ff493f 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java @@ -18,12 +18,14 @@ package org.apache.ratis.hadooprpc; import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender; import org.apache.ratis.hadooprpc.server.HadoopRpcService; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; -public class HadoopFactory extends ServerFactory.BaseFactory { +public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFactory { private Configuration conf; public void setConf(Configuration conf) { @@ -42,4 +44,9 @@ public class HadoopFactory extends ServerFactory.BaseFactory { .setConf(conf) .build(); } + + @Override + public HadoopClientRequestSender newRaftClientRequestSender() { + return new HadoopClientRequestSender(conf); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java index 918a191..1a10dab 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java @@ -24,17 +24,14 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.util.PeerProxyMap; import java.io.IOException; -import java.util.Collection; public class HadoopClientRequestSender implements RaftClientRequestSender { private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies; - public HadoopClientRequestSender( - Collection<RaftPeer> peers, final Configuration conf) { + public HadoopClientRequestSender(final Configuration conf) { this.proxies = new PeerProxyMap<>( p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); - proxies.addPeers(peers); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java index c09c300..76acafd 100644 --- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -21,9 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender; import org.apache.ratis.hadooprpc.server.HadoopRpcServerConfigKeys; import org.apache.ratis.hadooprpc.server.HadoopRpcService; import org.apache.ratis.protocol.RaftPeerId; @@ -69,6 +67,7 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { super(ids, properties, formatted); this.hadoopConf = hadoopConf; getServers().stream().forEach(s -> setConf(s)); + ((HadoopFactory)clientFactory).setConf(hadoopConf); } private void setConf(RaftServerImpl server) { @@ -88,11 +87,6 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { } @Override - public RaftClientRequestSender getRaftClientRequestSender() { - return new HadoopClientRequestSender(getPeers(), hadoopConf); - } - - @Override public void blockQueueAndSetDelay(String leaderId, int delayMs) throws InterruptedException { RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java index ac71f44..fb27eaa 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java @@ -17,12 +17,14 @@ */ package org.apache.ratis.netty; +import org.apache.ratis.client.ClientFactory; +import org.apache.ratis.netty.client.NettyClientRequestSender; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.impl.ServerFactory; -public class NettyFactory extends ServerFactory.BaseFactory { +public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory { @Override public SupportedRpcType getRpcType() { return SupportedRpcType.NETTY; @@ -32,4 +34,9 @@ public class NettyFactory extends ServerFactory.BaseFactory { public NettyRpcService newRaftServerRpc(RaftServerImpl server) { return NettyRpcService.newBuilder().setServer(server).build(); } + + @Override + public NettyClientRequestSender newRaftClientRequestSender() { + return new NettyClientRequestSender(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java index 1604b5c..5b36fde 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java @@ -17,28 +17,20 @@ */ package org.apache.ratis.netty.client; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.netty.NettyRpcProxy; +import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; -import org.apache.ratis.client.RaftClientRequestSender; -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.netty.NettyRpcProxy; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.SetConfigurationRequest; import java.io.IOException; public class NettyClientRequestSender implements RaftClientRequestSender { private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap(); - public NettyClientRequestSender(Iterable<RaftPeer> servers) { - addServers(servers); - } - @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java index d6d1dc9..29857dd 100644 --- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -20,9 +20,7 @@ package org.apache.ratis.netty; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.netty.client.NettyClientRequestSender; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; @@ -56,11 +54,6 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { } @Override - public RaftClientRequestSender getRaftClientRequestSender() { - return new NettyClientRequestSender(getPeers()); - } - - @Override protected void blockQueueAndSetDelay(String leaderId, int delayMs) throws InterruptedException { RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index afd6eca..751854c 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -18,11 +18,12 @@ package org.apache.ratis; import com.google.common.base.Preconditions; +import org.apache.ratis.client.ClientFactory; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.*; import org.apache.ratis.server.storage.MemoryRaftLog; @@ -140,6 +141,7 @@ public abstract class MiniRaftCluster { return ids; } + protected final ClientFactory clientFactory; protected RaftConfiguration conf; protected final RaftProperties properties; private final String testBaseDir; @@ -150,6 +152,9 @@ public abstract class MiniRaftCluster { boolean formatted) { this.conf = initConfiguration(ids); this.properties = new RaftProperties(properties); + + final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); + this.clientFactory = ClientFactory.cast(rpcType.newFactory(properties)); this.testBaseDir = getBaseDirectory(); conf.getPeers().forEach( @@ -221,8 +226,6 @@ public abstract class MiniRaftCluster { return RaftUtils.newInstance(smClass); } - public abstract RaftClientRequestSender getRaftClientRequestSender(); - public static Collection<RaftPeer> toRaftPeers( Collection<RaftServerImpl> servers) { return servers.stream() @@ -391,7 +394,7 @@ public abstract class MiniRaftCluster { return RaftClient.newBuilder() .setServers(conf.getPeers()) .setLeaderId(leaderId) - .setRequestSender(getRaftClientRequestSender()) + .setRequestSender(clientFactory.newRaftClientRequestSender()) .setProperties(properties) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java index c978bbe..11c4c0a 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java @@ -19,11 +19,9 @@ package org.apache.ratis.server.simulation; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftConfigKeys; -import org.apache.ratis.client.RaftClientRequestSender; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.impl.RaftServerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +68,8 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { client2serverRequestReply = new SimulatedClientRequestReply(simulateLatencyMs); getServers().stream().forEach(s -> initRpc(s)); addPeersToRpc(toRaftPeers(getServers())); + ((SimulatedRpc.Factory)clientFactory).initRpc( + serverRequestReply, client2serverRequestReply); } private void initRpc(RaftServerImpl s) { @@ -111,11 +111,6 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster { } @Override - public RaftClientRequestSender getRaftClientRequestSender() { - return client2serverRequestReply; - } - - @Override public void blockQueueAndSetDelay(String leaderId, int delayMs) throws InterruptedException { // block leader sendRequest if delayMs > 0 http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8ac50a72/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java index ec85661..9d855c3 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.simulation; +import org.apache.ratis.client.ClientFactory; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.rpc.RpcFactory; import org.apache.ratis.rpc.RpcType; @@ -38,7 +39,7 @@ class SimulatedRpc implements RpcType { return new Factory(); } - static class Factory extends ServerFactory.BaseFactory { + static class Factory extends ServerFactory.BaseFactory implements ClientFactory { private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply; private SimulatedClientRequestReply client2serverRequestReply; @@ -51,7 +52,14 @@ class SimulatedRpc implements RpcType { @Override public SimulatedServerRpc newRaftServerRpc(RaftServerImpl server) { - return new SimulatedServerRpc(server, serverRequestReply, client2serverRequestReply); + return new SimulatedServerRpc(server, + Objects.requireNonNull(serverRequestReply), + Objects.requireNonNull(client2serverRequestReply)); + } + + @Override + public SimulatedClientRequestReply newRaftClientRequestSender() { + return Objects.requireNonNull(client2serverRequestReply); } @Override
