http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java new file mode 100644 index 0000000..aac7c31 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.protocol.*; +import org.apache.ratis.util.PeerProxyMap; + +import java.io.IOException; +import java.util.Collection; + +public class HadoopClientRequestSender implements RaftClientRequestSender { + + private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies; + + public HadoopClientRequestSender( + Collection<RaftPeer> peers, final Configuration conf) { + this.proxies = new PeerProxyMap<>( + p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); + proxies.addPeers(peers); + } + + @Override + public RaftClientReply sendRequest(RaftClientRequest request) + throws IOException { + final String serverId = request.getReplierId(); + final RaftClientProtocolClientSideTranslatorPB proxy = + proxies.getProxy(serverId); + try { + if (request instanceof SetConfigurationRequest) { + return proxy.setConfiguration((SetConfigurationRequest) request); + } else { + return proxy.submitClientRequest(request); + } + } catch (RemoteException e) { + throw e.unwrapRemoteException(StateMachineException.class, + ReconfigurationTimeoutException.class, + ReconfigurationInProgressException.class, RaftException.class); + } + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + proxies.addPeers(servers); + } + + @Override + public void close() { + proxies.close(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..a5c1a13 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.hadooprpc.Proxy; +import org.apache.ratis.protocol.RaftClientProtocol; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.shaded.com.google.protobuf.ServiceException; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.util.ProtoUtils; + +@InterfaceAudience.Private +public class RaftClientProtocolClientSideTranslatorPB + extends Proxy<RaftClientProtocolPB> + implements RaftClientProtocol { + + public RaftClientProtocolClientSideTranslatorPB( + String addressStr, Configuration conf) throws IOException { + super(RaftClientProtocolPB.class, addressStr, conf); + } + + @Override + public RaftClientReply submitClientRequest(RaftClientRequest request) + throws IOException { + final RaftClientRequestProto p = ClientProtoUtils.toRaftClientRequestProto(request); + try { + final RaftClientReplyProto reply = getProtocol().submitClientRequest(null, p); + return ClientProtoUtils.toRaftClientReply(reply); + } catch (ServiceException se) { + throw ProtoUtils.toIOException(se); + } + } + + @Override + public RaftClientReply setConfiguration(SetConfigurationRequest request) + throws IOException { + final SetConfigurationRequestProto p + = ClientProtoUtils.toSetConfigurationRequestProto(request); + try { + final RaftClientReplyProto reply = getProtocol().setConfiguration(null, p); + return ClientProtoUtils.toRaftClientReply(reply); + } catch (ServiceException se) { + throw ProtoUtils.toIOException(se); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java new file mode 100644 index 0000000..908cd99 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.ratis.hadooprpc.HadoopConstants; +import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +@KerberosInfo( + serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY, + clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY) +@ProtocolInfo( + protocolName = HadoopConstants.RAFT_CLIENT_PROTOCOL_NAME, + protocolVersion = 1) +public interface RaftClientProtocolPB extends + RaftClientProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..08cf589 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.protocol.RaftClientProtocol; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.shaded.com.google.protobuf.RpcController; +import org.apache.ratis.shaded.com.google.protobuf.ServiceException; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; + +@InterfaceAudience.Private +public class RaftClientProtocolServerSideTranslatorPB + implements RaftClientProtocolPB { + private final RaftClientProtocol impl; + + public RaftClientProtocolServerSideTranslatorPB(RaftClientProtocol impl) { + this.impl = impl; + } + + @Override + public RaftClientReplyProto submitClientRequest( + RpcController unused, RaftClientRequestProto proto) + throws ServiceException { + final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto); + try { + final RaftClientReply reply = impl.submitClientRequest(request); + return ClientProtoUtils.toRaftClientReplyProto(reply); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public RaftClientReplyProto setConfiguration( + RpcController unused, SetConfigurationRequestProto proto) + throws ServiceException { + final SetConfigurationRequest request; + try { + request = ClientProtoUtils.toSetConfigurationRequest(proto); + final RaftClientReply reply = impl.setConfiguration(request); + return ClientProtoUtils.toRaftClientReplyProto(reply); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java new file mode 100644 index 0000000..b7ac64a --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.server; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; +import org.apache.hadoop.ipc.RPC; +import org.apache.ratis.hadooprpc.Proxy; +import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB; +import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; +import org.apache.ratis.protocol.RaftClientProtocol; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.protocol.RaftServerProtocol; +import org.apache.ratis.shaded.com.google.protobuf.BlockingService; +import org.apache.ratis.shaded.com.google.protobuf.ServiceException; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService; +import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.PeerProxyMap; +import org.apache.ratis.util.ProtoUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** Server side Hadoop RPC service. */ +public class HadoopRpcService implements RaftServerRpc { + public static final Logger LOG = LoggerFactory.getLogger(HadoopRpcService.class); + static final String CLASS_NAME = HadoopRpcService.class.getSimpleName(); + public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; + + private final String id; + private final RPC.Server ipcServer; + private final InetSocketAddress ipcServerAddress; + + private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies; + + public HadoopRpcService(RaftServer server, final Configuration conf) + throws IOException { + this.proxies = new PeerProxyMap<>( + p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf)); + this.id = server.getId(); + this.ipcServer = newRpcServer(server, conf); + this.ipcServerAddress = ipcServer.getListenerAddress(); + + addRaftClientProtocol(server, conf); + + LOG.info(getClass().getSimpleName() + " created RPC.Server at " + + ipcServerAddress); + } + + @Override + public InetSocketAddress getInetSocketAddress() { + return ipcServerAddress; + } + + private RPC.Server newRpcServer(RaftServerProtocol serverProtocol, final Configuration conf) + throws IOException { + final RaftServerConfigKeys.Get get = new RaftServerConfigKeys.Get() { + @Override + protected int getInt(String key, int defaultValue) { + return conf.getInt(key, defaultValue); + } + + @Override + protected String getTrimmed(String key, String defaultValue) { + return conf.getTrimmed(key, defaultValue); + } + }; + + final int handlerCount = get.ipc().handlers(); + final InetSocketAddress address = get.ipc().address(); + + final BlockingService service + = RaftServerProtocolService.newReflectiveBlockingService( + new RaftServerProtocolServerSideTranslatorPB(serverProtocol)); + RPC.setProtocolEngine(conf, RaftServerProtocolPB.class, ProtobufRpcEngineShaded.class); + return new RPC.Builder(conf) + .setProtocol(RaftServerProtocolPB.class) + .setInstance(service) + .setBindAddress(address.getHostName()) + .setPort(address.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .build(); + } + + private void addRaftClientProtocol(RaftClientProtocol clientProtocol, Configuration conf) { + final Class<?> protocol = RaftClientProtocolPB.class; + RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class); + + final BlockingService service + = RaftClientProtocolService.newReflectiveBlockingService( + new RaftClientProtocolServerSideTranslatorPB(clientProtocol)); + ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service); + } + + @Override + public void start() { + ipcServer.start(); + } + + @Override + public void close() { + ipcServer.stop(); + } + + @Override + public AppendEntriesReplyProto appendEntries( + AppendEntriesRequestProto request) throws IOException { + Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + + final RaftServerProtocolPB proxy = proxies.getProxy( + request.getServerRequest().getReplyId()).getProtocol(); + try { + return proxy.appendEntries(null, request); + } catch (ServiceException se) { + throw ProtoUtils.toIOException(se); + } + } + + @Override + public InstallSnapshotReplyProto installSnapshot( + InstallSnapshotRequestProto request) throws IOException { + Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + + final RaftServerProtocolPB proxy = proxies.getProxy( + request.getServerRequest().getReplyId()).getProtocol(); + try { + return proxy.installSnapshot(null, request); + } catch (ServiceException se) { + throw ProtoUtils.toIOException(se); + } + } + + @Override + public RequestVoteReplyProto requestVote( + RequestVoteRequestProto request) throws IOException { + Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + + final RaftServerProtocolPB proxy = proxies.getProxy( + request.getServerRequest().getReplyId()).getProtocol(); + try { + return proxy.requestVote(null, request); + } catch (ServiceException se) { + throw ProtoUtils.toIOException(se); + } + } + + @Override + public void addPeers(Iterable<RaftPeer> peers) { + proxies.addPeers(peers); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java new file mode 100644 index 0000000..8b92cc4 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.server; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.ratis.hadooprpc.HadoopConstants; +import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +@KerberosInfo( + serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY, + clientPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY) +@ProtocolInfo( + protocolName = HadoopConstants.RAFT_SERVER_PROTOCOL_NAME, + protocolVersion = 1) +public interface RaftServerProtocolPB extends + RaftServerProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..a496793 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.server; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.ratis.server.protocol.RaftServerProtocol; +import org.apache.ratis.shaded.com.google.protobuf.RpcController; +import org.apache.ratis.shaded.com.google.protobuf.ServiceException; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto; + +@InterfaceAudience.Private +public class RaftServerProtocolServerSideTranslatorPB + implements RaftServerProtocolPB { + private final RaftServerProtocol impl; + + public RaftServerProtocolServerSideTranslatorPB(RaftServerProtocol impl) { + this.impl = impl; + } + + @Override + public RequestVoteReplyProto requestVote( + RpcController unused, RequestVoteRequestProto request) + throws ServiceException { + try { + return impl.requestVote(request); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public AppendEntriesReplyProto appendEntries( + RpcController unused, AppendEntriesRequestProto request) + throws ServiceException { + try { + return impl.appendEntries(request); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public InstallSnapshotReplyProto installSnapshot(RpcController controller, + InstallSnapshotRequestProto request) throws ServiceException { + try { + return impl.installSnapshot(request); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java new file mode 100644 index 0000000..964f3a2 --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender; +import org.apache.ratis.hadooprpc.server.HadoopRpcService; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase { + static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithHadoopRpc.class); + + public static final Factory<MiniRaftClusterWithHadoopRpc> FACTORY + = new Factory<MiniRaftClusterWithHadoopRpc>() { + @Override + public MiniRaftClusterWithHadoopRpc newCluster( + String[] ids, RaftProperties prop, boolean formatted) throws IOException { + final Configuration conf = new Configuration(); + conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0"); + return new MiniRaftClusterWithHadoopRpc(ids, prop, conf, formatted); + } + }; + + public static final DelayLocalExecutionInjection sendServerRequest = + new DelayLocalExecutionInjection(HadoopRpcService.SEND_SERVER_REQUEST); + + private final Configuration hadoopConf; + + public MiniRaftClusterWithHadoopRpc(int numServers, RaftProperties properties, + Configuration conf) throws IOException { + this(generateIds(numServers, 0), properties, conf, true); + } + + public MiniRaftClusterWithHadoopRpc(String[] ids, RaftProperties properties, + Configuration hadoopConf, boolean formatted) throws IOException { + super(ids, properties, formatted); + this.hadoopConf = hadoopConf; + + init(initRpcServices(getServers(), hadoopConf)); + } + + private static Map<RaftPeer, HadoopRpcService> initRpcServices( + Collection<RaftServerImpl> servers, Configuration hadoopConf) throws IOException { + final Map<RaftPeer, HadoopRpcService> peerRpcs = new HashMap<>(); + + for(RaftServerImpl s : servers) { + final HadoopRpcService rpc = new HadoopRpcService(s, hadoopConf); + peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); + } + return peerRpcs; + } + + @Override + protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { + Configuration hconf = new Configuration(hadoopConf); + hconf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, peer.getAddress()); + + RaftServerImpl server = servers.get(peer.getId()); + final HadoopRpcService rpc = new HadoopRpcService(server, hconf); + Preconditions.checkState( + rpc.getInetSocketAddress().toString().contains(peer.getAddress()), + "address in the raft conf: %s, address in rpc server: %s", + peer.getAddress(), rpc.getInetSocketAddress().toString()); + server.setServerRpc(rpc); + return server; + } + + @Override + public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, + Collection<RaftServerImpl> newServers, boolean startService) + throws IOException { + return addNewPeers(initRpcServices(newServers, hadoopConf), + newServers, startService); + } + + @Override + public RaftClientRequestSender getRaftClientRequestSender() { + return new HadoopClientRequestSender(getPeers(), hadoopConf); + } + + @Override + public void blockQueueAndSetDelay(String leaderId, int delayMs) + throws InterruptedException { + RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest, + leaderId, delayMs, getMaxTimeout()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java new file mode 100644 index 0000000..8bc5ae6 --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftNotLeaderExceptionBaseTest; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; + +import java.io.IOException; + +public class TestNotLeaderExceptionWithHadoopRpc extends RaftNotLeaderExceptionBaseTest { + @Override + public MiniRaftCluster initCluster() throws IOException { + String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0); + final Configuration conf = new Configuration(); + conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0"); + RaftProperties prop = new RaftProperties(); + return new MiniRaftClusterWithHadoopRpc(s, prop, conf, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java new file mode 100644 index 0000000..cd502ac --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; + +import java.io.IOException; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY; + +public class TestRaftReconfigurationWithHadoopRpc + extends RaftReconfigurationBaseTest { + @Override + public MiniRaftCluster getCluster(int peerNum) throws IOException { + final Configuration hadoopConf = new Configuration(); + hadoopConf.setInt(IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000); + hadoopConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + hadoopConf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0"); + return new MiniRaftClusterWithHadoopRpc(peerNum, prop, hadoopConf); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java new file mode 100644 index 0000000..7bdafb0 --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.statemachine.RaftSnapshotBaseTest; + +import java.io.IOException; + +public class TestRaftSnapshotWithHadoopRpc extends RaftSnapshotBaseTest { + @Override + public MiniRaftCluster initCluster(int numServer, RaftProperties prop) + throws IOException { + return MiniRaftClusterWithHadoopRpc.FACTORY.newCluster(numServer, prop, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java new file mode 100644 index 0000000..9dd3f27 --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Level; +import org.apache.ratis.RaftBasicTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; +import org.junit.Test; + +import static org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServerRequest; + +import java.io.IOException; + +public class TestRaftWithHadoopRpc extends RaftBasicTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + RaftUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithHadoopRpc cluster; + + public TestRaftWithHadoopRpc() throws IOException { + Configuration conf = new Configuration(); + conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0"); + cluster = new MiniRaftClusterWithHadoopRpc(NUM_SERVERS, getProperties(), conf); + } + + @Override + public MiniRaftClusterWithHadoopRpc getCluster() { + return cluster; + } + + @Override + @Test + public void testEnforceLeader() throws Exception { + super.testEnforceLeader(); + + sendServerRequest.clear(); + BlockRequestHandlingInjection.getInstance().unblockAll(); + } + + @Override + @Test + public void testWithLoad() throws Exception { + super.testWithLoad(); + BlockRequestHandlingInjection.getInstance().unblockAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/resources/log4j.properties b/ratis-hadoop/src/test/resources/log4j.properties new file mode 100644 index 0000000..ced0687 --- /dev/null +++ b/ratis-hadoop/src/test/resources/log4j.properties @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-netty/pom.xml b/ratis-netty/pom.xml new file mode 100644 index 0000000..cdffdc2 --- /dev/null +++ b/ratis-netty/pom.xml @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>ratis-project-dist</artifactId> + <groupId>org.apache.ratis</groupId> + <version>1.0-SNAPSHOT</version> + <relativePath>../ratis-project-dist</relativePath> + </parent> + + <artifactId>ratis-netty</artifactId> + <name>Ratis Netty Support</name> + + <dependencies> + <dependency> + <artifactId>ratis-proto-shaded</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java new file mode 100644 index 0000000..1aea87b --- /dev/null +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import java.io.Closeable; +import java.net.InetSocketAddress; + +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.NetUtils; + +public class NettyClient implements Closeable { + private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); + + private Channel channel; + + /** Connects to the given server address. */ + public void connect(String serverAddress, EventLoopGroup group, + ChannelInitializer<SocketChannel> initializer) + throws InterruptedException { + final InetSocketAddress address = NetUtils.newInetSocketAddress(serverAddress); + Preconditions.checkNotNull(address, + "Failed to create InetSocketAddress from %s.", serverAddress); + + lifeCycle.startAndTransition( + () -> channel = new Bootstrap() + .group(group) + .channel(NioSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .handler(initializer) + .connect(address) + .sync() + .channel(), + InterruptedException.class); + } + + @Override + public void close() { + lifeCycle.checkStateAndClose(() -> { + channel.close().syncUninterruptibly(); + }); + } + + public ChannelFuture writeAndFlush(Object msg) { + lifeCycle.assertCurrentState(LifeCycle.State.RUNNING); + return channel.writeAndFlush(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java new file mode 100644 index 0000000..9b8553b --- /dev/null +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import static org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto; +import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; +import org.apache.ratis.util.PeerProxyMap; +import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.RaftUtils; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; + +public class NettyRpcProxy implements Closeable { + public static class PeerMap extends PeerProxyMap<NettyRpcProxy> { + private final EventLoopGroup group = new NioEventLoopGroup(); + + @Override + public NettyRpcProxy createProxyImpl(RaftPeer peer) + throws IOException { + try { + return new NettyRpcProxy(peer, group); + } catch (InterruptedException e) { + throw RaftUtils.toInterruptedIOException("Failed connecting to " + peer, e); + } + } + + @Override + public void close() { + super.close(); + group.shutdownGracefully(); + } + } + + public static long getSeqNum(RaftNettyServerReplyProto proto) { + switch (proto.getRaftNettyServerReplyCase()) { + case REQUESTVOTEREPLY: + return proto.getRequestVoteReply().getServerReply().getSeqNum(); + case APPENDENTRIESREPLY: + return proto.getAppendEntriesReply().getServerReply().getSeqNum(); + case INSTALLSNAPSHOTREPLY: + return proto.getInstallSnapshotReply().getServerReply().getSeqNum(); + case RAFTCLIENTREPLY: + return proto.getRaftClientReply().getRpcReply().getSeqNum(); + case EXCEPTIONREPLY: + return proto.getExceptionReply().getRpcReply().getSeqNum(); + case RAFTNETTYSERVERREPLY_NOT_SET: + throw new IllegalArgumentException("Reply case not set in proto: " + + proto.getRaftNettyServerReplyCase()); + default: + throw new UnsupportedOperationException("Reply case not supported: " + + proto.getRaftNettyServerReplyCase()); + } + } + + + class Connection implements Closeable { + private final NettyClient client = new NettyClient(); + private final Queue<CompletableFuture<RaftNettyServerReplyProto>> replies + = new LinkedList<>(); + + Connection(EventLoopGroup group) throws InterruptedException { + final ChannelInboundHandler inboundHandler + = new SimpleChannelInboundHandler<RaftNettyServerReplyProto>() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, + RaftNettyServerReplyProto proto) { + final CompletableFuture<RaftNettyServerReplyProto> future = pollReply(); + if (future == null) { + throw new IllegalStateException("Request #" + getSeqNum(proto) + + " not found"); + } + if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) { + final Object ioe = ProtoUtils.toObject(proto.getExceptionReply().getException()); + future.completeExceptionally((IOException)ioe); + } else { + future.complete(proto); + } + } + }; + final ChannelInitializer<SocketChannel> initializer + = new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + final ChannelPipeline p = ch.pipeline(); + + p.addLast(new ProtobufVarint32FrameDecoder()); + p.addLast(new ProtobufDecoder(RaftNettyServerReplyProto.getDefaultInstance())); + p.addLast(new ProtobufVarint32LengthFieldPrepender()); + p.addLast(new ProtobufEncoder()); + + p.addLast(inboundHandler); + } + }; + + client.connect(peer.getAddress(), group, initializer); + } + + synchronized ChannelFuture offer(RaftNettyServerRequestProto request, + CompletableFuture<RaftNettyServerReplyProto> reply) { + replies.offer(reply); + return client.writeAndFlush(request); + } + + synchronized CompletableFuture<RaftNettyServerReplyProto> pollReply() { + return replies.poll(); + } + + @Override + public synchronized void close() { + client.close(); + if (!replies.isEmpty()) { + final IOException e = new IOException("Connection to " + peer + " is closed."); + replies.stream().forEach(f -> f.completeExceptionally(e)); + replies.clear(); + } + } + } + + private final RaftPeer peer; + private final Connection connection; + + public NettyRpcProxy(RaftPeer peer, EventLoopGroup group) throws InterruptedException { + this.peer = peer; + this.connection = new Connection(group); + } + + @Override + public void close() { + connection.close(); + } + + public RaftNettyServerReplyProto send( + RaftRpcRequestProto request, RaftNettyServerRequestProto proto) + throws IOException { + final CompletableFuture<RaftNettyServerReplyProto> reply = new CompletableFuture<>(); + final ChannelFuture channelFuture = connection.offer(proto, reply); + + try { + channelFuture.sync(); + return reply.get(); + } catch (InterruptedException e) { + throw RaftUtils.toInterruptedIOException(ProtoUtils.toString(request) + + " sending from " + peer + " is interrupted.", e); + } catch (ExecutionException e) { + throw RaftUtils.toIOException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java new file mode 100644 index 0000000..38d806b --- /dev/null +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty.client; + +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.netty.NettyRpcProxy; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.SetConfigurationRequest; + +import java.io.IOException; + +public class NettyClientRequestSender implements RaftClientRequestSender { + private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap(); + + public NettyClientRequestSender(Iterable<RaftPeer> servers) { + addServers(servers); + } + + @Override + public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { + final String serverId = request.getReplierId(); + final NettyRpcProxy proxy = proxies.getProxy(serverId); + + final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); + final RaftRpcRequestProto rpcRequest; + if (request instanceof SetConfigurationRequest) { + final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto( + (SetConfigurationRequest)request); + b.setSetConfigurationRequest(proto); + rpcRequest = proto.getRpcRequest(); + } else { + final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request); + b.setRaftClientRequest(proto); + rpcRequest = proto.getRpcRequest(); + } + return ClientProtoUtils.toRaftClientReply( + proxy.send(rpcRequest, b.build()).getRaftClientReply()); + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + proxies.addPeers(servers); + } + + @Override + public void close() { + proxies.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java new file mode 100644 index 0000000..153f61e --- /dev/null +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty.server; + +import com.google.common.base.Preconditions; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; + +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder; +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder; +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyExceptionReplyProto; +import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto; +import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.netty.NettyRpcProxy; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.ProtoUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; + +/** + * A netty server endpoint that acts as the communication layer. + */ +public final class NettyRpcService implements RaftServerRpc { + static final String CLASS_NAME = NettyRpcService.class.getSimpleName(); + public static final String SEND_SERVER_REQUEST = CLASS_NAME + ".sendServerRequest"; + + private final LifeCycle lifeCycle = new LifeCycle(getClass().getSimpleName()); + private final RaftServer server; + private final String id; + + private final EventLoopGroup bossGroup = new NioEventLoopGroup(); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); + private final ChannelFuture channelFuture; + + private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap(); + + @ChannelHandler.Sharable + class InboundHandler extends SimpleChannelInboundHandler<RaftNettyServerRequestProto> { + @Override + protected void channelRead0(ChannelHandlerContext ctx, RaftNettyServerRequestProto proto) { + final RaftNettyServerReplyProto reply = handle(proto); + ctx.writeAndFlush(reply); + } + } + + /** Constructs a netty server with the given port. */ + public NettyRpcService(int port, RaftServer server) { + this.server = server; + this.id = server.getId(); + + final ChannelInitializer<SocketChannel> initializer + = new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + final ChannelPipeline p = ch.pipeline(); + + p.addLast(new ProtobufVarint32FrameDecoder()); + p.addLast(new ProtobufDecoder(RaftNettyServerRequestProto.getDefaultInstance())); + p.addLast(new ProtobufVarint32LengthFieldPrepender()); + p.addLast(new ProtobufEncoder()); + + p.addLast(new InboundHandler()); + } + }; + + channelFuture = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(initializer) + .bind(port); + } + + private Channel getChannel() { + return channelFuture.awaitUninterruptibly().channel(); + } + + @Override + public void start() { + lifeCycle.startAndTransition(() -> channelFuture.syncUninterruptibly()); + } + + @Override + public void close() { + lifeCycle.checkStateAndClose(() -> { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + final ChannelFuture f = getChannel().close(); + proxies.close(); + f.syncUninterruptibly(); + }); + } + + @Override + public InetSocketAddress getInetSocketAddress() { + return (InetSocketAddress)getChannel().localAddress(); + } + + RaftNettyServerReplyProto handle(RaftNettyServerRequestProto proto) { + RaftRpcRequestProto rpcRequest = null; + try { + switch (proto.getRaftNettyServerRequestCase()) { + case REQUESTVOTEREQUEST: { + final RequestVoteRequestProto request = proto.getRequestVoteRequest(); + rpcRequest = request.getServerRequest(); + final RequestVoteReplyProto reply = server.requestVote(request); + return RaftNettyServerReplyProto.newBuilder() + .setRequestVoteReply(reply) + .build(); + } + case APPENDENTRIESREQUEST: { + final AppendEntriesRequestProto request = proto.getAppendEntriesRequest(); + rpcRequest = request.getServerRequest(); + final AppendEntriesReplyProto reply = server.appendEntries(request); + return RaftNettyServerReplyProto.newBuilder() + .setAppendEntriesReply(reply) + .build(); + } + case INSTALLSNAPSHOTREQUEST: { + final InstallSnapshotRequestProto request = proto.getInstallSnapshotRequest(); + rpcRequest = request.getServerRequest(); + final InstallSnapshotReplyProto reply = server.installSnapshot(request); + return RaftNettyServerReplyProto.newBuilder() + .setInstallSnapshotReply(reply) + .build(); + } + case RAFTCLIENTREQUEST: { + final RaftClientRequestProto request = proto.getRaftClientRequest(); + rpcRequest = request.getRpcRequest(); + final RaftClientReply reply = server.submitClientRequest( + ClientProtoUtils.toRaftClientRequest(request)); + return RaftNettyServerReplyProto.newBuilder() + .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) + .build(); + } + case SETCONFIGURATIONREQUEST: { + final SetConfigurationRequestProto request = proto.getSetConfigurationRequest(); + rpcRequest = request.getRpcRequest(); + final RaftClientReply reply = server.setConfiguration( + ClientProtoUtils.toSetConfigurationRequest(request)); + return RaftNettyServerReplyProto.newBuilder() + .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) + .build(); + } + case RAFTNETTYSERVERREQUEST_NOT_SET: + throw new IllegalArgumentException("Request case not set in proto: " + + proto.getRaftNettyServerRequestCase()); + default: + throw new UnsupportedOperationException("Request case not supported: " + + proto.getRaftNettyServerRequestCase()); + } + } catch (IOException ioe) { + Preconditions.checkNotNull(rpcRequest); + return toRaftNettyServerReplyProto(rpcRequest, ioe); + } + } + + private static RaftNettyServerReplyProto toRaftNettyServerReplyProto( + RaftRpcRequestProto request, IOException e) { + final RaftRpcReplyProto.Builder rpcReply = ClientProtoUtils.toRaftRpcReplyProtoBuilder( + request.getRequestorId(), + request.getReplyId(), + request.getSeqNum(), false); + final RaftNettyExceptionReplyProto.Builder ioe = RaftNettyExceptionReplyProto.newBuilder() + .setRpcReply(rpcReply) + .setException(ProtoUtils.toByteString(e)); + return RaftNettyServerReplyProto.newBuilder().setExceptionReply(ioe).build(); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException { + Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + + final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() + .setRequestVoteRequest(request) + .build(); + final RaftRpcRequestProto serverRequest = request.getServerRequest(); + return sendRaftNettyServerRequestProto(serverRequest, proto).getRequestVoteReply(); + } + + @Override + public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException { + Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + + final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() + .setAppendEntriesRequest(request) + .build(); + final RaftRpcRequestProto serverRequest = request.getServerRequest(); + return sendRaftNettyServerRequestProto(serverRequest, proto).getAppendEntriesReply(); + } + + @Override + public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException { + Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId())); + CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request); + + final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder() + .setInstallSnapshotRequest(request) + .build(); + final RaftRpcRequestProto serverRequest = request.getServerRequest(); + return sendRaftNettyServerRequestProto(serverRequest, proto).getInstallSnapshotReply(); + } + + private RaftNettyServerReplyProto sendRaftNettyServerRequestProto( + RaftRpcRequestProto request, RaftNettyServerRequestProto proto) + throws IOException { + final String id = request.getReplyId(); + final NettyRpcProxy p = proxies.getProxy(id); + try { + return p.send(request, proto); + } catch (ClosedChannelException cce) { + proxies.resetProxy(id); + throw cce; + } + } + + @Override + public void addPeers(Iterable<RaftPeer> peers) { + proxies.addPeers(peers); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java new file mode 100644 index 0000000..92e7722 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.netty.client.NettyClientRequestSender; +import org.apache.ratis.netty.server.NettyRpcService; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.server.impl.DelayLocalExecutionInjection; +import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.NetUtils; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase { + public static final Factory<MiniRaftClusterWithNetty> FACTORY + = new Factory<MiniRaftClusterWithNetty>() { + @Override + public MiniRaftClusterWithNetty newCluster( + String[] ids, RaftProperties prop, boolean formatted) { + return new MiniRaftClusterWithNetty(ids, prop, formatted); + } + }; + + public static final DelayLocalExecutionInjection sendServerRequest + = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST); + + public MiniRaftClusterWithNetty(int numServers, RaftProperties properties) { + this(generateIds(numServers, 0), properties, true); + } + + public MiniRaftClusterWithNetty( + String[] ids, RaftProperties properties, boolean formatted) { + super(ids, properties, formatted); + init(initRpcServices(getServers(), getConf())); + } + + private static String getAddress(String id, RaftConfiguration conf) { + final RaftPeer peer = conf.getPeer(id); + if (peer != null) { + final String address = peer.getAddress(); + if (address != null) { + return address; + } + } + return "0.0.0.0:0"; + } + + private static NettyRpcService newNettyRpcService( + RaftServerImpl s, RaftConfiguration conf) { + final String address = getAddress(s.getId(), conf); + final int port = NetUtils.newInetSocketAddress(address).getPort(); + return new NettyRpcService(port, s); + } + + private static Map<RaftPeer, NettyRpcService> initRpcServices( + Collection<RaftServerImpl> servers, RaftConfiguration conf) { + final Map<RaftPeer, NettyRpcService> peerRpcs = new HashMap<>(); + + for (RaftServerImpl s : servers) { + final NettyRpcService rpc = newNettyRpcService(s, conf); + peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); + } + + return peerRpcs; + } + + @Override + protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { + final RaftServerImpl s = servers.get(peer.getId()); + final NettyRpcService rpc = newNettyRpcService(s, conf); + s.setServerRpc(rpc); + return s; + } + + @Override + protected Collection<RaftPeer> addNewPeers( + Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers, + boolean startService) throws IOException { + return addNewPeers(initRpcServices(newServers, conf), + newServers, startService); + } + + @Override + public RaftClientRequestSender getRaftClientRequestSender() { + return new NettyClientRequestSender(getPeers()); + } + + @Override + protected void blockQueueAndSetDelay(String leaderId, int delayMs) + throws InterruptedException { + RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest, + leaderId, delayMs, getMaxTimeout()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java new file mode 100644 index 0000000..9c267e7 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftNotLeaderExceptionBaseTest; +import org.apache.ratis.conf.RaftProperties; + +import java.io.IOException; + +public class TestNotLeaderExceptionWithNetty extends RaftNotLeaderExceptionBaseTest { + @Override + public MiniRaftCluster initCluster() throws IOException { + String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0); + RaftProperties prop = new RaftProperties(); + return new MiniRaftClusterWithNetty(s, prop, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java new file mode 100644 index 0000000..cfa9729 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import java.io.IOException; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; + +public class TestRaftReconfigurationWithNetty + extends RaftReconfigurationBaseTest { + @Override + public MiniRaftCluster getCluster(int peerNum) throws IOException { + return MiniRaftClusterWithNetty.FACTORY.newCluster(peerNum, prop, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java new file mode 100644 index 0000000..18807c0 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.statemachine.RaftSnapshotBaseTest; + +import java.io.IOException; + +public class TestRaftSnapshotWithNetty extends RaftSnapshotBaseTest { + @Override + public MiniRaftCluster initCluster(int numServer, RaftProperties prop) + throws IOException { + return MiniRaftClusterWithNetty.FACTORY.newCluster(numServer, prop, true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java new file mode 100644 index 0000000..3954065 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.log4j.Level; +import org.apache.ratis.RaftBasicTests; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.server.impl.BlockRequestHandlingInjection; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.util.RaftUtils; +import org.junit.Test; + +import java.io.IOException; + +public class TestRaftWithNetty extends RaftBasicTests { + static { + RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + private final MiniRaftClusterWithNetty cluster; + + public TestRaftWithNetty() throws IOException { + cluster = new MiniRaftClusterWithNetty(NUM_SERVERS, getProperties()); + } + + @Override + public MiniRaftClusterWithNetty getCluster() { + return cluster; + } + + @Override + @Test + public void testEnforceLeader() throws Exception { + super.testEnforceLeader(); + + MiniRaftClusterWithNetty.sendServerRequest.clear(); + BlockRequestHandlingInjection.getInstance().unblockAll(); + } + + @Override + @Test + public void testWithLoad() throws Exception { + super.testWithLoad(); + BlockRequestHandlingInjection.getInstance().unblockAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/resources/log4j.properties b/ratis-netty/src/test/resources/log4j.properties new file mode 100644 index 0000000..ced0687 --- /dev/null +++ b/ratis-netty/src/test/resources/log4j.properties @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n