RATIS-86. Support raft server re-initialization. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/06002e67 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/06002e67 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/06002e67 Branch: refs/heads/master Commit: 06002e67a3476a491e4fc0f1638123f5957c452e Parents: 291f51b Author: Jing Zhao <[email protected]> Authored: Fri May 19 11:23:33 2017 -0700 Committer: Jing Zhao <[email protected]> Committed: Fri May 19 11:23:33 2017 -0700 ---------------------------------------------------------------------- .../org/apache/ratis/client/RaftClient.java | 3 + .../ratis/client/impl/ClientProtoUtils.java | 22 +++++ .../ratis/client/impl/RaftClientImpl.java | 22 ++++- .../protocol/AdminAsynchronousProtocol.java | 27 +++++++ .../apache/ratis/protocol/AdminProtocol.java | 25 ++++++ .../ratis/protocol/ReinitializeRequest.java | 39 +++++++++ .../org/apache/ratis/util/CheckedSupplier.java | 30 +++++++ .../java/org/apache/ratis/util/IOUtils.java | 4 +- .../java/org/apache/ratis/util/JavaUtils.java | 63 +++++++++++++++ .../org/apache/ratis/grpc/RaftGRpcService.java | 2 + .../org/apache/ratis/grpc/RaftGrpcUtil.java | 32 ++++++-- .../apache/ratis/grpc/client/GrpcClientRpc.java | 7 +- .../grpc/client/RaftClientProtocolClient.java | 26 ++++-- .../grpc/client/RaftClientProtocolService.java | 27 ++----- .../ratis/grpc/server/AdminProtocolService.java | 45 +++++++++++ .../grpc/TestReinitializationWithGrpc.java | 28 +++++++ .../apache/ratis/hadooprpc/HadoopConstants.java | 4 +- .../client/CombinedClientProtocol.java | 25 ++++++ ...nedClientProtocolClientSideTranslatorPB.java | 83 +++++++++++++++++++ .../client/CombinedClientProtocolPB.java | 37 +++++++++ ...nedClientProtocolServerSideTranslatorPB.java | 82 +++++++++++++++++++ .../ratis/hadooprpc/client/HadoopClientRpc.java | 10 ++- ...aftClientProtocolClientSideTranslatorPB.java | 70 ---------------- .../hadooprpc/client/RaftClientProtocolPB.java | 37 --------- ...aftClientProtocolServerSideTranslatorPB.java | 69 ---------------- .../hadooprpc/server/HadoopRpcService.java | 18 ++--- .../TestReinitializationWithHadoopRpc.java | 28 +++++++ .../ratis/netty/client/NettyClientRpc.java | 8 +- .../ratis/netty/server/NettyRpcService.java | 9 +++ .../netty/TestReinitializationWithNetty.java | 28 +++++++ ratis-proto-shaded/src/main/proto/GRpc.proto | 6 ++ ratis-proto-shaded/src/main/proto/Hadoop.proto | 6 +- ratis-proto-shaded/src/main/proto/Netty.proto | 1 + ratis-proto-shaded/src/main/proto/Raft.proto | 6 ++ .../org/apache/ratis/server/RaftServer.java | 8 +- .../apache/ratis/server/impl/LeaderState.java | 3 +- .../ratis/server/impl/PeerConfiguration.java | 1 - .../ratis/server/impl/RaftServerImpl.java | 2 +- .../ratis/server/impl/RaftServerProxy.java | 85 +++++++++++++++++--- .../java/org/apache/ratis/MiniRaftCluster.java | 24 +++--- .../java/org/apache/ratis/RaftBasicTests.java | 2 +- .../java/org/apache/ratis/RaftTestUtil.java | 23 +++++- .../impl/RaftReconfigurationBaseTest.java | 2 +- .../server/impl/ReinitializationBaseTest.java | 84 +++++++++++++++++++ .../ratis/server/simulation/SimulatedRpc.java | 1 - .../server/simulation/SimulatedServerRpc.java | 27 +++---- .../TestReinitializationWithSimulatedRpc.java | 28 +++++++ 47 files changed, 938 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 8f3c465..956a6de 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -52,6 +52,9 @@ public interface RaftClient extends Closeable { /** Send set configuration request to the raft service. */ RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException; + /** Send reinitialize request to the service. */ + RaftClientReply reinitialize(RaftPeer[] serversInNewConf, RaftPeerId server) throws IOException; + /** @return a {@link Builder}. */ static Builder newBuilder() { return new Builder(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 62a9ee4..ff49109 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -185,4 +185,26 @@ public class ClientProtoUtils { Arrays.asList(request.getPeersInNewConf()))) .build(); } + + public static ReinitializeRequest toReinitializeRequest( + ReinitializeRequestProto p) { + final RaftRpcRequestProto m = p.getRpcRequest(); + final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList()); + return new ReinitializeRequest( + new ClientId(m.getRequestorId().toByteArray()), + RaftPeerId.valueOf(m.getReplyId()), + p.getRpcRequest().getCallId(), peers); + } + + public static ReinitializeRequestProto toReinitializeRequestProto( + ReinitializeRequest request) { + return ReinitializeRequestProto.newBuilder() + .setRpcRequest(toRaftRpcRequestProtoBuilder( + request.getClientId().toBytes(), + request.getServerId().toBytes(), + request.getCallId())) + .addAllPeers(ProtoUtils.toRaftPeerProtos( + Arrays.asList(request.getPeersInNewConf()))) + .build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 2125ce0..40e670d 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.stream.Collector; import java.util.stream.Collectors; /** A client who sends requests to a raft service. */ @@ -86,12 +87,25 @@ final class RaftClientImpl implements RaftClient { throws IOException { final long callId = nextCallId(); // also refresh the rpc proxies for these peers - clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains) - .collect(Collectors.toCollection(ArrayList::new))); + addServers(peersInNewConf); return sendRequestWithRetry(() -> new SetConfigurationRequest( clientId, leaderId, callId, peersInNewConf)); } + @Override + public RaftClientReply reinitialize(RaftPeer[] peersInNewConf, RaftPeerId server) + throws IOException { + final long callId = nextCallId(); + addServers(peersInNewConf); + return sendRequest(new ReinitializeRequest( + clientId, server, callId, peersInNewConf)); + } + + private void addServers(RaftPeer[] peersInNewConf) { + clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains) + .collect(Collectors.toList())); + } + private RaftClientReply sendRequestWithRetry( Supplier<RaftClientRequest> supplier) throws InterruptedIOException, StateMachineException { @@ -157,6 +171,10 @@ final class RaftClientImpl implements RaftClient { RaftPeerId newLeader) { LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId, newLeader, ioe); + if (LOG.isTraceEnabled()) { + LOG.trace("Stack trace", new Throwable("TRACE")); + } + final RaftPeerId oldLeader = request.getServerId(); if (newLeader == null && oldLeader.equals(leaderId)) { newLeader = CollectionUtils.next(oldLeader, CollectionUtils.as(peers, RaftPeer::getId)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java new file mode 100644 index 0000000..663751b --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +/** Asynchronous version of {@link AdminProtocol}. */ +public interface AdminAsynchronousProtocol { + CompletableFuture<RaftClientReply> reinitializeAsync( + ReinitializeRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java new file mode 100644 index 0000000..0e7d6b6 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.io.IOException; + +/** For server administration. */ +public interface AdminProtocol { + RaftClientReply reinitialize(ReinitializeRequest request) throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java new file mode 100644 index 0000000..0a89340 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +import java.util.Arrays; + +public class ReinitializeRequest extends RaftClientRequest { + private final RaftPeer[] peers; + + public ReinitializeRequest(ClientId clientId, RaftPeerId serverId, + long callId, RaftPeer[] peers) { + super(clientId, serverId, callId, null); + this.peers = peers; + } + + public RaftPeer[] getPeersInNewConf() { + return peers; + } + + @Override + public String toString() { + return super.toString() + ", peers:" + Arrays.asList(getPeersInNewConf()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java new file mode 100644 index 0000000..0c9de31 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.util.function.Supplier; + +/** Function with a throws-clause. */ +@FunctionalInterface +public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> { + /** + * The same as {@link Supplier#get()} + * except that this method is declared with a throws-clause. + */ + OUTPUT get() throws THROWABLE; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java index ba5e78e..4976be8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java @@ -43,7 +43,9 @@ public interface IOUtils { } static IOException asIOException(Throwable t) { - return t instanceof IOException? (IOException)t : new IOException(t); + return t == null? null + : t instanceof IOException? (IOException)t + : new IOException(t); } static IOException toIOException(ExecutionException e) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java new file mode 100644 index 0000000..5da2012 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java @@ -0,0 +1,63 @@ +/* + * * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * General Java utility methods. + */ +public interface JavaUtils { + Logger LOG = LoggerFactory.getLogger(JavaUtils.class); + + /** + * Invoke {@link Callable#call()} and, if there any, + * wrap the checked exception by {@link RuntimeException}. + */ + static <T> T callAsUnchecked(Callable<T> callable) { + try { + return callable.call(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Get the value from the future and then consume it. + */ + static <T> void getAndConsume(CompletableFuture<T> future, Consumer<T> consumer) { + final T t; + try { + t = future.get(); + } catch (Exception ignored) { + LOG.warn("Failed to get()", ignored); + return; + } + consumer.accept(t); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 0deb3f4..96a7a45 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -18,6 +18,7 @@ package org.apache.ratis.grpc; import org.apache.ratis.grpc.client.RaftClientProtocolService; +import org.apache.ratis.grpc.server.AdminProtocolService; import org.apache.ratis.grpc.server.RaftServerProtocolClient; import org.apache.ratis.grpc.server.RaftServerProtocolService; import org.apache.ratis.protocol.RaftPeer; @@ -82,6 +83,7 @@ public class RaftGRpcService implements RaftServerRpc { server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize) .addService(new RaftServerProtocolService(selfId, raftServer)) .addService(new RaftClientProtocolService(selfId, raftServer)) + .addService(new AdminProtocolService(selfId, raftServer)) .build(); // start service to determine the port (in case port is configured as 0) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java index b89c297..fdc9ce8 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java @@ -17,20 +17,26 @@ */ package org.apache.ratis.grpc; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.shaded.io.grpc.Metadata; import org.apache.ratis.shaded.io.grpc.Status; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.util.CheckedSupplier; import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.ReflectionUtils; import org.apache.ratis.util.StringUtils; import java.io.IOException; +import java.util.concurrent.CompletableFuture; -public class RaftGrpcUtil { - public static final Metadata.Key<String> EXCEPTION_TYPE_KEY = +public interface RaftGrpcUtil { + Metadata.Key<String> EXCEPTION_TYPE_KEY = Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); - public static StatusRuntimeException wrapException(Throwable t) { + static StatusRuntimeException wrapException(Throwable t) { Metadata trailers = new Metadata(); trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); return new StatusRuntimeException( @@ -38,7 +44,7 @@ public class RaftGrpcUtil { trailers); } - public static IOException unwrapException(StatusRuntimeException se) { + static IOException unwrapException(StatusRuntimeException se) { final Metadata trailers = se.getTrailers(); final Status status = se.getStatus(); if (trailers != null && status != null) { @@ -57,7 +63,7 @@ public class RaftGrpcUtil { return new IOException(se); } - public static IOException unwrapIOException(Throwable t) { + static IOException unwrapIOException(Throwable t) { final IOException e; if (t instanceof StatusRuntimeException) { e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); @@ -67,4 +73,20 @@ public class RaftGrpcUtil { return e; } + static void asyncCall( + StreamObserver<RaftClientReplyProto> responseObserver, + CheckedSupplier<CompletableFuture<RaftClientReply>, IOException> supplier) { + try { + supplier.get().whenCompleteAsync((reply, exception) -> { + if (exception != null) { + responseObserver.onError(RaftGrpcUtil.wrapException(exception)); + } else { + responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply)); + responseObserver.onCompleted(); + } + }); + } catch (Exception e) { + responseObserver.onError(RaftGrpcUtil.wrapException(e)); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index b28415c..b30640b 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -22,6 +22,7 @@ import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; @@ -48,7 +49,11 @@ public class GrpcClientRpc implements RaftClientRpc { throws IOException { final RaftPeerId serverId = request.getServerId(); final RaftClientProtocolClient proxy = proxies.getProxy(serverId); - if (request instanceof SetConfigurationRequest) { + if (request instanceof ReinitializeRequest) { + RaftProtos.ReinitializeRequestProto proto = + toReinitializeRequestProto((ReinitializeRequest) request); + return toRaftClientReply(proxy.reinitialize(proto)); + } else if (request instanceof SetConfigurationRequest) { SetConfigurationRequestProto setConf = toSetConfigurationRequestProto((SetConfigurationRequest) request); return toRaftClientReply(proxy.setConfiguration(setConf)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index 74fb253..21254f3 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -17,18 +17,22 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.shaded.io.grpc.ManagedChannel; import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc; +import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.util.CheckedSupplier; import java.io.Closeable; import java.io.IOException; @@ -38,6 +42,7 @@ public class RaftClientProtocolClient implements Closeable { private final ManagedChannel channel; private final RaftClientProtocolServiceBlockingStub blockingStub; private final RaftClientProtocolServiceStub asyncStub; + private final AdminProtocolServiceBlockingStub adminBlockingStub; public RaftClientProtocolClient(RaftPeer target) { this.target = target; @@ -45,6 +50,7 @@ public class RaftClientProtocolClient implements Closeable { .usePlaintext(true).build(); blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftClientProtocolServiceGrpc.newStub(channel); + adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel); } @Override @@ -52,12 +58,22 @@ public class RaftClientProtocolClient implements Closeable { channel.shutdownNow(); } - public RaftClientReplyProto setConfiguration( + RaftClientReplyProto reinitialize( + ReinitializeRequestProto request) throws IOException { + return blockingCall(() -> adminBlockingStub.reinitialize(request)); + } + + RaftClientReplyProto setConfiguration( SetConfigurationRequestProto request) throws IOException { + return blockingCall(() -> blockingStub.setConfiguration(request)); + } + + private static RaftClientReplyProto blockingCall( + CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier + ) throws IOException { try { - return blockingStub.setConfiguration(request); + return supplier.get(); } catch (StatusRuntimeException e) { - // unwrap StatusRuntimeException throw RaftGrpcUtil.unwrapException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java index 97e32c1..e11a9cf 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -17,16 +17,17 @@ */ package org.apache.ratis.grpc.client; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; +import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.SetConfigurationRequest; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase; -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; -import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,22 +75,10 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase } @Override - public void setConfiguration(SetConfigurationRequestProto request, + public void setConfiguration(SetConfigurationRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) { - try { - CompletableFuture<RaftClientReply> future = protocol.setConfigurationAsync( - ClientProtoUtils.toSetConfigurationRequest(request)); - future.whenCompleteAsync((reply, exception) -> { - if (exception != null) { - responseObserver.onError(RaftGrpcUtil.wrapException(exception)); - } else { - responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply)); - responseObserver.onCompleted(); - } - }); - } catch (Exception e) { - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } + final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto); + RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java new file mode 100644 index 0000000..4e7ff9a --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.AdminAsynchronousProtocol; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.ReinitializeRequest; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; +import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase; + +public class AdminProtocolService extends AdminProtocolServiceImplBase { + private final RaftPeerId id; + private final AdminAsynchronousProtocol protocol; + + public AdminProtocolService(RaftPeerId id, AdminAsynchronousProtocol protocol) { + this.id = id; + this.protocol = protocol; + } + + @Override + public void reinitialize(ReinitializeRequestProto proto, + StreamObserver<RaftClientReplyProto> responseObserver) { + final ReinitializeRequest request = ClientProtoUtils.toReinitializeRequest(proto); + RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.reinitializeAsync(request)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java new file mode 100644 index 0000000..27cbf1e --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.ReinitializationBaseTest; + +public class TestReinitializationWithGrpc extends ReinitializationBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithGRpc.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java index a50b938..1f9c15d 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java @@ -24,6 +24,6 @@ public interface HadoopConstants { = "raft.client.kerberos.principal"; String RAFT_SERVER_PROTOCOL_NAME = "org.apache.hadoop.raft.server.protocol.RaftServerProtocol"; - String RAFT_CLIENT_PROTOCOL_NAME - = "org.apache.hadoop.raft.protocol.RaftClientProtocol"; + String COMBINED_CLIENT_PROTOCOL_NAME + = "org.apache.ratis.hadooprpc.client.CombinedClientProtocol"; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java new file mode 100644 index 0000000..6281987 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import org.apache.ratis.protocol.AdminProtocol; +import org.apache.ratis.protocol.RaftClientProtocol; + +public interface CombinedClientProtocol + extends RaftClientProtocol, AdminProtocol { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java new file mode 100644 index 0000000..8d1eff2 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.hadooprpc.Proxy; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.ReinitializeRequest; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.shaded.com.google.common.base.Function; +import org.apache.ratis.shaded.com.google.protobuf.ServiceException; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.util.CheckedFunction; +import org.apache.ratis.util.ProtoUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + + [email protected] +public class CombinedClientProtocolClientSideTranslatorPB + extends Proxy<CombinedClientProtocolPB> + implements CombinedClientProtocol { + private static final Logger LOG = LoggerFactory.getLogger(CombinedClientProtocolClientSideTranslatorPB.class); + + public CombinedClientProtocolClientSideTranslatorPB( + String addressStr, Configuration conf) throws IOException { + super(CombinedClientProtocolPB.class, addressStr, conf); + } + + @Override + public RaftClientReply submitClientRequest(RaftClientRequest request) + throws IOException { + return handleRequest(request, ClientProtoUtils::toRaftClientRequestProto, + p -> getProtocol().submitClientRequest(null, p)); + } + + @Override + public RaftClientReply setConfiguration(SetConfigurationRequest request) + throws IOException { + return handleRequest(request, ClientProtoUtils::toSetConfigurationRequestProto, + p -> getProtocol().setConfiguration(null, p)); + } + + @Override + public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException { + return handleRequest(request, ClientProtoUtils::toReinitializeRequestProto, + p -> getProtocol().reinitialize(null, p)); + } + + static <REQUEST extends RaftClientRequest, PROTO> RaftClientReply handleRequest( + REQUEST request, Function<REQUEST, PROTO> toProto, + CheckedFunction<PROTO, RaftClientReplyProto, ServiceException> handler) + throws IOException { + final PROTO proto = toProto.apply(request); + try { + final RaftClientReplyProto reply = handler.apply(proto); + return ClientProtoUtils.toRaftClientReply(reply); + } catch (ServiceException se) { + LOG.trace("Failed to handle " + request, se); + throw ProtoUtils.toIOException(se); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java new file mode 100644 index 0000000..e9af3b0 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.security.KerberosInfo; +import org.apache.ratis.hadooprpc.HadoopConstants; +import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.CombinedClientProtocolService; + [email protected] [email protected] +@KerberosInfo( + serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY, + clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY) +@ProtocolInfo( + protocolName = HadoopConstants.COMBINED_CLIENT_PROTOCOL_NAME, + protocolVersion = 1) +public interface CombinedClientProtocolPB extends + CombinedClientProtocolService.BlockingInterface { +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java new file mode 100644 index 0000000..ef9e733 --- /dev/null +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc.client; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.protocol.*; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.shaded.com.google.protobuf.RpcController; +import org.apache.ratis.shaded.com.google.protobuf.ServiceException; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; + [email protected] +public class CombinedClientProtocolServerSideTranslatorPB + implements CombinedClientProtocolPB { + private final RaftServer impl; + + public CombinedClientProtocolServerSideTranslatorPB(RaftServer impl) { + this.impl = impl; + } + + @Override + public RaftClientReplyProto submitClientRequest( + RpcController unused, RaftClientRequestProto proto) + throws ServiceException { + final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto); + try { + final RaftClientReply reply = impl.submitClientRequest(request); + return ClientProtoUtils.toRaftClientReplyProto(reply); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public RaftClientReplyProto setConfiguration( + RpcController unused, SetConfigurationRequestProto proto) + throws ServiceException { + final SetConfigurationRequest request; + try { + request = ClientProtoUtils.toSetConfigurationRequest(proto); + final RaftClientReply reply = impl.setConfiguration(request); + return ClientProtoUtils.toRaftClientReplyProto(reply); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public RaftClientReplyProto reinitialize( + RpcController controller, ReinitializeRequestProto proto) + throws ServiceException { + final ReinitializeRequest request; + try { + request = ClientProtoUtils.toReinitializeRequest(proto); + final RaftClientReply reply = impl.reinitialize(request); + return ClientProtoUtils.toRaftClientReplyProto(reply); + } catch(IOException ioe) { + throw new ServiceException(ioe); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java index 25c0ecd..3a2d6fc 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java @@ -27,21 +27,23 @@ import java.io.IOException; public class HadoopClientRpc implements RaftClientRpc { - private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies; + private final PeerProxyMap<CombinedClientProtocolClientSideTranslatorPB> proxies; public HadoopClientRpc(final Configuration conf) { this.proxies = new PeerProxyMap<>( - p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); + p -> new CombinedClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); } @Override public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); - final RaftClientProtocolClientSideTranslatorPB proxy = + final CombinedClientProtocolClientSideTranslatorPB proxy = proxies.getProxy(serverId); try { - if (request instanceof SetConfigurationRequest) { + if (request instanceof ReinitializeRequest) { + return proxy.reinitialize((ReinitializeRequest) request); + } else if (request instanceof SetConfigurationRequest) { return proxy.setConfiguration((SetConfigurationRequest) request); } else { return proxy.submitClientRequest(request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java deleted file mode 100644 index a5c1a13..0000000 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.hadooprpc.client; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.hadooprpc.Proxy; -import org.apache.ratis.protocol.RaftClientProtocol; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.SetConfigurationRequest; -import org.apache.ratis.shaded.com.google.protobuf.ServiceException; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.ratis.util.ProtoUtils; - [email protected] -public class RaftClientProtocolClientSideTranslatorPB - extends Proxy<RaftClientProtocolPB> - implements RaftClientProtocol { - - public RaftClientProtocolClientSideTranslatorPB( - String addressStr, Configuration conf) throws IOException { - super(RaftClientProtocolPB.class, addressStr, conf); - } - - @Override - public RaftClientReply submitClientRequest(RaftClientRequest request) - throws IOException { - final RaftClientRequestProto p = ClientProtoUtils.toRaftClientRequestProto(request); - try { - final RaftClientReplyProto reply = getProtocol().submitClientRequest(null, p); - return ClientProtoUtils.toRaftClientReply(reply); - } catch (ServiceException se) { - throw ProtoUtils.toIOException(se); - } - } - - @Override - public RaftClientReply setConfiguration(SetConfigurationRequest request) - throws IOException { - final SetConfigurationRequestProto p - = ClientProtoUtils.toSetConfigurationRequestProto(request); - try { - final RaftClientReplyProto reply = getProtocol().setConfiguration(null, p); - return ClientProtoUtils.toRaftClientReply(reply); - } catch (ServiceException se) { - throw ProtoUtils.toIOException(se); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java deleted file mode 100644 index 908cd99..0000000 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.hadooprpc.client; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.ipc.ProtocolInfo; -import org.apache.hadoop.security.KerberosInfo; -import org.apache.ratis.hadooprpc.HadoopConstants; -import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService; - [email protected] [email protected] -@KerberosInfo( - serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY, - clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY) -@ProtocolInfo( - protocolName = HadoopConstants.RAFT_CLIENT_PROTOCOL_NAME, - protocolVersion = 1) -public interface RaftClientProtocolPB extends - RaftClientProtocolService.BlockingInterface { -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java deleted file mode 100644 index 08cf589..0000000 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.hadooprpc.client; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.protocol.RaftClientProtocol; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.SetConfigurationRequest; -import org.apache.ratis.shaded.com.google.protobuf.RpcController; -import org.apache.ratis.shaded.com.google.protobuf.ServiceException; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; - [email protected] -public class RaftClientProtocolServerSideTranslatorPB - implements RaftClientProtocolPB { - private final RaftClientProtocol impl; - - public RaftClientProtocolServerSideTranslatorPB(RaftClientProtocol impl) { - this.impl = impl; - } - - @Override - public RaftClientReplyProto submitClientRequest( - RpcController unused, RaftClientRequestProto proto) - throws ServiceException { - final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto); - try { - final RaftClientReply reply = impl.submitClientRequest(request); - return ClientProtoUtils.toRaftClientReplyProto(reply); - } catch(IOException ioe) { - throw new ServiceException(ioe); - } - } - - @Override - public RaftClientReplyProto setConfiguration( - RpcController unused, SetConfigurationRequestProto proto) - throws ServiceException { - final SetConfigurationRequest request; - try { - request = ClientProtoUtils.toSetConfigurationRequest(proto); - final RaftClientReply reply = impl.setConfiguration(request); - return ClientProtoUtils.toRaftClientReplyProto(reply); - } catch(IOException ioe) { - throw new ServiceException(ioe); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java index e31a03a..15afde8 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java @@ -22,9 +22,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; import org.apache.hadoop.ipc.RPC; import org.apache.ratis.hadooprpc.HadoopConfigKeys; import org.apache.ratis.hadooprpc.Proxy; -import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB; -import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB; -import org.apache.ratis.protocol.RaftClientProtocol; +import org.apache.ratis.hadooprpc.client.CombinedClientProtocolPB; +import org.apache.ratis.hadooprpc.client.CombinedClientProtocolServerSideTranslatorPB; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; @@ -35,7 +34,7 @@ import org.apache.ratis.shaded.com.google.protobuf.BlockingService; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.com.google.protobuf.ServiceException; import org.apache.ratis.shaded.proto.RaftProtos.*; -import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService; +import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.CombinedClientProtocolService; import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService; import org.apache.ratis.util.CheckedFunction; import org.apache.ratis.util.CodeInjectionForTesting; @@ -138,13 +137,12 @@ public class HadoopRpcService implements RaftServerRpc { .build(); } - private void addRaftClientProtocol(RaftClientProtocol clientProtocol, Configuration conf) { - final Class<?> protocol = RaftClientProtocolPB.class; - RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class); + private void addRaftClientProtocol(RaftServer server, Configuration conf) { + final Class<?> protocol = CombinedClientProtocolPB.class; + RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngineShaded.class); - final BlockingService service - = RaftClientProtocolService.newReflectiveBlockingService( - new RaftClientProtocolServerSideTranslatorPB(clientProtocol)); + final BlockingService service = CombinedClientProtocolService.newReflectiveBlockingService( + new CombinedClientProtocolServerSideTranslatorPB(server)); ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java new file mode 100644 index 0000000..6efb012 --- /dev/null +++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.hadooprpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.ReinitializationBaseTest; + +public class TestReinitializationWithHadoopRpc extends ReinitializationBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithHadoopRpc.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java index 74afddc..14218ad 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java @@ -23,6 +23,7 @@ import org.apache.ratis.netty.NettyRpcProxy; import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto; @@ -38,7 +39,12 @@ public class NettyClientRpc implements RaftClientRpc { final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder(); final RaftRpcRequestProto rpcRequest; - if (request instanceof SetConfigurationRequest) { + if (request instanceof ReinitializeRequest) { + final ReinitializeRequestProto proto = ClientProtoUtils.toReinitializeRequestProto( + (ReinitializeRequest)request); + b.setReinitializeRequest(proto); + rpcRequest = proto.getRpcRequest(); + } else if (request instanceof SetConfigurationRequest) { final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto( (SetConfigurationRequest)request); b.setSetConfigurationRequest(proto); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java index 9504241..b8028b6 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java @@ -200,6 +200,15 @@ public final class NettyRpcService implements RaftServerRpc { .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) .build(); } + case REINITIALIZEREQUEST: { + final ReinitializeRequestProto request = proto.getReinitializeRequest(); + rpcRequest = request.getRpcRequest(); + final RaftClientReply reply = server.reinitialize( + ClientProtoUtils.toReinitializeRequest(request)); + return RaftNettyServerReplyProto.newBuilder() + .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply)) + .build(); + } case RAFTNETTYSERVERREQUEST_NOT_SET: throw new IllegalArgumentException("Request case not set in proto: " + proto.getRaftNettyServerRequestCase()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java new file mode 100644 index 0000000..c378749 --- /dev/null +++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.netty; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.server.impl.ReinitializationBaseTest; + +public class TestReinitializationWithNetty extends ReinitializationBaseTest { + @Override + public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { + return MiniRaftClusterWithNetty.FACTORY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/GRpc.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto index 267f579..599227b 100644 --- a/ratis-proto-shaded/src/main/proto/GRpc.proto +++ b/ratis-proto-shaded/src/main/proto/GRpc.proto @@ -43,3 +43,9 @@ service RaftServerProtocolService { rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto) returns(ratis.common.InstallSnapshotReplyProto) {} } + +service AdminProtocolService { + // A client-to-server RPC to reinitialize the server + rpc reinitialize(ratis.common.ReinitializeRequestProto) + returns(ratis.common.RaftClientReplyProto) {} +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Hadoop.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Hadoop.proto b/ratis-proto-shaded/src/main/proto/Hadoop.proto index b85b9a2..48cfbf4 100644 --- a/ratis-proto-shaded/src/main/proto/Hadoop.proto +++ b/ratis-proto-shaded/src/main/proto/Hadoop.proto @@ -24,12 +24,15 @@ package ratis.hadoop; import "Raft.proto"; -service RaftClientProtocolService { +service CombinedClientProtocolService { rpc submitClientRequest(ratis.common.RaftClientRequestProto) returns(ratis.common.RaftClientReplyProto); rpc setConfiguration(ratis.common.SetConfigurationRequestProto) returns(ratis.common.RaftClientReplyProto); + + rpc reinitialize(ratis.common.ReinitializeRequestProto) + returns(ratis.common.RaftClientReplyProto); } service RaftServerProtocolService { @@ -42,3 +45,4 @@ service RaftServerProtocolService { rpc installSnapshot(ratis.common.InstallSnapshotRequestProto) returns(ratis.common.InstallSnapshotReplyProto); } + http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Netty.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Netty.proto b/ratis-proto-shaded/src/main/proto/Netty.proto index d1634d7..4de770f 100644 --- a/ratis-proto-shaded/src/main/proto/Netty.proto +++ b/ratis-proto-shaded/src/main/proto/Netty.proto @@ -35,6 +35,7 @@ message RaftNettyServerRequestProto { ratis.common.InstallSnapshotRequestProto installSnapshotRequest = 3; ratis.common.RaftClientRequestProto raftClientRequest = 4; ratis.common.SetConfigurationRequestProto setConfigurationRequest = 5; + ratis.common.ReinitializeRequestProto reinitializeRequest = 6; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index f8dcf62..b53181f 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -180,3 +180,9 @@ message SetConfigurationRequestProto { RaftRpcRequestProto rpcRequest = 1; repeated RaftPeerProto peers = 2; } + +// reinitialize request +message ReinitializeRequestProto { + RaftRpcRequestProto rpcRequest = 1; + repeated RaftPeerProto peers = 2; +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index dbd32b7..0899dd1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -19,10 +19,7 @@ package org.apache.ratis.server; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; -import org.apache.ratis.protocol.RaftClientProtocol; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.*; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.impl.ServerFactory; import org.apache.ratis.server.impl.ServerImplUtils; @@ -35,7 +32,8 @@ import java.util.Objects; /** Raft server interface */ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, - RaftClientProtocol, RaftClientAsynchronousProtocol { + RaftClientProtocol, RaftClientAsynchronousProtocol, + AdminProtocol, AdminAsynchronousProtocol { /** @return the server ID. */ RaftPeerId getId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 4c19b7e..e9784bb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -448,8 +448,7 @@ public class LeaderState { } // the pending request handler will send NotLeaderException for // pending client requests when it stops - // TODO should close impl instead of proxy - server.getProxy().close(); + server.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java index 82f546b..06aeb62 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java @@ -38,7 +38,6 @@ class PeerConfiguration { map.put(p.getId(), p); } this.peers = Collections.unmodifiableMap(map); - Preconditions.assertTrue(!this.peers.isEmpty()); } Collection<RaftPeer> getPeers() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d62b207..9685fbc 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -445,7 +445,7 @@ public class RaftServerImpl implements RaftServerProtocol, return waitForReply(getId(), request, submitClientRequestAsync(request)); } - private static RaftClientReply waitForReply(RaftPeerId id, + static RaftClientReply waitForReply(RaftPeerId id, RaftClientRequest request, CompletableFuture<RaftClientReply> future) throws IOException { try { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java index 36adf11..5afb737 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java @@ -26,23 +26,29 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; public class RaftServerProxy implements RaftServer { public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class); private final RaftPeerId id; - private final RaftServerImpl impl; private final StateMachine stateMachine; private final RaftProperties properties; private final RaftServerRpc serverRpc; private final ServerFactory factory; + private volatile CompletableFuture<RaftServerImpl> impl; + private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>(); + RaftServerProxy(RaftPeerId id, StateMachine stateMachine, RaftConfiguration raftConf, RaftProperties properties, Parameters parameters) throws IOException { @@ -52,10 +58,15 @@ public class RaftServerProxy implements RaftServer { final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters)); - this.impl = new RaftServerImpl(id, this, raftConf, properties); + + this.impl = CompletableFuture.completedFuture(initImpl(raftConf)); this.serverRpc = initRaftServerRpc(factory, this, raftConf); } + private RaftServerImpl initImpl(RaftConfiguration raftConf) throws IOException { + return new RaftServerImpl(id, this, raftConf, properties); + } + private static RaftServerRpc initRaftServerRpc( ServerFactory factory, RaftServer server, RaftConfiguration raftConf) { final RaftServerRpc rpc = factory.newRaftServerRpc(server); @@ -67,6 +78,11 @@ public class RaftServerProxy implements RaftServer { } @Override + public RaftPeerId getId() { + return id; + } + + @Override public RpcType getRpcType() { return getFactory().getRpcType(); } @@ -90,24 +106,25 @@ public class RaftServerProxy implements RaftServer { return serverRpc; } - public RaftServerImpl getImpl() { - return impl; + public RaftServerImpl getImpl() throws IOException { + try { + return impl.get(); + } catch (InterruptedException e) { + throw IOUtils.toInterruptedIOException(getId() + ": getImpl interrupted.", e); + } catch (ExecutionException e) { + throw IOUtils.asIOException(e); + } } @Override public void start() { - getImpl().start(); + JavaUtils.getAndConsume(impl, RaftServerImpl::start); getServerRpc().start(); } @Override - public RaftPeerId getId() { - return id; - } - - @Override public void close() { - getImpl().shutdown(); + JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown); try { getServerRpc().close(); } catch (IOException ignored) { @@ -133,6 +150,46 @@ public class RaftServerProxy implements RaftServer { return getImpl().setConfiguration(request); } + @Override + public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException { + return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request)); + } + + @Override + public CompletableFuture<RaftClientReply> reinitializeAsync( + ReinitializeRequest request) throws IOException { + if (!reinitializeRequest.compareAndSet(null, request)) { + throw new IOException("Another reinitialize is already in progress."); + } + return CompletableFuture.supplyAsync(() -> { + try { + final CompletableFuture<RaftServerImpl> oldImpl = impl; + impl = new CompletableFuture<>(); + JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown); + + final RaftConfiguration newConf = RaftConfiguration.newBuilder() + .setConf(request.getPeersInNewConf()).build(); + final RaftServerImpl newImpl; + try { + newImpl = initImpl(newConf); + } catch (IOException ioe) { + final RaftException re = new RaftException( + "Failed to reinitialize, request=" + request, ioe); + impl.completeExceptionally(new IOException( + "Server " + getId() + " is not initialized.", re)); + return new RaftClientReply(request, re); + } + + getServerRpc().addPeers(newConf.getPeers()); + newImpl.start(); + impl.complete(newImpl); + return new RaftClientReply(request, (Message) null); + } finally { + reinitializeRequest.set(null); + } + }); + } + /** * Handle a raft configuration change request from client. */ @@ -162,6 +219,10 @@ public class RaftServerProxy implements RaftServer { @Override public String toString() { - return getClass().getSimpleName() + ":" + getId().toString(); + try { + return getImpl().toString(); + } catch (IOException ignored) { + return getClass().getSimpleName() + ":" + getId(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index ef0e454..83fcf54 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -32,7 +32,6 @@ import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.statemachine.BaseStateMachine; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.*; -import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -335,7 +334,7 @@ public abstract class MiniRaftCluster { public RaftServerImpl getLeader() { final List<RaftServerImpl> leaders = new ArrayList<>(); - getServersAliveStream() + getServerAliveStream() .filter(RaftServerImpl::isLeader) .forEach(s -> { if (leaders.isEmpty()) { @@ -353,9 +352,9 @@ public abstract class MiniRaftCluster { }); if (leaders.isEmpty()) { return null; - } else if (leaders.size() != 1) { - Assert.fail(printServers() + leaders.toString() - + "leaders.size() = " + leaders.size() + " != 1"); + } else if (leaders.size() > 1) { + throw new IllegalStateException(printServers() + leaders + + ", leaders.size() = " + leaders.size() + " > 1"); } return leaders.get(0); } @@ -366,7 +365,7 @@ public abstract class MiniRaftCluster { } public List<RaftServerImpl> getFollowers() { - return getServersAliveStream() + return getServerAliveStream() .filter(RaftServerImpl::isFollower) .collect(Collectors.toList()); } @@ -376,13 +375,14 @@ public abstract class MiniRaftCluster { } public Iterable<RaftServerImpl> iterateServerImpls() { - return CollectionUtils.as(getServers(), RaftServerProxy::getImpl); + return CollectionUtils.as(getServers(), RaftTestUtil::getImplAsUnchecked); } - public Stream<RaftServerImpl> getServersAliveStream() { - return getServers().stream() - .map(RaftServerProxy::getImpl) - .filter(RaftServerImpl::isAlive); + public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) { + return servers.stream().map(RaftTestUtil::getImplAsUnchecked); + } + public Stream<RaftServerImpl> getServerAliveStream() { + return getServerStream(getServers()).filter(RaftServerImpl::isAlive); } public RaftServerProxy getServer(RaftPeerId id) { @@ -408,7 +408,7 @@ public abstract class MiniRaftCluster { public void shutdown() { LOG.info("Stopping " + getClass().getSimpleName()); - getServersAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close); + getServerAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close); if (ExitUtils.isTerminated()) { LOG.error("Test resulted in an unexpected exit", http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 9c69d03..26eda15 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -97,7 +97,7 @@ public abstract class RaftBasicTests { Thread.sleep(cluster.getMaxTimeout() + 100); LOG.info(cluster.printAllLogs()); - cluster.getServersAliveStream() + cluster.getServerAliveStream() .map(s -> s.getState().getLog()) .forEach(log -> RaftTestUtil.assertLogEntries(log, log.getEntries(1, Long.MAX_VALUE), 1, term, messages));
