http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java deleted file mode 100644 index 9dd1a31..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.client; - -import org.apache.ratis.client.RaftClientConfigKeys; -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.*; -import org.apache.ratis.util.TimeoutScheduler; -import org.apache.ratis.shaded.io.grpc.ManagedChannel; -import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; -import org.apache.ratis.shaded.io.grpc.netty.NegotiationType; -import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.*; -import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc; -import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub; -import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc; -import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub; -import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub; -import org.apache.ratis.util.CheckedSupplier; -import org.apache.ratis.util.CollectionUtils; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.SizeInBytes; -import org.apache.ratis.util.TimeDuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import java.util.function.Supplier; - -public class RaftClientProtocolClient implements Closeable { - public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolClient.class); - - private final Supplier<String> name; - private final RaftPeer target; - private final ManagedChannel channel; - - private final TimeDuration requestTimeoutDuration; - private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1); - - private final RaftClientProtocolServiceBlockingStub blockingStub; - private final RaftClientProtocolServiceStub asyncStub; - private final AdminProtocolServiceBlockingStub adminBlockingStub; - - private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>(); - - public RaftClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties) { - this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); - this.target = target; - - final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug); - final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); - channel = NettyChannelBuilder.forTarget(target.getAddress()) - .negotiationType(NegotiationType.PLAINTEXT) - .flowControlWindow(flowControlWindow.getSizeInt()) - .maxInboundMessageSize(maxMessageSize.getSizeInt()) - .build(); - blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel); - asyncStub = RaftClientProtocolServiceGrpc.newStub(channel); - adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel); - this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); - } - - String getName() { - return name.get(); - } - - @Override - public void close() { - final AsyncStreamObservers observers = appendStreamObservers.get(); - if (observers != null) { - observers.close(); - } - channel.shutdownNow(); - } - - RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { - return blockingCall(() -> adminBlockingStub - .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .groupManagement(request)); - } - - ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) { - return adminBlockingStub - .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .serverInformation(request); - } - - RaftClientReplyProto setConfiguration( - SetConfigurationRequestProto request) throws IOException { - return blockingCall(() -> blockingStub - .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .setConfiguration(request)); - } - - private static RaftClientReplyProto blockingCall( - CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier - ) throws IOException { - try { - return supplier.get(); - } catch (StatusRuntimeException e) { - throw RaftGrpcUtil.unwrapException(e); - } - } - - StreamObserver<RaftClientRequestProto> append( - StreamObserver<RaftClientReplyProto> responseHandler) { - return asyncStub.append(responseHandler); - } - - StreamObserver<RaftClientRequestProto> appendWithTimeout( - StreamObserver<RaftClientReplyProto> responseHandler) { - return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .append(responseHandler); - } - - AsyncStreamObservers getAppendStreamObservers() { - return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers()); - } - - public RaftPeer getTarget() { - return target; - } - - class AsyncStreamObservers implements Closeable { - /** Request map: callId -> future */ - private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>()); - private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() { - @Override - public void onNext(RaftClientReplyProto proto) { - final long callId = proto.getRpcReply().getCallId(); - try { - final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto); - final NotLeaderException nle = reply.getNotLeaderException(); - if (nle != null) { - completeReplyExceptionally(nle, NotLeaderException.class.getName()); - return; - } - handleReplyFuture(callId, f -> f.complete(reply)); - } catch (Throwable t) { - handleReplyFuture(callId, f -> f.completeExceptionally(t)); - } - } - - @Override - public void onError(Throwable t) { - final IOException ioe = RaftGrpcUtil.unwrapIOException(t); - completeReplyExceptionally(ioe, "onError"); - } - - @Override - public void onCompleted() { - completeReplyExceptionally(null, "completed"); - } - }; - private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver); - - CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) { - final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get(); - if (map == null) { - return JavaUtils.completeExceptionally(new IOException("Already closed.")); - } - final CompletableFuture<RaftClientReply> f = new CompletableFuture<>(); - CollectionUtils.putNew(request.getCallId(), f, map, - () -> getName() + ":" + getClass().getSimpleName()); - try { - requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request)); - scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG, - () -> "Timeout check failed for client request: " + request); - } catch(Throwable t) { - handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t)); - } - return f; - } - - private void timeoutCheck(RaftClientRequest request) { - handleReplyFuture(request.getCallId(), f -> f.completeExceptionally( - new IOException("Request timeout " + requestTimeoutDuration + ": " + request))); - } - - private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) { - Optional.ofNullable(replies.get()) - .map(replyMap -> replyMap.remove(callId)) - .ifPresent(handler); - } - - @Override - public void close() { - requestStreamObserver.onCompleted(); - completeReplyExceptionally(null, "close"); - } - - private void completeReplyExceptionally(Throwable t, String event) { - appendStreamObservers.compareAndSet(this, null); - final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null); - if (map == null) { - return; - } - for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) { - final CompletableFuture<RaftClientReply> f = entry.getValue(); - if (!f.isDone()) { - f.completeExceptionally(t != null? t - : new IOException(getName() + ": Stream " + event - + ": no reply for async request cid=" + entry.getKey())); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java deleted file mode 100644 index ee9ce4e..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.client; - -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.protocol.RaftPeer; - -import java.io.Closeable; -import java.io.IOException; -import java.util.function.Function; - -public class RaftClientProtocolProxy implements Closeable { - private final RaftClientProtocolClient proxy; - private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation; - private RpcSession currentSession; - - public RaftClientProtocolProxy(ClientId clientId, RaftPeer target, - Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation, - RaftProperties properties) { - proxy = new RaftClientProtocolClient(clientId, target, properties); - this.responseHandlerCreation = responseHandlerCreation; - } - - @Override - public void close() throws IOException { - closeCurrentSession(); - proxy.close(); - } - - @Override - public String toString() { - return "ProxyTo:" + proxy.getTarget(); - } - - public void closeCurrentSession() { - if (currentSession != null) { - currentSession.close(); - currentSession = null; - } - } - - public void onNext(RaftClientRequestProto request) { - if (currentSession == null) { - currentSession = new RpcSession( - responseHandlerCreation.apply(proxy.getTarget())); - } - currentSession.requestObserver.onNext(request); - } - - public void onError() { - if (currentSession != null) { - currentSession.onError(); - } - } - - public interface CloseableStreamObserver - extends StreamObserver<RaftClientReplyProto>, Closeable { - } - - class RpcSession implements Closeable { - private final StreamObserver<RaftClientRequestProto> requestObserver; - private final CloseableStreamObserver responseHandler; - private boolean hasError = false; - - RpcSession(CloseableStreamObserver responseHandler) { - this.responseHandler = responseHandler; - this.requestObserver = proxy.append(responseHandler); - } - - void onError() { - hasError = true; - } - - @Override - public void close() { - if (!hasError) { - try { - requestObserver.onCompleted(); - } catch (Exception ignored) { - } - } - try { - responseHandler.close(); - } catch (IOException ignored) { - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java deleted file mode 100644 index 4b92be5..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.client; - -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.*; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; -import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.SlidingWindow; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.CompletionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - -public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase { - public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class); - - private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> { - private final RaftClientRequest request; - private volatile RaftClientReply reply; - - PendingAppend(RaftClientRequest request) { - this.request = request; - } - - @Override - public boolean hasReply() { - return reply != null || this == COMPLETED; - } - - @Override - public void setReply(RaftClientReply reply) { - this.reply = reply; - } - - RaftClientReply getReply() { - return reply; - } - - RaftClientRequest getRequest() { - return request; - } - - @Override - public long getSeqNum() { - return request != null? request.getSeqNum(): Long.MAX_VALUE; - } - - @Override - public String toString() { - return request != null? getSeqNum() + ":" + reply: "COMPLETED"; - } - } - private static final PendingAppend COMPLETED = new PendingAppend(null); - - private final Supplier<RaftPeerId> idSupplier; - private final RaftClientAsynchronousProtocol protocol; - - public RaftClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) { - this.idSupplier = idSupplier; - this.protocol = protocol; - } - - RaftPeerId getId() { - return idSupplier.get(); - } - - @Override - public void setConfiguration(SetConfigurationRequestProto proto, - StreamObserver<RaftClientReplyProto> responseObserver) { - final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto); - RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request), - ClientProtoUtils::toRaftClientReplyProto); - } - - @Override - public StreamObserver<RaftClientRequestProto> append( - StreamObserver<RaftClientReplyProto> responseObserver) { - return new AppendRequestStreamObserver(responseObserver); - } - - private final AtomicInteger streamCount = new AtomicInteger(); - - private class AppendRequestStreamObserver implements - StreamObserver<RaftClientRequestProto> { - private final String name = getId() + "-" + streamCount.getAndIncrement(); - private final StreamObserver<RaftClientReplyProto> responseObserver; - private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow - = new SlidingWindow.Server<>(name, COMPLETED); - private final AtomicBoolean isClosed; - - AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) { - LOG.debug("new AppendRequestStreamObserver {}", name); - this.responseObserver = ro; - this.isClosed = new AtomicBoolean(false); - } - - void processClientRequestAsync(PendingAppend pending) { - try { - protocol.submitClientRequestAsync(pending.getRequest() - ).thenAcceptAsync(reply -> slidingWindow.receiveReply( - pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync) - ).exceptionally(exception -> { - // TODO: the exception may be from either raft or state machine. - // Currently we skip all the following responses when getting an - // exception from the state machine. - responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest()); - return null; - }); - } catch (IOException e) { - throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e); - } - } - - @Override - public void onNext(RaftClientRequestProto request) { - try { - final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request); - final PendingAppend p = new PendingAppend(r); - slidingWindow.receivedRequest(p, this::processClientRequestAsync); - } catch (Throwable e) { - responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request)); - } - } - - private void sendReply(PendingAppend ready) { - Preconditions.assertTrue(ready.hasReply()); - if (ready == COMPLETED) { - close(); - } else { - LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply()); - responseObserver.onNext( - ClientProtoUtils.toRaftClientReplyProto(ready.getReply())); - } - } - - @Override - public void onError(Throwable t) { - // for now we just log a msg - RaftGrpcUtil.warn(LOG, () -> name + ": onError", t); - slidingWindow.close(); - } - - @Override - public void onCompleted() { - if (slidingWindow.endOfRequests()) { - close(); - } - } - - private void close() { - if (isClosed.compareAndSet(false, true)) { - LOG.debug("{}: close", name); - responseObserver.onCompleted(); - slidingWindow.close(); - } - } - - void responseError(Throwable t, Supplier<String> message) { - if (isClosed.compareAndSet(false, true)) { - t = JavaUtils.unwrapCompletionException(t); - if (LOG.isDebugEnabled()) { - LOG.debug(name + ": Failed " + message.get(), t); - } - responseObserver.onError(RaftGrpcUtil.wrapException(t)); - slidingWindow.close(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java deleted file mode 100644 index 09d57a0..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.client; - -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.util.ProtoUtils; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicLong; - -public class RaftOutputStream extends OutputStream { - /** internal buffer */ - private final byte buf[]; - private int count; - private final AtomicLong seqNum = new AtomicLong(); - private final ClientId clientId; - private final AppendStreamer streamer; - - private boolean closed = false; - - public RaftOutputStream(RaftProperties prop, ClientId clientId, - RaftGroup group, RaftPeerId leaderId) { - final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt(); - buf = new byte[bufferSize]; - count = 0; - this.clientId = clientId; - streamer = new AppendStreamer(prop, group, leaderId, clientId); - } - - @Override - public void write(int b) throws IOException { - checkClosed(); - buf[count++] = (byte)b; - flushIfNecessary(); - } - - private void flushIfNecessary() throws IOException { - if(count == buf.length) { - flushToStreamer(); - } - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - checkClosed(); - if (off < 0 || len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - - int total = 0; - while (total < len) { - int toWrite = Math.min(len - total, buf.length - count); - System.arraycopy(b, off + total, buf, count, toWrite); - count += toWrite; - total += toWrite; - flushIfNecessary(); - } - } - - private void flushToStreamer() throws IOException { - if (count > 0) { - streamer.write(ProtoUtils.toByteString(buf, 0, count), - seqNum.getAndIncrement()); - count = 0; - } - } - - @Override - public void flush() throws IOException { - checkClosed(); - flushToStreamer(); - streamer.flush(); - } - - @Override - public void close() throws IOException { - flushToStreamer(); - streamer.close(); // streamer will flush - this.closed = true; - } - - @Override - public String toString() { - return "RaftOutputStream-" + clientId; - } - - private void checkClosed() throws IOException { - if (closed) { - throw new IOException(this.toString() + " was closed."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java deleted file mode 100644 index d65abd0..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.server; - -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.AdminAsynchronousProtocol; -import org.apache.ratis.protocol.GroupManagementRequest; -import org.apache.ratis.protocol.ServerInformationRequest; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto; -import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase; - -public class AdminProtocolService extends AdminProtocolServiceImplBase { - private final AdminAsynchronousProtocol protocol; - - public AdminProtocolService(AdminAsynchronousProtocol protocol) { - this.protocol = protocol; - } - - @Override - public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) { - final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto); - RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request), - ClientProtoUtils::toRaftClientReplyProto); - } - - @Override - public void serverInformation(ServerInformationRequestProto proto, - StreamObserver<ServerInformationReplyProto> responseObserver) { - final ServerInformationRequest request = ClientProtoUtils.toServerInformationRequest(proto); - RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.getInfoAsync(request), - ClientProtoUtils::toServerInformationReplyProto); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java deleted file mode 100644 index 7dfe033..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ /dev/null @@ -1,438 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.server; - -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.grpc.RaftGRpcService; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.FollowerInfo; -import org.apache.ratis.server.impl.LeaderState; -import org.apache.ratis.server.impl.LogAppender; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.ServerProtoUtils; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.statemachine.SnapshotInfo; -import org.apache.ratis.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * A new log appender implementation using grpc bi-directional stream API. - */ -public class GRpcLogAppender extends LogAppender { - public static final Logger LOG = LoggerFactory.getLogger(GRpcLogAppender.class); - - private final RaftGRpcService rpcService; - private final Map<Long, AppendEntriesRequestProto> pendingRequests; - private final int maxPendingRequestsNum; - private long callId = 0; - private volatile boolean firstResponseReceived = false; - - private final TimeDuration requestTimeoutDuration; - private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1); - - private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; - - public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState, - FollowerInfo f) { - super(server, leaderState, f); - - this.rpcService = (RaftGRpcService) server.getServerRpc(); - - maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax( - server.getProxy().getProperties()); - requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties()); - pendingRequests = new ConcurrentHashMap<>(); - } - - private RaftServerProtocolClient getClient() throws IOException { - return rpcService.getProxies().getProxy(follower.getPeer().getId()); - } - - private synchronized void resetClient(AppendEntriesRequestProto request) { - rpcService.getProxies().resetProxy(follower.getPeer().getId()); - appendLogRequestObserver = null; - firstResponseReceived = false; - - // clear the pending requests queue and reset the next index of follower - final long nextIndex = request != null && request.hasPreviousLog()? - request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex(); - clearPendingRequests(nextIndex); - } - - @Override - protected void runAppenderImpl() throws IOException { - for(; isAppenderRunning(); mayWait()) { - if (shouldSendRequest()) { - SnapshotInfo snapshot = shouldInstallSnapshot(); - if (snapshot != null) { - installSnapshot(snapshot); - } else if (!shouldWait()) { - // keep appending log entries or sending heartbeats - appendLog(); - } - } - checkSlowness(); - } - - Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted); - } - - private long getWaitTimeMs() { - if (!shouldSendRequest()) { - return getHeartbeatRemainingTime(); // No requests, wait until heartbeat - } else if (shouldWait()) { - return halfMinTimeoutMs; // Should wait for a short time - } - return 0L; - } - - private void mayWait() { - // use lastSend time instead of lastResponse time - final long waitTimeMs = getWaitTimeMs(); - if (waitTimeMs <= 0L) { - return; - } - - synchronized(this) { - try { - LOG.trace("{}: wait {}ms", this, waitTimeMs); - wait(waitTimeMs); - } catch(InterruptedException ie) { - LOG.warn(this + ": Wait interrupted by " + ie); - } - } - } - - @Override - protected boolean shouldSendRequest() { - return appendLogRequestObserver == null || super.shouldSendRequest(); - } - - /** @return true iff not received first response or queue is full. */ - private boolean shouldWait() { - final int size = pendingRequests.size(); - if (size == 0) { - return false; - } - return !firstResponseReceived || size >= maxPendingRequestsNum; - } - - private void appendLog() throws IOException { - final AppendEntriesRequestProto pending; - final StreamObserver<AppendEntriesRequestProto> s; - synchronized (this) { - // prepare and enqueue the append request. note changes on follower's - // nextIndex and ops on pendingRequests should always be associated - // together and protected by the lock - pending = createRequest(callId++); - if (pending == null) { - return; - } - pendingRequests.put(pending.getServerRequest().getCallId(), pending); - updateNextIndex(pending); - if (appendLogRequestObserver == null) { - appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler()); - } - s = appendLogRequestObserver; - } - - if (isAppenderRunning()) { - sendRequest(pending, s); - } - } - - private void sendRequest(AppendEntriesRequestProto request, - StreamObserver<AppendEntriesRequestProto> s) { - CodeInjectionForTesting.execute(RaftGRpcService.GRPC_SEND_SERVER_REQUEST, - server.getId(), null, request); - - s.onNext(request); - scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG, - () -> "Timeout check failed for append entry request: " + request); - follower.updateLastRpcSendTime(); - } - - private void timeoutAppendRequest(AppendEntriesRequestProto request) { - AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId()); - if (pendingRequest != null) { - LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest())); - } - } - - private void updateNextIndex(AppendEntriesRequestProto request) { - final int count = request.getEntriesCount(); - if (count > 0) { - follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1); - } - } - - /** - * StreamObserver for handling responses from the follower - */ - private class AppendLogResponseHandler - implements StreamObserver<AppendEntriesReplyProto> { - /** - * After receiving a appendEntries reply, do the following: - * 1. If the reply is success, update the follower's match index and submit - * an event to leaderState - * 2. If the reply is NOT_LEADER, step down - * 3. If the reply is INCONSISTENCY, decrease the follower's next index - * based on the response - */ - @Override - public void onNext(AppendEntriesReplyProto reply) { - LOG.debug("{} received {} response from {}", server.getId(), - (!firstResponseReceived ? "the first" : "a"), - follower.getPeer()); - - // update the last rpc time - follower.updateLastRpcResponseTime(); - - if (!firstResponseReceived) { - firstResponseReceived = true; - } - switch (reply.getResult()) { - case SUCCESS: - onSuccess(reply); - break; - case NOT_LEADER: - onNotLeader(reply); - break; - case INCONSISTENCY: - onInconsistency(reply); - break; - default: - break; - } - notifyAppend(); - } - - /** - * for now we simply retry the first pending request - */ - @Override - public void onError(Throwable t) { - if (!isAppenderRunning()) { - LOG.info("{} is stopped", GRpcLogAppender.this); - return; - } - RaftGrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t); - - long callId = RaftGrpcUtil.getCallId(t); - resetClient(pendingRequests.get(callId)); - } - - @Override - public void onCompleted() { - LOG.info("{} stops appending log entries to follower {}", server.getId(), - follower); - } - } - - private void clearPendingRequests(long newNextIndex) { - pendingRequests.clear(); - follower.decreaseNextIndex(newNextIndex); - } - - protected synchronized void onSuccess(AppendEntriesReplyProto reply) { - AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId()); - if (request == null) { - // If reply comes after timeout, the reply is ignored. - LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply)); - return; - } - updateCommitIndex(request.getLeaderCommit()); - - final long replyNextIndex = reply.getNextIndex(); - final long lastIndex = replyNextIndex - 1; - final boolean updateMatchIndex; - - if (request.getEntriesCount() == 0) { - Preconditions.assertTrue(!request.hasPreviousLog() || - lastIndex == request.getPreviousLog().getIndex(), - "reply's next index is %s, request's previous is %s", - replyNextIndex, request.getPreviousLog()); - updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() < lastIndex; - } else { - // check if the reply and the pending request is consistent - final long lastEntryIndex = request - .getEntries(request.getEntriesCount() - 1).getIndex(); - Preconditions.assertTrue(lastIndex == lastEntryIndex, - "reply's next index is %s, request's last entry index is %s", - replyNextIndex, lastEntryIndex); - updateMatchIndex = true; - } - if (updateMatchIndex) { - follower.updateMatchIndex(lastIndex); - submitEventOnSuccessAppend(); - } - } - - private void onNotLeader(AppendEntriesReplyProto reply) { - checkResponseTerm(reply.getTerm()); - // the running loop will end and the connection will onComplete - } - - private synchronized void onInconsistency(AppendEntriesReplyProto reply) { - AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId()); - if (request == null) { - // If reply comes after timeout, the reply is ignored. - LOG.warn("{}: Ignoring {}", server.getId(), reply); - return; - } - Preconditions.assertTrue(request.hasPreviousLog()); - if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) { - clearPendingRequests(reply.getNextIndex()); - } - } - - private class InstallSnapshotResponseHandler - implements StreamObserver<InstallSnapshotReplyProto> { - private final Queue<Integer> pending; - private final AtomicBoolean done = new AtomicBoolean(false); - - InstallSnapshotResponseHandler() { - pending = new LinkedList<>(); - } - - synchronized void addPending(InstallSnapshotRequestProto request) { - pending.offer(request.getRequestIndex()); - } - - synchronized void removePending(InstallSnapshotReplyProto reply) { - int index = pending.poll(); - Preconditions.assertTrue(index == reply.getRequestIndex()); - } - - boolean isDone() { - return done.get(); - } - - void close() { - done.set(true); - GRpcLogAppender.this.notifyAppend(); - } - - synchronized boolean hasAllResponse() { - return pending.isEmpty(); - } - - @Override - public void onNext(InstallSnapshotReplyProto reply) { - LOG.debug("{} received {} response from {}", server.getId(), - (!firstResponseReceived ? "the first" : "a"), - follower.getPeer()); - - // update the last rpc time - follower.updateLastRpcResponseTime(); - - if (!firstResponseReceived) { - firstResponseReceived = true; - } - - switch (reply.getResult()) { - case SUCCESS: - removePending(reply); - break; - case NOT_LEADER: - checkResponseTerm(reply.getTerm()); - break; - case UNRECOGNIZED: - break; - } - } - - @Override - public void onError(Throwable t) { - if (!isAppenderRunning()) { - LOG.info("{} is stopped", GRpcLogAppender.this); - return; - } - LOG.info("{} got error when installing snapshot to {}, exception: {}", - server.getId(), follower.getPeer(), t); - resetClient(null); - close(); - } - - @Override - public void onCompleted() { - LOG.info("{} stops sending snapshots to follower {}", server.getId(), - follower); - close(); - } - } - - private void installSnapshot(SnapshotInfo snapshot) { - LOG.info("{}: follower {}'s next index is {}," + - " log's start index is {}, need to install snapshot", - server.getId(), follower.getPeer(), follower.getNextIndex(), - raftLog.getStartIndex()); - - final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(); - StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null; - final String requestId = UUID.randomUUID().toString(); - try { - snapshotRequestObserver = getClient().installSnapshot(responseHandler); - for (InstallSnapshotRequestProto request : - new SnapshotRequestIter(snapshot, requestId)) { - if (isAppenderRunning()) { - snapshotRequestObserver.onNext(request); - follower.updateLastRpcSendTime(); - responseHandler.addPending(request); - } else { - break; - } - } - snapshotRequestObserver.onCompleted(); - } catch (Exception e) { - LOG.warn("{} failed to install snapshot {}. Exception: {}", this, - snapshot.getFiles(), e); - if (snapshotRequestObserver != null) { - snapshotRequestObserver.onError(e); - } - return; - } - - synchronized (this) { - while (isAppenderRunning() && !responseHandler.isDone()) { - try { - wait(); - } catch (InterruptedException ignored) { - } - } - } - - if (responseHandler.hasAllResponse()) { - follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); - follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); - LOG.info("{}: install snapshot-{} successfully on follower {}", - server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java new file mode 100644 index 0000000..1201bf2 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.protocol.AdminAsynchronousProtocol; +import org.apache.ratis.protocol.GroupManagementRequest; +import org.apache.ratis.protocol.ServerInformationRequest; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto; +import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase; + +public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase { + private final AdminAsynchronousProtocol protocol; + + public GrpcAdminProtocolService(AdminAsynchronousProtocol protocol) { + this.protocol = protocol; + } + + @Override + public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) { + final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto); + GrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request), + ClientProtoUtils::toRaftClientReplyProto); + } + + @Override + public void serverInformation(ServerInformationRequestProto proto, + StreamObserver<ServerInformationReplyProto> responseObserver) { + final ServerInformationRequest request = ClientProtoUtils.toServerInformationRequest(proto); + GrpcUtil.asyncCall(responseObserver, () -> protocol.getInfoAsync(request), + ClientProtoUtils::toServerInformationReplyProto); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java new file mode 100644 index 0000000..3da58bf --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -0,0 +1,437 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.FollowerInfo; +import org.apache.ratis.server.impl.LeaderState; +import org.apache.ratis.server.impl.LogAppender; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.ServerProtoUtils; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A new log appender implementation using grpc bi-directional stream API. + */ +public class GrpcLogAppender extends LogAppender { + public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class); + + private final GrpcService rpcService; + private final Map<Long, AppendEntriesRequestProto> pendingRequests; + private final int maxPendingRequestsNum; + private long callId = 0; + private volatile boolean firstResponseReceived = false; + + private final TimeDuration requestTimeoutDuration; + private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1); + + private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; + + public GrpcLogAppender(RaftServerImpl server, LeaderState leaderState, + FollowerInfo f) { + super(server, leaderState, f); + + this.rpcService = (GrpcService) server.getServerRpc(); + + maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax( + server.getProxy().getProperties()); + requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties()); + pendingRequests = new ConcurrentHashMap<>(); + } + + private GrpcServerProtocolClient getClient() throws IOException { + return rpcService.getProxies().getProxy(follower.getPeer().getId()); + } + + private synchronized void resetClient(AppendEntriesRequestProto request) { + rpcService.getProxies().resetProxy(follower.getPeer().getId()); + appendLogRequestObserver = null; + firstResponseReceived = false; + + // clear the pending requests queue and reset the next index of follower + final long nextIndex = request != null && request.hasPreviousLog()? + request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex(); + clearPendingRequests(nextIndex); + } + + @Override + protected void runAppenderImpl() throws IOException { + for(; isAppenderRunning(); mayWait()) { + if (shouldSendRequest()) { + SnapshotInfo snapshot = shouldInstallSnapshot(); + if (snapshot != null) { + installSnapshot(snapshot); + } else if (!shouldWait()) { + // keep appending log entries or sending heartbeats + appendLog(); + } + } + checkSlowness(); + } + + Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted); + } + + private long getWaitTimeMs() { + if (!shouldSendRequest()) { + return getHeartbeatRemainingTime(); // No requests, wait until heartbeat + } else if (shouldWait()) { + return halfMinTimeoutMs; // Should wait for a short time + } + return 0L; + } + + private void mayWait() { + // use lastSend time instead of lastResponse time + final long waitTimeMs = getWaitTimeMs(); + if (waitTimeMs <= 0L) { + return; + } + + synchronized(this) { + try { + LOG.trace("{}: wait {}ms", this, waitTimeMs); + wait(waitTimeMs); + } catch(InterruptedException ie) { + LOG.warn(this + ": Wait interrupted by " + ie); + } + } + } + + @Override + protected boolean shouldSendRequest() { + return appendLogRequestObserver == null || super.shouldSendRequest(); + } + + /** @return true iff not received first response or queue is full. */ + private boolean shouldWait() { + final int size = pendingRequests.size(); + if (size == 0) { + return false; + } + return !firstResponseReceived || size >= maxPendingRequestsNum; + } + + private void appendLog() throws IOException { + final AppendEntriesRequestProto pending; + final StreamObserver<AppendEntriesRequestProto> s; + synchronized (this) { + // prepare and enqueue the append request. note changes on follower's + // nextIndex and ops on pendingRequests should always be associated + // together and protected by the lock + pending = createRequest(callId++); + if (pending == null) { + return; + } + pendingRequests.put(pending.getServerRequest().getCallId(), pending); + updateNextIndex(pending); + if (appendLogRequestObserver == null) { + appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler()); + } + s = appendLogRequestObserver; + } + + if (isAppenderRunning()) { + sendRequest(pending, s); + } + } + + private void sendRequest(AppendEntriesRequestProto request, + StreamObserver<AppendEntriesRequestProto> s) { + CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, + server.getId(), null, request); + + s.onNext(request); + scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG, + () -> "Timeout check failed for append entry request: " + request); + follower.updateLastRpcSendTime(); + } + + private void timeoutAppendRequest(AppendEntriesRequestProto request) { + AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId()); + if (pendingRequest != null) { + LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest())); + } + } + + private void updateNextIndex(AppendEntriesRequestProto request) { + final int count = request.getEntriesCount(); + if (count > 0) { + follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1); + } + } + + /** + * StreamObserver for handling responses from the follower + */ + private class AppendLogResponseHandler + implements StreamObserver<AppendEntriesReplyProto> { + /** + * After receiving a appendEntries reply, do the following: + * 1. If the reply is success, update the follower's match index and submit + * an event to leaderState + * 2. If the reply is NOT_LEADER, step down + * 3. If the reply is INCONSISTENCY, decrease the follower's next index + * based on the response + */ + @Override + public void onNext(AppendEntriesReplyProto reply) { + LOG.debug("{} received {} response from {}", server.getId(), + (!firstResponseReceived ? "the first" : "a"), + follower.getPeer()); + + // update the last rpc time + follower.updateLastRpcResponseTime(); + + if (!firstResponseReceived) { + firstResponseReceived = true; + } + switch (reply.getResult()) { + case SUCCESS: + onSuccess(reply); + break; + case NOT_LEADER: + onNotLeader(reply); + break; + case INCONSISTENCY: + onInconsistency(reply); + break; + default: + break; + } + notifyAppend(); + } + + /** + * for now we simply retry the first pending request + */ + @Override + public void onError(Throwable t) { + if (!isAppenderRunning()) { + LOG.info("{} is stopped", GrpcLogAppender.this); + return; + } + GrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t); + + long callId = GrpcUtil.getCallId(t); + resetClient(pendingRequests.get(callId)); + } + + @Override + public void onCompleted() { + LOG.info("{} stops appending log entries to follower {}", server.getId(), + follower); + } + } + + private void clearPendingRequests(long newNextIndex) { + pendingRequests.clear(); + follower.decreaseNextIndex(newNextIndex); + } + + protected synchronized void onSuccess(AppendEntriesReplyProto reply) { + AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId()); + if (request == null) { + // If reply comes after timeout, the reply is ignored. + LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply)); + return; + } + updateCommitIndex(request.getLeaderCommit()); + + final long replyNextIndex = reply.getNextIndex(); + final long lastIndex = replyNextIndex - 1; + final boolean updateMatchIndex; + + if (request.getEntriesCount() == 0) { + Preconditions.assertTrue(!request.hasPreviousLog() || + lastIndex == request.getPreviousLog().getIndex(), + "reply's next index is %s, request's previous is %s", + replyNextIndex, request.getPreviousLog()); + updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() < lastIndex; + } else { + // check if the reply and the pending request is consistent + final long lastEntryIndex = request + .getEntries(request.getEntriesCount() - 1).getIndex(); + Preconditions.assertTrue(lastIndex == lastEntryIndex, + "reply's next index is %s, request's last entry index is %s", + replyNextIndex, lastEntryIndex); + updateMatchIndex = true; + } + if (updateMatchIndex) { + follower.updateMatchIndex(lastIndex); + submitEventOnSuccessAppend(); + } + } + + private void onNotLeader(AppendEntriesReplyProto reply) { + checkResponseTerm(reply.getTerm()); + // the running loop will end and the connection will onComplete + } + + private synchronized void onInconsistency(AppendEntriesReplyProto reply) { + AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId()); + if (request == null) { + // If reply comes after timeout, the reply is ignored. + LOG.warn("{}: Ignoring {}", server.getId(), reply); + return; + } + Preconditions.assertTrue(request.hasPreviousLog()); + if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) { + clearPendingRequests(reply.getNextIndex()); + } + } + + private class InstallSnapshotResponseHandler + implements StreamObserver<InstallSnapshotReplyProto> { + private final Queue<Integer> pending; + private final AtomicBoolean done = new AtomicBoolean(false); + + InstallSnapshotResponseHandler() { + pending = new LinkedList<>(); + } + + synchronized void addPending(InstallSnapshotRequestProto request) { + pending.offer(request.getRequestIndex()); + } + + synchronized void removePending(InstallSnapshotReplyProto reply) { + int index = pending.poll(); + Preconditions.assertTrue(index == reply.getRequestIndex()); + } + + boolean isDone() { + return done.get(); + } + + void close() { + done.set(true); + GrpcLogAppender.this.notifyAppend(); + } + + synchronized boolean hasAllResponse() { + return pending.isEmpty(); + } + + @Override + public void onNext(InstallSnapshotReplyProto reply) { + LOG.debug("{} received {} response from {}", server.getId(), + (!firstResponseReceived ? "the first" : "a"), + follower.getPeer()); + + // update the last rpc time + follower.updateLastRpcResponseTime(); + + if (!firstResponseReceived) { + firstResponseReceived = true; + } + + switch (reply.getResult()) { + case SUCCESS: + removePending(reply); + break; + case NOT_LEADER: + checkResponseTerm(reply.getTerm()); + break; + case UNRECOGNIZED: + break; + } + } + + @Override + public void onError(Throwable t) { + if (!isAppenderRunning()) { + LOG.info("{} is stopped", GrpcLogAppender.this); + return; + } + LOG.info("{} got error when installing snapshot to {}, exception: {}", + server.getId(), follower.getPeer(), t); + resetClient(null); + close(); + } + + @Override + public void onCompleted() { + LOG.info("{} stops sending snapshots to follower {}", server.getId(), + follower); + close(); + } + } + + private void installSnapshot(SnapshotInfo snapshot) { + LOG.info("{}: follower {}'s next index is {}," + + " log's start index is {}, need to install snapshot", + server.getId(), follower.getPeer(), follower.getNextIndex(), + raftLog.getStartIndex()); + + final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(); + StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null; + final String requestId = UUID.randomUUID().toString(); + try { + snapshotRequestObserver = getClient().installSnapshot(responseHandler); + for (InstallSnapshotRequestProto request : + new SnapshotRequestIter(snapshot, requestId)) { + if (isAppenderRunning()) { + snapshotRequestObserver.onNext(request); + follower.updateLastRpcSendTime(); + responseHandler.addPending(request); + } else { + break; + } + } + snapshotRequestObserver.onCompleted(); + } catch (Exception e) { + LOG.warn("{} failed to install snapshot {}. Exception: {}", this, + snapshot.getFiles(), e); + if (snapshotRequestObserver != null) { + snapshotRequestObserver.onError(e); + } + return; + } + + synchronized (this) { + while (isAppenderRunning() && !responseHandler.isDone()) { + try { + wait(); + } catch (InterruptedException ignored) { + } + } + } + + if (responseHandler.hasAllResponse()) { + follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); + follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); + LOG.info("{}: install snapshot-{} successfully on follower {}", + server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java new file mode 100644 index 0000000..3b2f8ba --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.shaded.io.grpc.ManagedChannel; +import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.util.TimeDuration; + +import java.io.Closeable; + +/** + * This is a RaftClient implementation that supports streaming data to the raft + * ring. The stream implementation utilizes gRPC. + */ +public class GrpcServerProtocolClient implements Closeable { + private final ManagedChannel channel; + private final TimeDuration requestTimeoutDuration; + private final RaftServerProtocolServiceBlockingStub blockingStub; + private final RaftServerProtocolServiceStub asyncStub; + + public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow, + TimeDuration requestTimeoutDuration) { + channel = NettyChannelBuilder.forTarget(target.getAddress()) + .usePlaintext(true).flowControlWindow(flowControlWindow) + .build(); + blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); + asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); + this.requestTimeoutDuration = requestTimeoutDuration; + } + + @Override + public void close() { + channel.shutdownNow(); + } + + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) { + // the StatusRuntimeException will be handled by the caller + RequestVoteReplyProto r = + blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .requestVote(request); + return r; + } + + StreamObserver<AppendEntriesRequestProto> appendEntries( + StreamObserver<AppendEntriesReplyProto> responseHandler) { + return asyncStub.appendEntries(responseHandler); + } + + StreamObserver<InstallSnapshotRequestProto> installSnapshot( + StreamObserver<InstallSnapshotReplyProto> responseHandler) { + return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .installSnapshot(responseHandler); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java new file mode 100644 index 0000000..83335b8 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; +import org.apache.ratis.util.ProtoUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase { + public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class); + + private final Supplier<RaftPeerId> idSupplier; + private final RaftServer server; + + public GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server) { + this.idSupplier = idSupplier; + this.server = server; + } + + RaftPeerId getId() { + return idSupplier.get(); + } + + @Override + public void requestVote(RequestVoteRequestProto request, + StreamObserver<RequestVoteReplyProto> responseObserver) { + try { + final RequestVoteReplyProto reply = server.requestVote(request); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } catch (Throwable e) { + GrpcUtil.warn(LOG, () -> getId() + ": Failed requestVote " + ProtoUtils.toString(request.getServerRequest()), e); + responseObserver.onError(GrpcUtil.wrapException(e)); + } + } + + @Override + public StreamObserver<AppendEntriesRequestProto> appendEntries( + StreamObserver<AppendEntriesReplyProto> responseObserver) { + return new StreamObserver<AppendEntriesRequestProto>() { + private final AtomicReference<CompletableFuture<Void>> previousOnNext = + new AtomicReference<>(CompletableFuture.completedFuture(null)); + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + @Override + public void onNext(AppendEntriesRequestProto request) { + final CompletableFuture<Void> current = new CompletableFuture<>(); + final CompletableFuture<Void> previous = previousOnNext.getAndSet(current); + try { + server.appendEntriesAsync(request).thenCombine(previous, + (reply, v) -> { + if (!isClosed.get()) { + responseObserver.onNext(reply); + } + current.complete(null); + return null; + }); + } catch (Throwable e) { + GrpcUtil.warn(LOG, () -> getId() + ": Failed appendEntries " + ProtoUtils.toString(request.getServerRequest()), e); + responseObserver.onError(GrpcUtil.wrapException(e, request.getServerRequest().getCallId())); + current.completeExceptionally(e); + } + } + + @Override + public void onError(Throwable t) { + // for now we just log a msg + GrpcUtil.warn(LOG, () -> getId() + ": appendEntries onError", t); + } + + @Override + public void onCompleted() { + if (isClosed.compareAndSet(false, true)) { + LOG.info("{}: appendEntries completed", getId()); + responseObserver.onCompleted(); + } + } + }; + } + + @Override + public StreamObserver<InstallSnapshotRequestProto> installSnapshot( + StreamObserver<InstallSnapshotReplyProto> responseObserver) { + return new StreamObserver<InstallSnapshotRequestProto>() { + @Override + public void onNext(InstallSnapshotRequestProto request) { + try { + final InstallSnapshotReplyProto reply = server.installSnapshot(request); + responseObserver.onNext(reply); + } catch (Throwable e) { + GrpcUtil.warn(LOG, () -> getId() + ": Failed installSnapshot " + ProtoUtils.toString(request.getServerRequest()), e); + responseObserver.onError(GrpcUtil.wrapException(e)); + } + } + + @Override + public void onError(Throwable t) { + GrpcUtil.warn(LOG, () -> getId() + ": installSnapshot onError", t); + } + + @Override + public void onCompleted() { + LOG.info("{}: installSnapshot completed", getId()); + responseObserver.onCompleted(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java new file mode 100644 index 0000000..eb8310c --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.server; + +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.client.GrpcClientProtocolService; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.RaftServerRpc; +import org.apache.ratis.server.impl.RaftServerRpcWithProxy; +import org.apache.ratis.shaded.io.grpc.Server; +import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.function.Supplier; + +/** A grpc implementation of {@link RaftServerRpc}. */ +public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>> { + static final Logger LOG = LoggerFactory.getLogger(GrpcService.class); + public static final String GRPC_SEND_SERVER_REQUEST = + GrpcService.class.getSimpleName() + ".sendRequest"; + + public static class Builder extends RaftServerRpc.Builder<Builder, GrpcService> { + private Builder() {} + + @Override + public Builder getThis() { + return this; + } + + @Override + public GrpcService build() { + return new GrpcService(getServer()); + } + } + + public static Builder newBuilder() { + return new Builder(); + } + + private final Server server; + private final Supplier<InetSocketAddress> addressSupplier; + + private GrpcService(RaftServer server) { + this(server, server::getId, + GrpcConfigKeys.Server.port(server.getProperties()), + GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info), + RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()), + GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info), + RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties())); + } + private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port, + SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, + SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration) { + super(idSupplier, id -> new PeerProxyMap<>(id.toString(), + p -> new GrpcServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration))); + if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { + throw new IllegalArgumentException("Illegal configuration: " + + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize + + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); + } + + server = NettyServerBuilder.forPort(port) + .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt()) + .flowControlWindow(flowControlWindow.getSizeInt()) + .addService(new GrpcServerProtocolService(idSupplier, raftServer)) + .addService(new GrpcClientProtocolService(idSupplier, raftServer)) + .addService(new GrpcAdminProtocolService(raftServer)) + .build(); + addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort())); + } + + @Override + public SupportedRpcType getRpcType() { + return SupportedRpcType.GRPC; + } + + @Override + public void startImpl() { + try { + server.start(); + } catch (IOException e) { + ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG); + } + LOG.info("{}: {} started, listening on {}", getId(), getClass().getSimpleName(), getInetSocketAddress()); + } + + @Override + public void closeImpl() throws IOException { + final String name = getId() + ": shutdown server with port " + server.getPort(); + LOG.info("{} now", name); + final Server s = server.shutdownNow(); + super.closeImpl(); + try { + s.awaitTermination(); + } catch(InterruptedException e) { + throw IOUtils.toInterruptedIOException(name + " failed", e); + } + LOG.info("{} successfully", name); + } + + @Override + public InetSocketAddress getInetSocketAddress() { + return addressSupplier.get(); + } + + @Override + public AppendEntriesReplyProto appendEntries( + AppendEntriesRequestProto request) throws IOException { + throw new UnsupportedOperationException( + "Blocking AppendEntries call is not supported"); + } + + @Override + public InstallSnapshotReplyProto installSnapshot( + InstallSnapshotRequestProto request) throws IOException { + throw new UnsupportedOperationException( + "Blocking InstallSnapshot call is not supported"); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) + throws IOException { + CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), + null, request); + + final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId()); + return getProxies().getProxy(target).requestVote(request); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java deleted file mode 100644 index b801c2a..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.server; - -import org.apache.ratis.shaded.io.grpc.ManagedChannel; -import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.*; -import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc; -import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub; -import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub; -import org.apache.ratis.protocol.RaftPeer; -import org.apache.ratis.util.TimeDuration; - -import java.io.Closeable; - -/** - * This is a RaftClient implementation that supports streaming data to the raft - * ring. The stream implementation utilizes gRPC. - */ -public class RaftServerProtocolClient implements Closeable { - private final ManagedChannel channel; - private final TimeDuration requestTimeoutDuration; - private final RaftServerProtocolServiceBlockingStub blockingStub; - private final RaftServerProtocolServiceStub asyncStub; - - public RaftServerProtocolClient(RaftPeer target, int flowControlWindow, - TimeDuration requestTimeoutDuration) { - channel = NettyChannelBuilder.forTarget(target.getAddress()) - .usePlaintext(true).flowControlWindow(flowControlWindow) - .build(); - blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); - asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); - this.requestTimeoutDuration = requestTimeoutDuration; - } - - @Override - public void close() { - channel.shutdownNow(); - } - - public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) { - // the StatusRuntimeException will be handled by the caller - RequestVoteReplyProto r = - blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .requestVote(request); - return r; - } - - StreamObserver<AppendEntriesRequestProto> appendEntries( - StreamObserver<AppendEntriesReplyProto> responseHandler) { - return asyncStub.appendEntries(responseHandler); - } - - StreamObserver<InstallSnapshotRequestProto> installSnapshot( - StreamObserver<InstallSnapshotReplyProto> responseHandler) { - return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) - .installSnapshot(responseHandler); - } -}
