This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a475808  RATIS-1129. Move out the RPC related APIs from DataStreamOutput. (#254)
a475808 is described below

commit a475808f41b5dcabde85341d3c065591fbde7d94
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Nov 5 07:48:04 2020 +0800

    RATIS-1129. Move out the RPC related APIs from DataStreamOutput. (#254)
---
 ...aStreamOutput.java => DataStreamOutputRpc.java} | 18 +++++-------
 .../apache/ratis/client/api/DataStreamOutput.java  |  9 ------
 .../apache/ratis/client/api/MessageStreamApi.java  |  6 ++--
 .../ratis/client/impl/DataStreamClientImpl.java    |  8 +++---
 .../ratis/netty/server/NettyServerStreamRpc.java   | 32 ++++++++++++----------
 .../ratis/datastream/DataStreamBaseTest.java       | 10 ++-----
 6 files changed, 33 insertions(+), 50 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
similarity index 77%
copy from ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
copy to ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
index 429a422..19fb46b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
@@ -15,25 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.client.api;
+package org.apache.ratis.client;
 
-import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.protocol.DataStreamReply;
 
-import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
-/** An asynchronous output stream supporting zero buffer copying. */
-public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
-  /** Send out the data in the buffer asynchronously */
-  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
-
-  /** Create a transaction asynchronously once the stream data is replicated to all servers */
-  CompletableFuture<DataStreamReply> startTransactionAsync();
-
+/** An RPC interface which extends the user interface {@link DataStreamOutput}. */
+public interface DataStreamOutputRpc extends DataStreamOutput {
   /** Get the future of the header request. */
   CompletableFuture<DataStreamReply> getHeaderFuture();
 
   /** Peer close asynchronously. */
   CompletableFuture<DataStreamReply> closeForwardAsync();
+
+  /** Create a transaction asynchronously once the stream data is replicated to all servers */
+  CompletableFuture<DataStreamReply> startTransactionAsync();
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 429a422..dd86569 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -27,13 +27,4 @@ import java.util.concurrent.CompletableFuture;
 public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
   /** Send out the data in the buffer asynchronously */
   CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
-
-  /** Create a transaction asynchronously once the stream data is replicated to all servers */
-  CompletableFuture<DataStreamReply> startTransactionAsync();
-
-  /** Get the future of the header request. */
-  CompletableFuture<DataStreamReply> getHeaderFuture();
-
-  /** Peer close asynchronously. */
-  CompletableFuture<DataStreamReply> closeForwardAsync();
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
index 27cfa7f..9a816c8 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
@@ -31,11 +31,11 @@ import java.util.concurrent.CompletableFuture;
  * the leader creates a raft log entry for the request
  * and then replicates the log entry to all the followers.
  *
- * Note that this API is similar to {@link org.apache.ratis.client.RaftClient#sendAsync(Message)}
+ * Note that this API is similar to {@link AsyncApi#send(Message)}
  * except that {@link MessageStreamApi} divides a (large) message into multiple (small) sub-messages in the stream
- * but {@link org.apache.ratis.client.RaftClient#sendAsync(Message)} sends the entire message in a single RPC.
+ * but {@link AsyncApi#send(Message)} sends the entire message in a single RPC.
  * For sending large messages,
- * {@link MessageStreamApi} is more efficient than {@link org.apache.ratis.client.RaftClient#sendAsync(Message)}}.
+ * {@link MessageStreamApi} is more efficient than {@link AsyncApi#send(Message)}}.
  *
  * Note also that this API is different from {@link DataStreamApi} in the sense that
  * this API streams messages only to the leader
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 7f3546e..2690091 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -21,7 +21,7 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamClientFactory;
 import org.apache.ratis.client.DataStreamClientRpc;
-import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
@@ -61,7 +61,7 @@ public class DataStreamClientImpl implements DataStreamClient {
     this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
   }
 
-  public class DataStreamOutputImpl implements DataStreamOutput {
+  public class DataStreamOutputImpl implements DataStreamOutputRpc {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
 
@@ -122,12 +122,12 @@ public class DataStreamClientImpl implements DataStreamClient {
   }
 
   @Override
-  public DataStreamOutput stream() {
+  public DataStreamOutputRpc stream() {
     return stream(groupId);
   }
 
   @Override
-  public DataStreamOutput stream(RaftGroupId gid) {
+  public DataStreamOutputRpc stream(RaftGroupId gid) {
     return new DataStreamOutputImpl(gid);
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 3e7cce8..911af6c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -19,7 +19,7 @@
 package org.apache.ratis.netty.server;
 
 import org.apache.ratis.client.DataStreamClient;
-import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
@@ -95,8 +95,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       peers.addAll(newPeers);
     }
 
-    List<DataStreamOutput> getDataStreamOutput(RaftGroupId groupId) throws IOException {
-      final List<DataStreamOutput> outs = new ArrayList<>();
+    List<DataStreamOutputRpc> getDataStreamOutput(RaftGroupId groupId) throws IOException {
+      final List<DataStreamOutputRpc> outs = new ArrayList<>();
       try {
         getDataStreamOutput(outs, groupId);
       } catch (IOException e) {
@@ -106,10 +106,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return outs;
     }
 
-    private void getDataStreamOutput(List<DataStreamOutput> outs, RaftGroupId groupId) throws IOException {
+    private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftGroupId groupId) throws IOException {
       for (RaftPeer peer : peers) {
         try {
-          outs.add(map.getProxy(peer.getId()).stream(groupId));
+          outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(groupId));
         } catch (IOException e) {
           throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
         }
@@ -124,11 +124,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   static class StreamInfo {
     private final RaftClientRequest request;
     private final CompletableFuture<DataStream> stream;
-    private final List<DataStreamOutput> outs;
+    private final List<DataStreamOutputRpc> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
-    StreamInfo(RaftClientRequest request, CompletableFuture<DataStream> stream, List<DataStreamOutput> outs) {
+    StreamInfo(RaftClientRequest request, CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
       this.request = request;
       this.stream = stream;
       this.outs = outs;
@@ -138,7 +138,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return stream;
     }
 
-    List<DataStreamOutput> getDataStreamOutputs() {
+    List<DataStreamOutputRpc> getDataStreamOutputs() {
       return outs;
     }
 
@@ -319,7 +319,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         } else if (request.getType() == Type.START_TRANSACTION){
           sendReplyNotSuccess(request, ctx);
         } else {
-          LOG.error("{}: Unexpected type:{}", this, request.getType());
+          throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
       }, executorService);
     } catch (IOException e) {
@@ -331,7 +331,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   private void forwardStartTransaction(
       final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
     final List<CompletableFuture<Boolean>> results = new ArrayList<>();
-    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+    for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
       final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
         if (reply.isSuccess()) {
           final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
@@ -364,7 +364,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
-      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
     } else if (request.getType() == Type.STREAM_DATA) {
@@ -372,7 +372,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       final CompletableFuture<?> previous = info.getPrevious().get();
 
       localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, stream), executorService);
-      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
         remoteWrites.add(previous.thenComposeAsync(v -> out.writeAsync(request.slice().nioBuffer()), executorService));
       }
     } else if (request.getType() == Type.STREAM_CLOSE || request.getType() == Type.STREAM_CLOSE_FORWARD) {
@@ -389,16 +389,18 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       }, executorService);
 
       if (request.getType() == Type.STREAM_CLOSE) {
-        for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
           remoteWrites.add(previous.thenComposeAsync(v -> out.closeForwardAsync(), executorService));
         }
       }
-    } else {
+    } else if (request.getType() == Type.START_TRANSACTION) {
       // peer server start transaction
       info = streams.get(key);
       final CompletableFuture<?> previous = info.getPrevious().get();
       previous.thenApplyAsync(v -> startTransaction(streams.get(key), request, ctx), executorService);
       return;
+    } else {
+      throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
     }
 
     final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
@@ -413,7 +415,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
             // TODO(runzhiwang): send start transaction to leader directly
             startTransaction(info, request, ctx);
           } else {
-            LOG.error("{}: Unexpected type:{}", this, request.getType());
+            throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
           }
           return null;
         }, executorService);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index d034350..a7e134f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -150,17 +150,11 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
-  class Server {
+  static class Server {
     private final RaftPeer peer;
     private final RaftServer raftServer;
     private final DataStreamServerImpl dataStreamServer;
 
-    Server(RaftPeer peer) {
-      this.peer = peer;
-      this.raftServer = newRaftServer(peer, properties);
-      this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
-    }
-
     Server(RaftPeer peer, RaftServer raftServer) {
       this.peer = peer;
       this.raftServer = raftServer;
@@ -423,7 +417,7 @@ abstract class DataStreamBaseTest extends BaseTest {
           RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
       Assert.assertTrue(replyByteBuffer.isSuccess());
       Assert.assertEquals(clientReply.getCallId(), expectedClientReply.getCallId());
-      Assert.assertTrue(clientReply.getClientId().equals(expectedClientReply.getClientId()));
+      Assert.assertEquals(clientReply.getClientId(), expectedClientReply.getClientId());
       Assert.assertEquals(clientReply.getLogIndex(), expectedClientReply.getLogIndex());
     }
   }

Reply via email to