Repository: incubator-ratis Updated Branches: refs/heads/master 974919e5e -> ed8e60dad
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java deleted file mode 100644 index d047803..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.server; - -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.shaded.proto.RaftProtos.*; -import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; -import org.apache.ratis.util.ProtoUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase { - public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class); - - private final Supplier<RaftPeerId> idSupplier; - private final RaftServer server; - - public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server) { - this.idSupplier = idSupplier; - this.server = server; - } - - RaftPeerId getId() { - return idSupplier.get(); - } - - @Override - public void requestVote(RequestVoteRequestProto request, - StreamObserver<RequestVoteReplyProto> responseObserver) { - try { - final RequestVoteReplyProto reply = server.requestVote(request); - responseObserver.onNext(reply); - responseObserver.onCompleted(); - } catch (Throwable e) { - RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed requestVote " + ProtoUtils.toString(request.getServerRequest()), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - @Override - public StreamObserver<AppendEntriesRequestProto> appendEntries( - StreamObserver<AppendEntriesReplyProto> responseObserver) { - return new StreamObserver<AppendEntriesRequestProto>() { - private final AtomicReference<CompletableFuture<Void>> previousOnNext = - new AtomicReference<>(CompletableFuture.completedFuture(null)); - private final AtomicBoolean isClosed = new AtomicBoolean(false); - - @Override - public void onNext(AppendEntriesRequestProto request) { - final CompletableFuture<Void> current = new CompletableFuture<>(); - final CompletableFuture<Void> previous = previousOnNext.getAndSet(current); - try { - server.appendEntriesAsync(request).thenCombine(previous, - (reply, v) -> { - if (!isClosed.get()) { - responseObserver.onNext(reply); - } - current.complete(null); - return null; - }); - } catch (Throwable e) { - RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed appendEntries " + ProtoUtils.toString(request.getServerRequest()), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e, request.getServerRequest().getCallId())); - current.completeExceptionally(e); - } - } - - @Override - public void onError(Throwable t) { - // for now we just log a msg - RaftGrpcUtil.warn(LOG, () -> getId() + ": appendEntries onError", t); - } - - @Override - public void onCompleted() { - if (isClosed.compareAndSet(false, true)) { - LOG.info("{}: appendEntries completed", getId()); - responseObserver.onCompleted(); - } - } - }; - } - - @Override - public StreamObserver<InstallSnapshotRequestProto> installSnapshot( - StreamObserver<InstallSnapshotReplyProto> responseObserver) { - return new StreamObserver<InstallSnapshotRequestProto>() { - @Override - public void onNext(InstallSnapshotRequestProto request) { - try { - final InstallSnapshotReplyProto reply = server.installSnapshot(request); - responseObserver.onNext(reply); - } catch (Throwable e) { - RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed installSnapshot " + ProtoUtils.toString(request.getServerRequest()), e); - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - @Override - public void onError(Throwable t) { - RaftGrpcUtil.warn(LOG, () -> getId() + ": installSnapshot onError", t); - } - - @Override - public void onCompleted() { - LOG.info("{}: installSnapshot completed", getId()); - responseObserver.onCompleted(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java deleted file mode 100644 index a2c419f..0000000 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.RaftConfigKeys; -import org.apache.ratis.RaftTestUtil; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.*; -import org.apache.ratis.statemachine.StateMachine; - -import java.io.IOException; - -public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase { - public static final Factory<MiniRaftClusterWithGRpc> FACTORY - = new Factory<MiniRaftClusterWithGRpc>() { - @Override - public MiniRaftClusterWithGRpc newCluster( - String[] ids, RaftProperties prop) { - RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC); - return new MiniRaftClusterWithGRpc(ids, prop); - } - }; - - public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGRpc> { - @Override - default Factory<MiniRaftClusterWithGRpc> getFactory() { - return FACTORY; - } - } - - public static final DelayLocalExecutionInjection sendServerRequestInjection = - new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST); - - private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties) { - super(ids, properties, null); - } - - @Override - protected RaftServerProxy newRaftServer( - RaftPeerId id, StateMachine stateMachine, RaftGroup group, - RaftProperties properties) throws IOException { - GrpcConfigKeys.Server.setPort(properties, getPort(id, group)); - return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null); - } - - @Override - protected void blockQueueAndSetDelay(String leaderId, int delayMs) - throws InterruptedException { - RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection, - leaderId, delayMs, getMaxTimeout()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java new file mode 100644 index 0000000..176cfa0 --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.server.GrpcService; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.impl.*; +import org.apache.ratis.statemachine.StateMachine; + +import java.io.IOException; + +public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase { + public static final Factory<MiniRaftClusterWithGrpc> FACTORY + = new Factory<MiniRaftClusterWithGrpc>() { + @Override + public MiniRaftClusterWithGrpc newCluster( + String[] ids, RaftProperties prop) { + RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC); + return new MiniRaftClusterWithGrpc(ids, prop); + } + }; + + public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> { + @Override + default Factory<MiniRaftClusterWithGrpc> getFactory() { + return FACTORY; + } + } + + public static final DelayLocalExecutionInjection sendServerRequestInjection = + new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST); + + private MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) { + super(ids, properties, null); + } + + @Override + protected RaftServerProxy newRaftServer( + RaftPeerId id, StateMachine stateMachine, RaftGroup group, + RaftProperties properties) throws IOException { + GrpcConfigKeys.Server.setPort(properties, getPort(id, group)); + return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null); + } + + @Override + protected void blockQueueAndSetDelay(String leaderId, int delayMs) + throws InterruptedException { + RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection, + leaderId, delayMs, getMaxTimeout()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java index 0b5e2a9..657bfd1 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java @@ -23,6 +23,6 @@ import org.apache.ratis.server.impl.GroupManagementBaseTest; public class TestGroupManagementWithGrpc extends GroupManagementBaseTest { @Override public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() { - return MiniRaftClusterWithGRpc.FACTORY; + return MiniRaftClusterWithGrpc.FACTORY; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java index a62dab0..eb08336 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java @@ -22,15 +22,15 @@ import org.apache.ratis.server.impl.LeaderElectionTests; import org.junit.Test; public class TestLeaderElectionWithGrpc - extends LeaderElectionTests<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { + extends LeaderElectionTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { @Override @Test public void testEnforceLeader() throws Exception { super.testEnforceLeader(); - MiniRaftClusterWithGRpc.sendServerRequestInjection.clear(); + MiniRaftClusterWithGrpc.sendServerRequestInjection.clear(); BlockRequestHandlingInjection.getInstance().unblockAll(); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java index 752a3dd..614787e 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java @@ -19,6 +19,6 @@ package org.apache.ratis.grpc; import org.apache.ratis.RaftAsyncTests; -public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { +public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java index fc110ea..d2b71bc 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java @@ -20,6 +20,6 @@ package org.apache.ratis.grpc; import org.apache.ratis.RaftExceptionBaseTest; public class TestRaftExceptionWithGrpc - extends RaftExceptionBaseTest<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { + extends RaftExceptionBaseTest<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java deleted file mode 100644 index 822b923..0000000 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.log4j.Level; -import org.apache.ratis.grpc.server.RaftServerProtocolService; -import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; -import org.apache.ratis.util.LogUtils; - -import java.io.IOException; - -public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest { - static { - LogUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG); - } - - @Override - public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException { - return MiniRaftClusterWithGRpc.FACTORY.newCluster(peerNum, prop); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java new file mode 100644 index 0000000..29f8bea --- /dev/null +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.log4j.Level; +import org.apache.ratis.grpc.server.GrpcServerProtocolService; +import org.apache.ratis.server.impl.RaftReconfigurationBaseTest; +import org.apache.ratis.util.LogUtils; + +import java.io.IOException; + +public class TestRaftReconfigurationWithGrpc extends RaftReconfigurationBaseTest { + static { + LogUtils.setLogLevel(GrpcServerProtocolService.LOG, Level.DEBUG); + } + + @Override + public MiniRaftClusterWithGrpc getCluster(int peerNum) { + return MiniRaftClusterWithGrpc.FACTORY.newCluster(peerNum, prop); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index 7173e1f..8a9e94b 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -33,8 +33,8 @@ public class TestRaftServerWithGrpc extends BaseTest { @Test public void testServerRestartOnException() throws Exception { RaftProperties properties = new RaftProperties(); - final MiniRaftClusterWithGRpc cluster - = MiniRaftClusterWithGRpc.FACTORY.newCluster(1, properties); + final MiniRaftClusterWithGrpc cluster + = MiniRaftClusterWithGrpc.FACTORY.newCluster(1, properties); cluster.start(); RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId(); GrpcConfigKeys.Server.setPort(properties, cluster.getLeader().getServerRpc().getInetSocketAddress().getPort()); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java index 091277d..a960478 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java @@ -18,14 +18,11 @@ package org.apache.ratis.grpc; import org.apache.ratis.MiniRaftCluster; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.statemachine.RaftSnapshotBaseTest; -import java.io.IOException; - public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest { @Override public MiniRaftCluster.Factory<?> getFactory() { - return MiniRaftClusterWithGRpc.FACTORY; + return MiniRaftClusterWithGrpc.FACTORY; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java index ca36738..c8789a7 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java @@ -20,7 +20,7 @@ package org.apache.ratis.grpc; import org.apache.ratis.server.impl.RaftStateMachineExceptionTests; public class TestRaftStateMachineExceptionWithGrpc - extends RaftStateMachineExceptionTests<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { + extends RaftStateMachineExceptionTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java index 17956c7..f3897ac 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java @@ -21,8 +21,8 @@ import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.client.AppendStreamer; -import org.apache.ratis.grpc.client.RaftOutputStream; +import org.apache.ratis.grpc.client.GrpcClientStreamer; +import org.apache.ratis.grpc.client.GrpcOutputStream; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.protocol.TermIndex; @@ -51,14 +51,14 @@ import static org.junit.Assert.fail; @Ignore public class TestRaftStream extends BaseTest { static { - LogUtils.setLogLevel(AppendStreamer.LOG, Level.ALL); + LogUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL); } private static final RaftProperties prop = new RaftProperties(); private static final int NUM_SERVERS = 3; private static final byte[] BYTES = new byte[4]; - private MiniRaftClusterWithGRpc cluster; + private MiniRaftClusterWithGrpc cluster; @After public void tearDown() { @@ -85,12 +85,12 @@ public class TestRaftStream extends BaseTest { // default 64K is too large for a test GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4)); - cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); + cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); RaftServerImpl leader = waitForLeader(cluster); - try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(), + try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(), cluster.getGroup(), leader.getId())) { for (int i = 0; i < numRequests; i++) { // generate requests out.write(toBytes(i)); @@ -124,11 +124,11 @@ public class TestRaftStream extends BaseTest { LOG.info("Running testWriteAndFlush"); GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE)); - cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); + cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); RaftServerImpl leader = waitForLeader(cluster); - RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(), + GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(), cluster.getGroup(), leader.getId()); int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072}; @@ -203,11 +203,11 @@ public class TestRaftStream extends BaseTest { LOG.info("Running testWriteWithOffset"); GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE)); - cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); + cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); RaftServerImpl leader = waitForLeader(cluster); - RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(), + GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(), cluster.getGroup(), leader.getId()); byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2]; @@ -261,7 +261,7 @@ public class TestRaftStream extends BaseTest { LOG.info("Running testChangeLeader"); GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4)); - cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop); + cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop); cluster.start(); final RaftServerImpl leader = waitForLeader(cluster); @@ -273,7 +273,7 @@ public class TestRaftStream extends BaseTest { new Thread(() -> { LOG.info("Writer thread starts"); int count = 0; - try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(), + try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(), cluster.getGroup(), leader.getId())) { while (running.get()) { out.write(toBytes(count++)); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index 16c0f31..2d0af07 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -36,8 +36,8 @@ import java.util.concurrent.CompletableFuture; import static org.apache.ratis.RaftTestUtil.waitForLeader; public class TestRaftWithGrpc - extends RaftBasicTests<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { + extends RaftBasicTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { { getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, @@ -53,7 +53,7 @@ public class TestRaftWithGrpc @Test public void testRequestTimeout() throws Exception { - try(MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS)) { + try(MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS)) { cluster.start(); testRequestTimeout(false, cluster, LOG); } @@ -62,7 +62,7 @@ public class TestRaftWithGrpc @Test public void testUpdateViaHeartbeat() throws Exception { LOG.info("Running testUpdateViaHeartbeat"); - final MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS); + final MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS); cluster.start(); waitForLeader(cluster); long waitTime = 5000; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java index f577a48..30a3f0d 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java @@ -44,16 +44,16 @@ public class TestRetryCacheWithGrpc extends RetryCacheTests { LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); } - private final MiniRaftClusterWithGRpc cluster; + private final MiniRaftClusterWithGrpc cluster; public TestRetryCacheWithGrpc() throws IOException { - cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster( + cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster( NUM_SERVERS, properties); Assert.assertNull(cluster.getLeader()); } @Override - public MiniRaftClusterWithGRpc getCluster() { + public MiniRaftClusterWithGrpc getCluster() { return cluster; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java index ef978a1..30be724 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java @@ -20,6 +20,6 @@ package org.apache.ratis.grpc; import org.apache.ratis.server.impl.ServerInformationBaseTest; public class TestServerInformationWithGrpc - extends ServerInformationBaseTest<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { + extends ServerInformationBaseTest<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-proto-shaded/src/main/proto/GRpc.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto deleted file mode 100644 index d7e550e..0000000 --- a/ratis-proto-shaded/src/main/proto/GRpc.proto +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -syntax = "proto3"; -option java_package = "org.apache.ratis.shaded.proto.grpc"; -option java_outer_classname = "GRpcProtos"; -option java_generate_equals_and_hash = true; -package ratis.grpc; - -import "Raft.proto"; - -service RaftClientProtocolService { - // A client-to-server RPC to set new raft configuration - rpc setConfiguration(ratis.common.SetConfigurationRequestProto) - returns(ratis.common.RaftClientReplyProto) {} - - // A client-to-server stream RPC to append data - rpc append(stream ratis.common.RaftClientRequestProto) - returns (stream ratis.common.RaftClientReplyProto) {} -} - -service RaftServerProtocolService { - rpc requestVote(ratis.common.RequestVoteRequestProto) - returns(ratis.common.RequestVoteReplyProto) {} - - rpc appendEntries(stream ratis.common.AppendEntriesRequestProto) - returns(stream ratis.common.AppendEntriesReplyProto) {} - - rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto) - returns(ratis.common.InstallSnapshotReplyProto) {} -} - -service AdminProtocolService { - // A client-to-server RPC to add a new group - rpc groupManagement(ratis.common.GroupManagementRequestProto) - returns(ratis.common.RaftClientReplyProto) {} - - rpc serverInformation(ratis.common.ServerInformationRequestProto) - returns(ratis.common.ServerInformationReplyProto) {} -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-proto-shaded/src/main/proto/Grpc.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Grpc.proto b/ratis-proto-shaded/src/main/proto/Grpc.proto new file mode 100644 index 0000000..5c4bbad --- /dev/null +++ b/ratis-proto-shaded/src/main/proto/Grpc.proto @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; +option java_package = "org.apache.ratis.shaded.proto.grpc"; +option java_outer_classname = "GrpcProtos"; +option java_generate_equals_and_hash = true; +package ratis.grpc; + +import "Raft.proto"; + +service RaftClientProtocolService { + // A client-to-server RPC to set new raft configuration + rpc setConfiguration(ratis.common.SetConfigurationRequestProto) + returns(ratis.common.RaftClientReplyProto) {} + + // A client-to-server stream RPC to append data + rpc append(stream ratis.common.RaftClientRequestProto) + returns (stream ratis.common.RaftClientReplyProto) {} +} + +service RaftServerProtocolService { + rpc requestVote(ratis.common.RequestVoteRequestProto) + returns(ratis.common.RequestVoteReplyProto) {} + + rpc appendEntries(stream ratis.common.AppendEntriesRequestProto) + returns(stream ratis.common.AppendEntriesReplyProto) {} + + rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto) + returns(ratis.common.InstallSnapshotReplyProto) {} +} + +service AdminProtocolService { + // A client-to-server RPC to add a new group + rpc groupManagement(ratis.common.GroupManagementRequestProto) + returns(ratis.common.RaftClientReplyProto) {} + + rpc serverInformation(ratis.common.ServerInformationRequestProto) + returns(ratis.common.ServerInformationReplyProto) {} +}
