http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java deleted file mode 100644 index a8a39bb..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java +++ /dev/null @@ -1,415 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.server; - -import com.google.common.base.Preconditions; -import org.apache.raft.grpc.RaftGRpcService; -import org.apache.raft.grpc.RaftGrpcConfigKeys; -import org.apache.raft.server.impl.FollowerInfo; -import org.apache.raft.server.impl.LeaderState; -import org.apache.raft.server.impl.LogAppender; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.shaded.io.grpc.Status; -import org.apache.raft.shaded.io.grpc.stub.StreamObserver; -import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotReplyProto; -import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.raft.statemachine.SnapshotInfo; -import org.apache.raft.util.CodeInjectionForTesting; - -import java.util.LinkedList; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.raft.grpc.RaftGRpcService.GRPC_SEND_SERVER_REQUEST; - -/** - * A new log appender implementation using grpc bi-directional stream API. 
- */ -public class GRpcLogAppender extends LogAppender { - private final RaftServerProtocolClient client; - private final Queue<AppendEntriesRequestProto> pendingRequests; - private final int maxPendingRequestsNum; - private volatile boolean firstResponseReceived = false; - - private final AppendLogResponseHandler appendResponseHandler; - private final InstallSnapshotResponseHandler snapshotResponseHandler; - - private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; - private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver; - - public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState, - FollowerInfo f) { - super(server, leaderState, f); - - RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc(); - client = rpcService.getRpcClient(f.getPeer()); - maxPendingRequestsNum = server.getProperties().getInt( - RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY, - RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT); - pendingRequests = new ConcurrentLinkedQueue<>(); - - appendResponseHandler = new AppendLogResponseHandler(); - snapshotResponseHandler = new InstallSnapshotResponseHandler(); - } - - @Override - public void run() { - while (isAppenderRunning()) { - if (shouldSendRequest()) { - SnapshotInfo snapshot = shouldInstallSnapshot(); - if (snapshot != null) { - installSnapshot(snapshot, snapshotResponseHandler); - } else { - // keep appending log entries or sending heartbeats - appendLog(); - } - } - - if (isAppenderRunning() && !shouldSendRequest()) { - // use lastSend time instead of lastResponse time - final long waitTime = getHeartbeatRemainingTime( - follower.getLastRpcTime()); - if (waitTime > 0) { - synchronized (this) { - try { - LOG.debug("{} decides to wait {}ms before appending to {}", - server.getId(), waitTime, follower.getPeer()); - wait(waitTime); - } catch (InterruptedException ignored) { - } - } - } - } - } - appendLogRequestObserver.onCompleted(); - } - - private boolean shouldWait() { - return pendingRequests.size() >= maxPendingRequestsNum || - shouldWaitForFirstResponse(); - } - - private void appendLog() { - if (appendLogRequestObserver == null) { - appendLogRequestObserver = client.appendEntries(appendResponseHandler); - } - AppendEntriesRequestProto pending = null; - final StreamObserver<AppendEntriesRequestProto> s; - synchronized (this) { - // if the queue's size >= maxSize, wait - while (isAppenderRunning() && shouldWait()) { - try { - LOG.debug("{} wait to send the next AppendEntries to {}", - server.getId(), follower.getPeer()); - this.wait(); - } catch (InterruptedException ignored) { - } - } - - if (isAppenderRunning()) { - // prepare and enqueue the append request. 
note changes on follower's - // nextIndex and ops on pendingRequests should always be associated - // together and protected by the lock - pending = createRequest(); - if (pending != null) { - Preconditions.checkState(pendingRequests.offer(pending)); - updateNextIndex(pending); - } - } - s = appendLogRequestObserver; - } - - if (pending != null && isAppenderRunning()) { - sendRequest(pending, s); - } - } - - private void sendRequest(AppendEntriesRequestProto request, - StreamObserver<AppendEntriesRequestProto> s) { - CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, server.getId(), - null, request); - - s.onNext(request); - follower.updateLastRpcSendTime(); - } - - private void updateNextIndex(AppendEntriesRequestProto request) { - final int count = request.getEntriesCount(); - if (count > 0) { - follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1); - } - } - - /** - * if this is the first append, wait for the response of the first append so - * that we can get the correct next index. - */ - private boolean shouldWaitForFirstResponse() { - return pendingRequests.size() > 0 && !firstResponseReceived; - } - - /** - * StreamObserver for handling responses from the follower - */ - private class AppendLogResponseHandler - implements StreamObserver<AppendEntriesReplyProto> { - /** - * After receiving a appendEntries reply, do the following: - * 1. If the reply is success, update the follower's match index and submit - * an event to leaderState - * 2. If the reply is NOT_LEADER, step down - * 3. If the reply is INCONSISTENCY, decrease the follower's next index - * based on the response - */ - @Override - public void onNext(AppendEntriesReplyProto reply) { - LOG.debug("{} received {} response from {}", server.getId(), - (!firstResponseReceived ? "the first" : "a"), - follower.getPeer()); - - // update the last rpc time - follower.updateLastRpcResponseTime(); - - if (!firstResponseReceived) { - firstResponseReceived = true; - } - switch (reply.getResult()) { - case SUCCESS: - onSuccess(reply); - break; - case NOT_LEADER: - onNotLeader(reply); - break; - case INCONSISTENCY: - onInconsistency(reply); - break; - default: - break; - } - notifyAppend(); - } - - /** - * for now we simply retry the first pending request - */ - @Override - public void onError(Throwable t) { - if (!isAppenderRunning()) { - LOG.info("{} is stopped", GRpcLogAppender.this); - return; - } - LOG.warn("{} got error when appending entries to {}, exception: {}.", - server.getId(), follower.getPeer().getId(), t); - - synchronized (this) { - final Status cause = Status.fromThrowable(t); - if (cause != null && cause.getCode() == Status.Code.INTERNAL) { - // TODO check other Status. Add sleep to avoid tight loop - LOG.debug("{} restarts Append call to {} due to error {}", - server.getId(), follower.getPeer(), t); - // recreate the StreamObserver - appendLogRequestObserver = client.appendEntries(appendResponseHandler); - // reset firstResponseReceived to false - firstResponseReceived = false; - } - - // clear the pending requests queue and reset the next index of follower - AppendEntriesRequestProto request = pendingRequests.peek(); - if (request != null) { - final long nextIndex = request.hasPreviousLog() ? 
- request.getPreviousLog().getIndex() + 1 : raftLog.getStartIndex(); - clearPendingRequests(nextIndex); - } - } - } - - @Override - public void onCompleted() { - LOG.info("{} stops appending log entries to follower {}", server.getId(), - follower); - } - } - - private void clearPendingRequests(long newNextIndex) { - pendingRequests.clear(); - follower.decreaseNextIndex(newNextIndex); - } - - private void onSuccess(AppendEntriesReplyProto reply) { - AppendEntriesRequestProto request = pendingRequests.poll(); - final long replyNextIndex = reply.getNextIndex(); - Preconditions.checkNotNull(request, - "Got reply with next index %s but the pending queue is empty", - replyNextIndex); - - if (request.getEntriesCount() == 0) { - Preconditions.checkState(!request.hasPreviousLog() || - replyNextIndex - 1 == request.getPreviousLog().getIndex(), - "reply's next index is %s, request's previous is %s", - replyNextIndex, request.getPreviousLog()); - } else { - // check if the reply and the pending request is consistent - final long lastEntryIndex = request - .getEntries(request.getEntriesCount() - 1).getIndex(); - Preconditions.checkState(replyNextIndex == lastEntryIndex + 1, - "reply's next index is %s, request's last entry index is %s", - replyNextIndex, lastEntryIndex); - follower.updateMatchIndex(lastEntryIndex); - submitEventOnSuccessAppend(); - } - } - - private void onNotLeader(AppendEntriesReplyProto reply) { - checkResponseTerm(reply.getTerm()); - // the running loop will end and the connection will onComplete - } - - private synchronized void onInconsistency(AppendEntriesReplyProto reply) { - AppendEntriesRequestProto request = pendingRequests.peek(); - Preconditions.checkState(request.hasPreviousLog()); - if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) { - clearPendingRequests(reply.getNextIndex()); - } - } - - private class InstallSnapshotResponseHandler - implements StreamObserver<InstallSnapshotReplyProto> { - private final Queue<Integer> pending; - private final AtomicBoolean done = new AtomicBoolean(false); - - InstallSnapshotResponseHandler() { - pending = new LinkedList<>(); - } - - synchronized void addPending(InstallSnapshotRequestProto request) { - pending.offer(request.getRequestIndex()); - } - - synchronized void removePending(InstallSnapshotReplyProto reply) { - int index = pending.poll(); - Preconditions.checkState(index == reply.getRequestIndex()); - } - - boolean isDone() { - return done.get(); - } - - void close() { - done.set(true); - GRpcLogAppender.this.notifyAppend(); - } - - synchronized boolean hasAllResponse() { - return pending.isEmpty(); - } - - @Override - public void onNext(InstallSnapshotReplyProto reply) { - LOG.debug("{} received {} response from {}", server.getId(), - (!firstResponseReceived ? 
"the first" : "a"), - follower.getPeer()); - - // update the last rpc time - follower.updateLastRpcResponseTime(); - - if (!firstResponseReceived) { - firstResponseReceived = true; - } - - switch (reply.getResult()) { - case SUCCESS: - removePending(reply); - break; - case NOT_LEADER: - checkResponseTerm(reply.getTerm()); - break; - case UNRECOGNIZED: - break; - } - } - - @Override - public void onError(Throwable t) { - if (!isAppenderRunning()) { - LOG.info("{} is stopped", GRpcLogAppender.this); - return; - } - LOG.info("{} got error when installing snapshot to {}, exception: {}", - server.getId(), follower.getPeer(), t); - close(); - } - - @Override - public void onCompleted() { - LOG.info("{} stops sending snapshots to follower {}", server.getId(), - follower); - close(); - } - } - - private void installSnapshot(SnapshotInfo snapshot, - InstallSnapshotResponseHandler responseHandler) { - LOG.info("{}: follower {}'s next index is {}," + - " log's start index is {}, need to install snapshot", - server.getId(), follower.getPeer(), follower.getNextIndex(), - raftLog.getStartIndex()); - - snapshotRequestObserver = client.installSnapshot(snapshotResponseHandler); - final String requestId = UUID.randomUUID().toString(); - try { - for (InstallSnapshotRequestProto request : - new SnapshotRequestIter(snapshot, requestId)) { - if (isAppenderRunning()) { - snapshotRequestObserver.onNext(request); - follower.updateLastRpcSendTime(); - responseHandler.addPending(request); - } else { - break; - } - } - snapshotRequestObserver.onCompleted(); - } catch (Exception e) { - LOG.warn("{} failed to install snapshot {}. Exception: {}", this, - snapshot.getFiles(), e); - snapshotRequestObserver.onError(e); - return; - } finally { - snapshotRequestObserver = null; - } - - synchronized (this) { - while (isAppenderRunning() && !responseHandler.isDone()) { - try { - wait(); - } catch (InterruptedException ignored) { - } - } - } - - if (responseHandler.hasAllResponse()) { - follower.updateMatchIndex(snapshot.getTermIndex().getIndex()); - follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1); - LOG.info("{}: install snapshot-{} successfully on follower {}", - server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer()); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java deleted file mode 100644 index cc2e513..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc.server; - -import org.apache.raft.server.impl.FollowerInfo; -import org.apache.raft.server.impl.LeaderState; -import org.apache.raft.server.impl.LogAppender; -import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServerImpl; - -public class PipelinedLogAppenderFactory implements LogAppenderFactory { - @Override - public LogAppender getLogAppender(RaftServerImpl server, LeaderState state, - FollowerInfo f) { - return new GRpcLogAppender(server, state, f); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java deleted file mode 100644 index 437e1f4..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.grpc.server; - -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.shaded.io.grpc.ManagedChannel; -import org.apache.raft.shaded.io.grpc.ManagedChannelBuilder; -import org.apache.raft.shaded.io.grpc.stub.StreamObserver; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc; -import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub; -import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub; - -/** - * This is a RaftClient implementation that supports streaming data to the raft - * ring. The stream implementation utilizes gRPC. - */ -public class RaftServerProtocolClient { - private final ManagedChannel channel; - private final RaftServerProtocolServiceBlockingStub blockingStub; - private final RaftServerProtocolServiceStub asyncStub; - - public RaftServerProtocolClient(RaftPeer target) { - channel = ManagedChannelBuilder.forTarget(target.getAddress()) - .usePlaintext(true).build(); - blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); - asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); - } - - public void shutdown() { - channel.shutdownNow(); - } - - public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) { - // the StatusRuntimeException will be handled by the caller - return blockingStub.requestVote(request); - } - - StreamObserver<AppendEntriesRequestProto> appendEntries( - StreamObserver<AppendEntriesReplyProto> responseHandler) { - return asyncStub.appendEntries(responseHandler); - } - - StreamObserver<InstallSnapshotRequestProto> installSnapshot( - StreamObserver<InstallSnapshotReplyProto> responseHandler) { - return asyncStub.installSnapshot(responseHandler); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java deleted file mode 100644 index 53dbb6a..0000000 --- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.grpc.server; - -import org.apache.raft.grpc.RaftGrpcUtil; -import org.apache.raft.server.protocol.RaftServerProtocol; -import org.apache.raft.shaded.io.grpc.stub.StreamObserver; -import org.apache.raft.shaded.proto.RaftProtos.*; -import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase { - public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class); - - private final String id; - private final RaftServerProtocol server; - - public RaftServerProtocolService(String id, RaftServerProtocol server) { - this.id = id; - this.server = server; - } - - @Override - public void requestVote(RequestVoteRequestProto request, - StreamObserver<RequestVoteReplyProto> responseObserver) { - try { - final RequestVoteReplyProto reply = server.requestVote(request); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Throwable e) { - LOG.info("{} got exception when handling requestVote {}: {}", - id, request.getServerRequest(), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - @Override - public StreamObserver<AppendEntriesRequestProto> appendEntries( - StreamObserver<AppendEntriesReplyProto> responseObserver) { - return new StreamObserver<AppendEntriesRequestProto>() { - @Override - public void onNext(AppendEntriesRequestProto request) { - try { - final AppendEntriesReplyProto reply = server.appendEntries(request); - responseObserver.onNext(reply); - } catch (Throwable e) { - LOG.info("{} got exception when handling appendEntries {}: {}", - id, request.getServerRequest(), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - @Override - public void onError(Throwable t) { - // for now we just log a msg - LOG.info("{}: appendEntries on error. Exception: {}", id, t); - } - - @Override - public void onCompleted() { - LOG.info("{}: appendEntries completed", id); - responseObserver.onCompleted(); - } - }; - } - - @Override - public StreamObserver<InstallSnapshotRequestProto> installSnapshot( - StreamObserver<InstallSnapshotReplyProto> responseObserver) { - return new StreamObserver<InstallSnapshotRequestProto>() { - @Override - public void onNext(InstallSnapshotRequestProto request) { - try { - final InstallSnapshotReplyProto reply = server.installSnapshot(request); - responseObserver.onNext(reply); - } catch (Throwable e) { - LOG.info("{} got exception when handling installSnapshot {}: {}", - id, request.getServerRequest(), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - @Override - public void onError(Throwable t) { - LOG.info("{}: installSnapshot on error. 
Exception: {}", id, t); - } - - @Override - public void onCompleted() { - LOG.info("{}: installSnapshot completed", id); - responseObserver.onCompleted(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java deleted file mode 100644 index 359dabd..0000000 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import com.google.common.base.Preconditions; -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.client.RaftClientRequestSender; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.grpc.client.RaftClientSenderWithGrpc; -import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.protocol.RaftPeer; -import org.apache.raft.server.impl.BlockRequestHandlingInjection; -import org.apache.raft.server.impl.DelayLocalExecutionInjection; -import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.util.NetUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; - -public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { - public static final Factory<MiniRaftClusterWithGRpc> FACTORY - = new Factory<MiniRaftClusterWithGRpc>() { - @Override - public MiniRaftClusterWithGRpc newCluster( - String[] ids, RaftProperties prop, boolean formatted) throws IOException { - return new MiniRaftClusterWithGRpc(ids, prop, formatted); - } - }; - - public static final DelayLocalExecutionInjection sendServerRequestInjection = - new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST); - - public MiniRaftClusterWithGRpc(int numServers, RaftProperties properties) - throws IOException { - this(generateIds(numServers, 0), properties, true); - } - - public MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties, - boolean formatted) throws IOException { - super(ids, getPropForGrpc(properties), formatted); - init(initRpcServices(getServers(), properties)); - } - - private static RaftProperties getPropForGrpc(RaftProperties prop) { - RaftProperties newProp = new RaftProperties(prop); - 
newProp.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - return newProp; - } - - private static Map<RaftPeer, RaftGRpcService> initRpcServices( - Collection<RaftServerImpl> servers, RaftProperties prop) throws IOException { - final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>(); - - for (RaftServerImpl s : servers) { - final RaftGRpcService rpc = new RaftGRpcService(s, prop); - peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc); - } - return peerRpcs; - } - - @Override - public RaftClientRequestSender getRaftClientRequestSender() { - return new RaftClientSenderWithGrpc(getPeers()); - } - - @Override - protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers, - Collection<RaftServerImpl> newServers, boolean startService) - throws IOException { - final Map<RaftPeer, RaftGRpcService> peers = initRpcServices(newServers, properties); - for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) { - RaftServerImpl server = servers.get(entry.getKey().getId()); - server.setServerRpc(entry.getValue()); - if (!startService) { - BlockRequestHandlingInjection.getInstance().blockReplier(server.getId()); - } else { - server.start(); - } - } - return new ArrayList<>(peers.keySet()); - } - - @Override - protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException { - RaftServerImpl server = servers.get(peer.getId()); - int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort(); - int oldPort = properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, - RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT); - properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port); - final RaftGRpcService rpc = new RaftGRpcService(server, properties); - Preconditions.checkState( - rpc.getInetSocketAddress().toString().contains(peer.getAddress()), - "address in the raft conf: %s, address in rpc server: %s", - peer.getAddress(), rpc.getInetSocketAddress().toString()); - server.setServerRpc(rpc); - properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, oldPort); - return server; - } - - @Override - public void startServer(String id) { - super.startServer(id); - BlockRequestHandlingInjection.getInstance().unblockReplier(id); - } - - @Override - protected void blockQueueAndSetDelay(String leaderId, int delayMs) - throws InterruptedException { - RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection, - leaderId, delayMs, getMaxTimeout()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java deleted file mode 100644 index a8357c9..0000000 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.RaftNotLeaderExceptionBaseTest; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.server.impl.LogAppenderFactory; - -import java.io.IOException; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; - -public class TestNotLeaderExceptionWithGrpc extends RaftNotLeaderExceptionBaseTest { - @Override - public MiniRaftCluster initCluster() throws IOException { - String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0); - RaftProperties prop = new RaftProperties(); - prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - return new MiniRaftClusterWithGRpc(s, prop, true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java deleted file mode 100644 index 83e6c62..0000000 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.grpc; - -import org.apache.log4j.Level; -import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.grpc.server.RaftServerProtocolService; -import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftReconfigurationBaseTest; -import org.apache.raft.util.RaftUtils; -import org.junit.BeforeClass; - -import java.io.IOException; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; - -public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest { - static { - RaftUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG); - } - - @BeforeClass - public static void setProp() { - prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - } - - @Override - public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException { - return new MiniRaftClusterWithGRpc(peerNum, prop); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java deleted file mode 100644 index 74b2c63..0000000 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import org.apache.raft.MiniRaftCluster; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.statemachine.RaftSnapshotBaseTest; - -import java.io.IOException; - -public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest { - @Override - public MiniRaftCluster initCluster(int numServer, RaftProperties prop) - throws IOException { - return MiniRaftClusterWithGRpc.FACTORY.newCluster(numServer, prop, true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java deleted file mode 100644 index 82a4e13..0000000 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java +++ /dev/null @@ -1,319 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import org.apache.log4j.Level; -import org.apache.raft.RaftTestUtil; -import org.apache.raft.conf.RaftProperties; -import org.apache.raft.grpc.client.AppendStreamer; -import org.apache.raft.grpc.client.RaftOutputStream; -import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.server.storage.RaftLog; -import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.raft.util.RaftUtils; -import org.junit.After; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; - -import static org.apache.raft.RaftTestUtil.waitForLeader; -import static org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY; -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; -import static org.junit.Assert.fail; - -public class TestRaftStream { - static { - RaftUtils.setLogLevel(AppendStreamer.LOG, Level.ALL); - } - static final Logger LOG = LoggerFactory.getLogger(TestRaftStream.class); - - private static final RaftProperties prop = new RaftProperties(); - private static final int NUM_SERVERS = 3; - - private MiniRaftClusterWithGRpc cluster; - - - @BeforeClass - public static void setProp() { - prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - } - - @After - public void tearDown() { - if (cluster != null) { - cluster.shutdown(); - } - } - - private byte[] genContent(int count) { - return toBytes(count); - } - - private byte[] toBytes(int i) { - byte[] b = new byte[4]; - b[0] = (byte) ((i >>> 24) & 0xFF); - b[1] = (byte) ((i >>> 16) & 0xFF); - b[2] = (byte) ((i >>> 8) & 0xFF); - b[3] = (byte) (i & 0xFF); - return b; - } - - @Test - public void testSimpleWrite() throws Exception { - LOG.info("Running testSimpleWrite"); - - // default 64K is too large for a test - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4); - cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); - - cluster.start(); - RaftServerImpl leader = waitForLeader(cluster); - - int count = 1; - try (RaftOutputStream out = new RaftOutputStream(prop, "writer-1", - cluster.getPeers(), leader.getId())) { - for (int i = 0; i < 500; i++) { // generate 500 requests - out.write(genContent(count++)); - } - } - - // check the leader's raft log - final RaftLog raftLog = 
leader.getState().getLog(); - final AtomicInteger currentNum = new AtomicInteger(1); - checkLog(raftLog, 500, () -> { - int value = currentNum.getAndIncrement(); - return toBytes(value); - }); - } - - private void checkLog(RaftLog raftLog, long expectedCommittedIndex, - Supplier<byte[]> s) { - long committedIndex = raftLog.getLastCommittedIndex(); - Assert.assertEquals(expectedCommittedIndex, committedIndex); - // check the log content - LogEntryProto[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1); - for (LogEntryProto entry : entries) { - byte[] logData = entry.getSmLogEntry().getData().toByteArray(); - byte[] expected = s.get(); - Assert.assertEquals("log entry: " + entry, - expected.length, logData.length); - Assert.assertArrayEquals(expected, logData); - } - } - - @Test - public void testWriteAndFlush() throws Exception { - LOG.info("Running testWriteAndFlush"); - - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE); - cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); - cluster.start(); - - RaftServerImpl leader = waitForLeader(cluster); - RaftOutputStream out = new RaftOutputStream(prop, "writer", - cluster.getPeers(), leader.getId()); - - int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072}; - ByteValue[] values = new ByteValue[lengths.length]; - for (int i = 0; i < values.length; i++) { - values[i] = new ByteValue(lengths[i], (byte) 9); - } - - List<byte[]> expectedTxs = new ArrayList<>(); - for (ByteValue v : values) { - byte[] data = v.genData(); - expectedTxs.addAll(v.getTransactions()); - out.write(data); - out.flush(); - - // make sure after the flush the data has been committed - Assert.assertEquals(expectedTxs.size(), - leader.getState().getLastAppliedIndex()); - } - out.close(); - - try { - out.write(0); - fail("The OutputStream has been closed"); - } catch (IOException ignored) { - } - - LOG.info("Start to check leader's log"); - final AtomicInteger index = new AtomicInteger(0); - checkLog(leader.getState().getLog(), expectedTxs.size(), - () -> expectedTxs.get(index.getAndIncrement())); - } - - private static class ByteValue { - final static int BUFFERSIZE = 1024; - - final int length; - final byte value; - final int numTx; - byte[] data; - - ByteValue(int length, byte value) { - this.length = length; - this.value = value; - numTx = (length - 1) / BUFFERSIZE + 1; - } - - byte[] genData() { - data = new byte[length]; - Arrays.fill(data, value); - return data; - } - - Collection<byte[]> getTransactions() { - if (data.length <= BUFFERSIZE) { - return Collections.singletonList(data); - } else { - List<byte[]> list = new ArrayList<>(); - for (int i = 0; i < numTx; i++) { - int txSize = Math.min(BUFFERSIZE, length - BUFFERSIZE * i); - byte[] t = new byte[txSize]; - Arrays.fill(t, value); - list.add(t); - } - return list; - } - } - } - - @Test - public void testWriteWithOffset() throws Exception { - LOG.info("Running testWriteWithOffset"); - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE); - - cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); - cluster.start(); - RaftServerImpl leader = waitForLeader(cluster); - - RaftOutputStream out = new RaftOutputStream(prop, "writer", - cluster.getPeers(), leader.getId()); - - byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2]; - Arrays.fill(b1, (byte) 1); - byte[] b2 = new byte[ByteValue.BUFFERSIZE]; - Arrays.fill(b2, (byte) 2); - byte[] b3 = new byte[ByteValue.BUFFERSIZE * 2 + ByteValue.BUFFERSIZE / 2]; - Arrays.fill(b3, (byte) 3); - byte[] b4 = new 
byte[ByteValue.BUFFERSIZE * 4]; - Arrays.fill(b3, (byte) 4); - - byte[] expected = new byte[ByteValue.BUFFERSIZE * 8]; - byte[][] data = new byte[][]{b1, b2, b3, b4}; - final Random random = new Random(); - int totalSize = 0; - for (byte[] b : data) { - System.arraycopy(b, 0, expected, totalSize, b.length); - totalSize += b.length; - - int written = 0; - while (written < b.length) { - int toWrite = random.nextInt(b.length - written) + 1; - LOG.info("write {} bytes", toWrite); - out.write(b, written, toWrite); - written += toWrite; - } - } - out.close(); - - final RaftLog log = leader.getState().getLog(); - // 0.5 + 1 + 2.5 + 4 = 8 - Assert.assertEquals(8, leader.getState().getLastAppliedIndex()); - Assert.assertEquals(8, log.getLastCommittedIndex()); - LogEntryProto[] entries = log.getEntries(1, 9); - byte[] actual = new byte[ByteValue.BUFFERSIZE * 8]; - totalSize = 0; - for (LogEntryProto e : entries) { - byte[] eValue = e.getSmLogEntry().getData().toByteArray(); - Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length); - System.arraycopy(eValue, 0, actual, totalSize, eValue.length); - totalSize += eValue.length; - } - Assert.assertArrayEquals(expected, actual); - } - - /** - * Write while leader is killed - */ - @Test - public void testKillLeader() throws Exception { - LOG.info("Running testChangeLeader"); - - prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4); - cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop); - cluster.start(); - final RaftServerImpl leader = waitForLeader(cluster); - - final AtomicBoolean running = new AtomicBoolean(true); - final AtomicBoolean success = new AtomicBoolean(false); - final AtomicInteger result = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(1); - - new Thread(() -> { - LOG.info("Writer thread starts"); - int count = 0; - try (RaftOutputStream out = new RaftOutputStream(prop, "writer", - cluster.getPeers(), leader.getId())) { - while (running.get()) { - out.write(toBytes(count++)); - Thread.sleep(10); - } - success.set(true); - result.set(count); - } catch (Exception e) { - LOG.info("Got exception when writing", e); - success.set(false); - } finally { - latch.countDown(); - } - }).start(); - - // force change the leader - RaftTestUtil.waitAndKillLeader(cluster, true); - final RaftServerImpl newLeader = waitForLeader(cluster); - Assert.assertNotEquals(leader.getId(), newLeader.getId()); - Thread.sleep(500); - - running.set(false); - latch.await(5, TimeUnit.SECONDS); - Assert.assertTrue(success.get()); - // total number of tx should be >= result + 2, where 2 means two NoOp from - // leaders. It may be larger than result+2 because the client may resend - // requests and we do not have retry cache on servers yet. - LOG.info("last applied index: {}. 
total number of requests: {}", - newLeader.getState().getLastAppliedIndex(), result.get()); - Assert.assertTrue( - newLeader.getState().getLastAppliedIndex() >= result.get() + 1); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java deleted file mode 100644 index c6667b4..0000000 --- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.grpc; - -import org.apache.log4j.Level; -import org.apache.raft.RaftBasicTests; -import org.apache.raft.grpc.server.PipelinedLogAppenderFactory; -import org.apache.raft.server.impl.BlockRequestHandlingInjection; -import org.apache.raft.server.impl.LogAppenderFactory; -import org.apache.raft.server.impl.RaftServerImpl; -import org.apache.raft.util.RaftUtils; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; - -import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY; - -public class TestRaftWithGrpc extends RaftBasicTests { - static { - RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); - } - - private final MiniRaftClusterWithGRpc cluster; - - @BeforeClass - public static void setProp() { - properties.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY, - PipelinedLogAppenderFactory.class, LogAppenderFactory.class); - } - - public TestRaftWithGrpc() throws IOException { - cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, properties); - Assert.assertNull(cluster.getLeader()); - } - - @Override - public MiniRaftClusterWithGRpc getCluster() { - return cluster; - } - - @Override - @Test - public void testEnforceLeader() throws Exception { - super.testEnforceLeader(); - - MiniRaftClusterWithGRpc.sendServerRequestInjection.clear(); - BlockRequestHandlingInjection.getInstance().unblockAll(); - } - - @Override - @Test - public void testWithLoad() throws Exception { - super.testWithLoad(); - BlockRequestHandlingInjection.getInstance().unblockAll(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/raft-grpc/src/test/resources/log4j.properties b/raft-grpc/src/test/resources/log4j.properties deleted file mode 100644 index ced0687..0000000 --- a/raft-grpc/src/test/resources/log4j.properties +++ /dev/null @@ -1,18 +0,0 @@ -# 
Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# log4j configuration used during build and unit tests - -log4j.rootLogger=info,stdout -log4j.threshold=ALL -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/pom.xml ---------------------------------------------------------------------- diff --git a/raft-hadoop/pom.xml b/raft-hadoop/pom.xml deleted file mode 100644 index da84fc9..0000000 --- a/raft-hadoop/pom.xml +++ /dev/null @@ -1,99 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. See accompanying LICENSE file. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <artifactId>raft-project-dist</artifactId> - <groupId>com.hortonworks.raft</groupId> - <version>1.0-SNAPSHOT</version> - <relativePath>../raft-project-dist</relativePath> - </parent> - - <artifactId>raft-hadoop</artifactId> - <name>Raft Hadoop Support</name> - - <dependencies> - <dependency> - <artifactId>raft-proto-shaded</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - - <dependency> - <artifactId>raft-common</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - <dependency> - <artifactId>raft-common</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <artifactId>raft-client</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - <dependency> - <artifactId>raft-client</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <artifactId>raft-server</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>provided</scope> - </dependency> - <dependency> - <artifactId>raft-server</artifactId> - <groupId>com.hortonworks.raft</groupId> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java b/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java deleted file mode 100644 index 824b19c..0000000 --- a/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java +++ /dev/null @@ -1,623 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataOutputOutputStream; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc.Client.ConnectionId; -import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.ProtoUtil; -import org.apache.hadoop.util.Time; -import org.apache.raft.shaded.com.google.protobuf.*; -import org.apache.raft.shaded.com.google.protobuf.Descriptors.MethodDescriptor; -import org.apache.raft.shaded.org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto; -import org.apache.raft.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; -import org.apache.raft.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; - -import javax.net.SocketFactory; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.OutputStream; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Copied from {@link org.apache.hadoop.ipc.ProtobufRpcEngine} - * and replaced the protobuf classes with the shaded classes. 
- */ -@InterfaceStability.Evolving -public class ProtobufRpcEngineShaded implements RpcEngine { - public static final Log LOG = LogFactory.getLog(ProtobufRpcEngineShaded.class); - - static { // Register the rpcRequest deserializer for WritableRpcEngine - org.apache.hadoop.ipc.Server.registerProtocolEngine( - RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class, - new Server.ProtoBufRpcInvoker()); - } - - private static final ClientCache CLIENTS = new ClientCache(); - - public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException { - return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, null); - } - - @Override - public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy - ) throws IOException { - return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null); - } - - @Override - @SuppressWarnings("unchecked") - public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, - SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) throws IOException { - - final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth); - return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance( - protocol.getClassLoader(), new Class[]{protocol}, invoker), false); - } - - @Override - public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy( - ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException { - Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class; - return new ProtocolProxy<ProtocolMetaInfoPB>(protocol, - (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(), - new Class[] { protocol }, new Invoker(protocol, connId, conf, - factory)), false); - } - - private static class Invoker implements RpcInvocationHandler { - private final Map<String, Message> returnTypes = - new ConcurrentHashMap<String, Message>(); - private boolean isClosed = false; - private final Client.ConnectionId remoteId; - private final Client client; - private final long clientProtocolVersion; - private final String protocolName; - private AtomicBoolean fallbackToSimpleAuth; - - private Invoker(Class<?> protocol, InetSocketAddress addr, - UserGroupInformation ticket, Configuration conf, SocketFactory factory, - int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth) throws IOException { - this(protocol, Client.ConnectionId.getConnectionId( - addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf), - conf, factory); - this.fallbackToSimpleAuth = fallbackToSimpleAuth; - } - - /** - * This constructor takes a connectionId, instead of creating a new one. 
- */ - private Invoker(Class<?> protocol, Client.ConnectionId connId, - Configuration conf, SocketFactory factory) { - this.remoteId = connId; - this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class); - this.protocolName = RPC.getProtocolName(protocol); - this.clientProtocolVersion = RPC - .getProtocolVersion(protocol); - } - - private RequestHeaderProto constructRpcRequestHeader(Method method) { - RequestHeaderProto.Builder builder = RequestHeaderProto - .newBuilder(); - builder.setMethodName(method.getName()); - - - // For protobuf, {@code protocol} used when creating client side proxy is - // the interface extending BlockingInterface, which has the annotations - // such as ProtocolName etc. - // - // Using Method.getDeclaringClass(), as in WritableEngine to get at - // the protocol interface will return BlockingInterface, from where - // the annotation ProtocolName and Version cannot be - // obtained. - // - // Hence we simply use the protocol class used to create the proxy. - // For PB this may limit the use of mixins on client side. - builder.setDeclaringClassProtocolName(protocolName); - builder.setClientProtocolVersion(clientProtocolVersion); - return builder.build(); - } - - /** - * This is the client side invoker of RPC methods. It only throws - * ServiceException, since the invocation proxy expects only - * ServiceException to be thrown by the method in the case of a protobuf service. - * - * ServiceException has the following causes: - * <ol> - * <li>Exceptions encountered on the client side in this method are - * set as cause in ServiceException as is.</li> - * <li>Exceptions from the server are wrapped in RemoteException and are - * set as cause in ServiceException</li> - * </ol> - * - * Note that the client calling protobuf RPC methods must handle - * ServiceException by getting the cause from the ServiceException. If the - * cause is RemoteException, then unwrap it to get the exception thrown by - * the server. - */ - @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws ServiceException { - long startTime = 0; - if (LOG.isDebugEnabled()) { - startTime = Time.now(); - } - - if (args.length != 2) { // RpcController + Message - throw new ServiceException("Too many parameters for request.
Method: [" - + method.getName() + "]" + ", Expected: 2, Actual: " - + args.length); - } - if (args[1] == null) { - throw new ServiceException("null param while calling Method: [" - + method.getName() + "]"); - } - - RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method); - - if (LOG.isTraceEnabled()) { - LOG.trace(Thread.currentThread().getId() + ": Call -> " + - remoteId + ": " + method.getName() + - " {" + TextFormat.shortDebugString((Message) args[1]) + "}"); - } - - - Message theRequest = (Message) args[1]; - final RpcResponseWrapper val; - try { - val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, - new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId, - fallbackToSimpleAuth); - - } catch (Throwable e) { - if (LOG.isTraceEnabled()) { - LOG.trace(Thread.currentThread().getId() + ": Exception <- " + - remoteId + ": " + method.getName() + - " {" + e + "}"); - } - throw new ServiceException(e); - } - - if (LOG.isDebugEnabled()) { - long callTime = Time.now() - startTime; - LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); - } - - Message prototype = null; - try { - prototype = getReturnProtoType(method); - } catch (Exception e) { - throw new ServiceException(e); - } - Message returnMessage; - try { - returnMessage = prototype.newBuilderForType() - .mergeFrom(val.theResponseRead).build(); - - if (LOG.isTraceEnabled()) { - LOG.trace(Thread.currentThread().getId() + ": Response <- " + - remoteId + ": " + method.getName() + - " {" + TextFormat.shortDebugString(returnMessage) + "}"); - } - - } catch (Throwable e) { - throw new ServiceException(e); - } - return returnMessage; - } - - @Override - public void close() throws IOException { - if (!isClosed) { - isClosed = true; - CLIENTS.stopClient(client); - } - } - - private Message getReturnProtoType(Method method) throws Exception { - if (returnTypes.containsKey(method.getName())) { - return returnTypes.get(method.getName()); - } - - Class<?> returnType = method.getReturnType(); - Method newInstMethod = returnType.getMethod("getDefaultInstance"); - newInstMethod.setAccessible(true); - Message prototype = (Message) newInstMethod.invoke(null, (Object[]) null); - returnTypes.put(method.getName(), prototype); - return prototype; - } - - @Override //RpcInvocationHandler - public ConnectionId getConnectionId() { - return remoteId; - } - } - - interface RpcWrapper extends Writable { - int getLength(); - } - /** - * Wrapper for Protocol Buffer Requests - * - * Note while this wrapper is writable, the request on the wire is in - * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} - * use type Writable as a wrapper to work across multiple RpcEngine kinds. 
- */ - private static abstract class RpcMessageWithHeader<T extends GeneratedMessage> - implements RpcWrapper { - T requestHeader; - Message theRequest; // for clientSide, the request is here - byte[] theRequestRead; // for server side, the request is here - - public RpcMessageWithHeader() { - } - - public RpcMessageWithHeader(T requestHeader, Message theRequest) { - this.requestHeader = requestHeader; - this.theRequest = theRequest; - } - - @Override - public void write(DataOutput out) throws IOException { - OutputStream os = DataOutputOutputStream.constructOutputStream(out); - - ((Message)requestHeader).writeDelimitedTo(os); - theRequest.writeDelimitedTo(os); - } - - @Override - public void readFields(DataInput in) throws IOException { - requestHeader = parseHeaderFrom(readVarintBytes(in)); - theRequestRead = readMessageRequest(in); - } - - abstract T parseHeaderFrom(byte[] bytes) throws IOException; - - byte[] readMessageRequest(DataInput in) throws IOException { - return readVarintBytes(in); - } - - private static byte[] readVarintBytes(DataInput in) throws IOException { - final int length = ProtoUtil.readRawVarint32(in); - final byte[] bytes = new byte[length]; - in.readFully(bytes); - return bytes; - } - - public T getMessageHeader() { - return requestHeader; - } - - public byte[] getMessageBytes() { - return theRequestRead; - } - - @Override - public int getLength() { - int headerLen = requestHeader.getSerializedSize(); - int reqLen; - if (theRequest != null) { - reqLen = theRequest.getSerializedSize(); - } else if (theRequestRead != null ) { - reqLen = theRequestRead.length; - } else { - throw new IllegalArgumentException( - "getLength on uninitialized RpcWrapper"); - } - return CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen - + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen; - } - } - - private static class RpcRequestWrapper - extends RpcMessageWithHeader<RequestHeaderProto> { - @SuppressWarnings("unused") - public RpcRequestWrapper() {} - - public RpcRequestWrapper( - RequestHeaderProto requestHeader, Message theRequest) { - super(requestHeader, theRequest); - } - - @Override - RequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException { - return RequestHeaderProto.parseFrom(bytes); - } - - @Override - public String toString() { - return requestHeader.getDeclaringClassProtocolName() + "." 
+ - requestHeader.getMethodName(); - } - } - - @InterfaceAudience.LimitedPrivate({"RPC"}) - public static class RpcRequestMessageWrapper - extends RpcMessageWithHeader<RpcRequestHeaderProto> { - public RpcRequestMessageWrapper() {} - - public RpcRequestMessageWrapper( - RpcRequestHeaderProto requestHeader, Message theRequest) { - super(requestHeader, theRequest); - } - - @Override - RpcRequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException { - return RpcRequestHeaderProto.parseFrom(bytes); - } - } - - @InterfaceAudience.LimitedPrivate({"RPC"}) - public static class RpcResponseMessageWrapper - extends RpcMessageWithHeader<RpcResponseHeaderProto> { - public RpcResponseMessageWrapper() {} - - public RpcResponseMessageWrapper( - RpcResponseHeaderProto responseHeader, Message theRequest) { - super(responseHeader, theRequest); - } - - @Override - byte[] readMessageRequest(DataInput in) throws IOException { - // error responses contain no message body - switch (requestHeader.getStatus()) { - case ERROR: - case FATAL: - return null; - default: - return super.readMessageRequest(in); - } - } - - @Override - RpcResponseHeaderProto parseHeaderFrom(byte[] bytes) throws IOException { - return RpcResponseHeaderProto.parseFrom(bytes); - } - } - - /** - * Wrapper for Protocol Buffer Responses - * - * Note while this wrapper is writable, the response on the wire is in - * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} - * use type Writable as a wrapper to work across multiple RpcEngine kinds. - */ - @InterfaceAudience.LimitedPrivate({"RPC"}) // temporarily exposed - public static class RpcResponseWrapper implements RpcWrapper { - Message theResponse; // for senderSide, the response is here - byte[] theResponseRead; // for receiver side, the response is here - - public RpcResponseWrapper() { - } - - public RpcResponseWrapper(Message message) { - this.theResponse = message; - } - - @Override - public void write(DataOutput out) throws IOException { - OutputStream os = DataOutputOutputStream.constructOutputStream(out); - theResponse.writeDelimitedTo(os); - } - - @Override - public void readFields(DataInput in) throws IOException { - int length = ProtoUtil.readRawVarint32(in); - theResponseRead = new byte[length]; - in.readFully(theResponseRead); - } - - @Override - public int getLength() { - int resLen; - if (theResponse != null) { - resLen = theResponse.getSerializedSize(); - } else if (theResponseRead != null ) { - resLen = theResponseRead.length; - } else { - throw new IllegalArgumentException( - "getLength on uninitialized RpcWrapper"); - } - return CodedOutputStream.computeRawVarint32Size(resLen) + resLen; - } - } - - @VisibleForTesting - @InterfaceAudience.Private - @InterfaceStability.Unstable - static Client getClient(Configuration conf) { - return CLIENTS.getClient(conf, SocketFactory.getDefault(), - RpcResponseWrapper.class); - } - - - - @Override - public RPC.Server getServer(Class<?> protocol, Object protocolImpl, - String bindAddress, int port, int numHandlers, int numReaders, - int queueSizePerHandler, boolean verbose, Configuration conf, - SecretManager<? extends TokenIdentifier> secretManager, - String portRangeConfig) - throws IOException { - return new Server(protocol, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, - portRangeConfig); - } - - public static class Server extends RPC.Server { - /** - * Construct an RPC server.
- * - * @param protocolClass the class of protocol - * @param protocolImpl the protocolImpl whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - * @param portRangeConfig A config parameter that can be used to restrict - * the range of ports used when port is 0 (an ephemeral port) - */ - public Server(Class<?> protocolClass, Object protocolImpl, - Configuration conf, String bindAddress, int port, int numHandlers, - int numReaders, int queueSizePerHandler, boolean verbose, - SecretManager<? extends TokenIdentifier> secretManager, - String portRangeConfig) - throws IOException { - super(bindAddress, port, null, numHandlers, - numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl - .getClass().getName()), secretManager, portRangeConfig); - this.verbose = verbose; - registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, - protocolImpl); - } - - /** - * Protobuf invoker for {@link RpcInvoker} - */ - static class ProtoBufRpcInvoker implements RpcInvoker { - private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, - String protoName, long clientVersion) throws RpcServerException { - ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion); - ProtoClassProtoImpl impl = - server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv); - if (impl == null) { // no match for Protocol AND Version - VerProtocolImpl highest = - server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, - protoName); - if (highest == null) { - throw new RpcNoSuchProtocolException( - "Unknown protocol: " + protoName); - } - // protocol supported but not the version that client wants - throw new RPC.VersionMismatch(protoName, clientVersion, - highest.version); - } - return impl; - } - - @Override - /** - * This is a server side method, which is invoked over RPC. On success - * the return response has protobuf response payload. On failure, the - * exception name and the stack trace are returned in the response. - * See {@link HadoopRpcResponseProto} - * - * In this method there are three types of exceptions possible and they are - * returned in response as follows. - * <ol> - * <li> Exceptions encountered in this method that are returned - * as {@link RpcServerException} </li> - * <li> Exceptions thrown by the service are wrapped in ServiceException. - * In that case this method returns in the response the exception thrown by the - * service.</li> - * <li> Other exceptions thrown by the service.
They are returned as - is.</li> - </ol> - */ - public Writable call(RPC.Server server, String protocol, - Writable writableRequest, long receiveTime) throws Exception { - RpcRequestWrapper request = (RpcRequestWrapper) writableRequest; - RequestHeaderProto rpcRequest = request.requestHeader; - String methodName = rpcRequest.getMethodName(); - String protoName = rpcRequest.getDeclaringClassProtocolName(); - long clientVersion = rpcRequest.getClientProtocolVersion(); - if (server.verbose) - LOG.info("Call: protocol=" + protocol + ", method=" + methodName); - - ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName, - clientVersion); - BlockingService service = (BlockingService) protocolImpl.protocolImpl; - MethodDescriptor methodDescriptor = service.getDescriptorForType() - .findMethodByName(methodName); - if (methodDescriptor == null) { - String msg = "Unknown method " + methodName + " called on " + protocol - + " protocol."; - LOG.warn(msg); - throw new RpcNoSuchMethodException(msg); - } - Message prototype = service.getRequestPrototype(methodDescriptor); - Message param = prototype.newBuilderForType() - .mergeFrom(request.theRequestRead).build(); - - Message result; - long startTime = Time.now(); - int qTime = (int) (startTime - receiveTime); - Exception exception = null; - try { - server.rpcDetailedMetrics.init(protocolImpl.protocolClass); - result = service.callBlockingMethod(methodDescriptor, null, param); - } catch (ServiceException e) { - exception = (Exception) e.getCause(); - throw (Exception) e.getCause(); - } catch (Exception e) { - exception = e; - throw e; - } finally { - int processingTime = (int) (Time.now() - startTime); - if (LOG.isDebugEnabled()) { - String msg = "Served: " + methodName + " queueTime= " + qTime + - " processingTime= " + processingTime; - if (exception != null) { - msg += " exception= " + exception.getClass().getSimpleName(); - } - LOG.debug(msg); - } - String detailedMetricsName = (exception == null) ? - methodName : - exception.getClass().getSimpleName(); - server.rpcMetrics.addRpcQueueTime(qTime); - server.rpcMetrics.addRpcProcessingTime(processingTime); - server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName, - processingTime); - } - return new RpcResponseWrapper(result); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java deleted file mode 100644 index b8b0497..0000000 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.raft.hadooprpc; - -public interface HadoopConstants { - String RAFT_SERVER_KERBEROS_PRINCIPAL_KEY - = "raft.server.kerberos.principal"; - String RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY - = "raft.client.kerberos.principal"; - String RAFT_SERVER_PROTOCOL_NAME - = "org.apache.hadoop.raft.server.protocol.RaftServerProtocol"; - String RAFT_CLIENT_PROTOCOL_NAME - = "org.apache.hadoop.raft.protocol.RaftClientProtocol"; -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java deleted file mode 100644 index 60c8be5..0000000 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.hadooprpc; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.ProtobufRpcEngineShaded; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; - -import java.io.Closeable; -import java.io.IOException; - -public class Proxy<PROTOCOL> implements Closeable { - public static <PROTOCOL> PROTOCOL getProxy( - Class<PROTOCOL> clazz, String addressStr, Configuration conf) - throws IOException { - RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngineShaded.class); - return RPC.getProxy(clazz, RPC.getProtocolVersion(clazz), - org.apache.raft.util.NetUtils.newInetSocketAddress(addressStr), - UserGroupInformation.getCurrentUser(), - conf, NetUtils.getSocketFactory(conf, clazz)); - } - - private final PROTOCOL protocol; - - public Proxy(Class<PROTOCOL> clazz, String addressStr, Configuration conf) - throws IOException { - this.protocol = getProxy(clazz, addressStr, conf); - } - - public PROTOCOL getProtocol() { - return protocol; - } - - @Override - public void close() { - RPC.stopProxy(protocol); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java ---------------------------------------------------------------------- diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java deleted file mode 100644 index 29372ea..0000000 --- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.raft.hadooprpc.client; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.raft.client.RaftClientRequestSender; -import org.apache.raft.protocol.*; -import org.apache.raft.util.PeerProxyMap; - -import java.io.IOException; -import java.util.Collection; - -public class HadoopClientRequestSender implements RaftClientRequestSender { - - private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies; - - public HadoopClientRequestSender( - Collection<RaftPeer> peers, final Configuration conf) { - this.proxies = new PeerProxyMap<>( - p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf)); - proxies.addPeers(peers); - } - - @Override - public RaftClientReply sendRequest(RaftClientRequest request) - throws IOException { - final String serverId = request.getReplierId(); - final RaftClientProtocolClientSideTranslatorPB proxy = - proxies.getProxy(serverId); - try { - if (request instanceof SetConfigurationRequest) { - return proxy.setConfiguration((SetConfigurationRequest) request); - } else { - return proxy.submitClientRequest(request); - } - } catch (RemoteException e) { - throw e.unwrapRemoteException(StateMachineException.class, - ReconfigurationTimeoutException.class, - ReconfigurationInProgressException.class, RaftException.class); - } - } - - @Override - public void addServers(Iterable<RaftPeer> servers) { - proxies.addPeers(servers); - } - - @Override - public void close() { - proxies.close(); - } -}
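
Editor's note (not part of the deleted sources): the raft-hadoop classes removed above are used together as follows. Proxy.getProxy(...) installs ProtobufRpcEngineShaded as the Hadoop RPC engine for a protocol class before creating the proxy, and HadoopClientRequestSender keeps one such proxy per RaftPeer via a PeerProxyMap. The sketch below is a minimal illustration under stated assumptions: it relies only on constructors and methods visible in the diffs above, while the EchoProtocolPB interface and the peer address are placeholders invented for the example.

// Hedged usage sketch (Java). EchoProtocolPB and the address are assumptions,
// not part of this commit.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.raft.hadooprpc.Proxy;

public class ShadedRpcProxySketch {

  // Placeholder protocol interface; a real caller would pass one of the
  // protobuf protocol interfaces referenced elsewhere in this commit.
  @ProtocolInfo(protocolName = "example.EchoProtocolPB", protocolVersion = 1L)
  public interface EchoProtocolPB {
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Proxy.getProxy (see the deleted Proxy.java above) first calls
    // RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngineShaded.class), so
    // calls on the returned proxy are marshalled with the shaded protobuf
    // wrappers (RpcRequestWrapper / RpcResponseWrapper) shown in this diff.
    try (Proxy<EchoProtocolPB> proxy =
        new Proxy<>(EchoProtocolPB.class, "127.0.0.1:6000", conf)) {
      EchoProtocolPB client = proxy.getProtocol();
      // Methods invoked on 'client' go over Hadoop IPC to the address above;
      // the connection is only established when a method is actually called.
    }
  }
}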