This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 11689cd  RATIS-1078. Refactor DataStream classes. (#210)
11689cd is described below

commit 11689cd536b05bde323c9c6c6b57a8e82a60b680
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Sep 29 09:07:26 2020 +0800

    RATIS-1078. Refactor DataStream classes. (#210)
    
    Co-authored-by: Tsz Wo Sze <[email protected]>
---
 .../org/apache/ratis/client/impl/OrderedStreamAsync.java   |  8 ++++----
 ...StreamReplyImpl.java => DataStreamReplyByteBuffer.java} |  6 ++----
 ...RequestClient.java => DataStreamRequestByteBuffer.java} |  5 ++---
 .../netty/{decoders => client}/DataStreamReplyDecoder.java | 13 +++++--------
 .../{encoders => client}/DataStreamRequestEncoder.java     | 11 +++++------
 .../apache/ratis/netty/client/NettyClientStreamRpc.java    | 12 +++---------
 .../netty/{encoders => server}/DataStreamReplyEncoder.java | 12 +++++-------
 .../ratis/netty/server/DataStreamRequestByteBuf.java       |  8 ++++----
 .../{decoders => server}/DataStreamRequestDecoder.java     | 14 +++-----------
 .../apache/ratis/netty/server/NettyServerStreamRpc.java    |  9 +++------
 10 files changed, 36 insertions(+), 62 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 34ba99e..093190b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -22,7 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -47,8 +47,8 @@ public class OrderedStreamAsync {
     private boolean isFirst = false;
     private CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
 
-    public DataStreamRequestClient newDataStreamRequest(){
-      return new DataStreamRequestClient(streamId, messageId, data.slice());
+    public DataStreamRequestByteBuffer newDataStreamRequest(){
+      return new DataStreamRequestByteBuffer(streamId, messageId, data.slice());
     }
 
     @Override
@@ -142,7 +142,7 @@ public class OrderedStreamAsync {
     if(slidingWindow.isFirst(request.getSeqNum())){
       request.setFirstRequest();
     }
-    DataStreamRequestClient rpcRequest = request.newDataStreamRequest();
+    DataStreamRequestByteBuffer rpcRequest = request.newDataStreamRequest();
     CompletableFuture<DataStreamReply> requestFuture = dataStreamClientRpc.streamAsync(rpcRequest);
     requestFuture.thenApply(reply -> {
       slidingWindow.receiveReply(
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
similarity index 87%
rename from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
rename to ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
index fba60f0..f868bf8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
@@ -20,14 +20,12 @@ package org.apache.ratis.protocol;
 
 import java.nio.ByteBuffer;
 
-public class DataStreamReplyImpl implements DataStreamReply {
+public class DataStreamReplyByteBuffer implements DataStreamReply {
   private long streamId;
   private long dataOffset;
   private ByteBuffer response;
 
-  public DataStreamReplyImpl(long streamId,
-                             long dataOffset,
-                             ByteBuffer bf){
+  public DataStreamReplyByteBuffer(long streamId, long dataOffset, ByteBuffer bf){
     this.streamId = streamId;
     this.dataOffset = dataOffset;
     this.response = bf;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
similarity index 87%
rename from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
rename to ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
index a34809c..fee5a04 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
@@ -20,13 +20,12 @@ package org.apache.ratis.protocol;
 
 import java.nio.ByteBuffer;
 
-public class DataStreamRequestClient implements DataStreamRequest{
+public class DataStreamRequestByteBuffer implements DataStreamRequest{
   private long streamId;
   private long dataOffset;
   private ByteBuffer buf;
 
-  public DataStreamRequestClient(long streamId, long dataOffset,
-                                 ByteBuffer buf){
+  public DataStreamRequestByteBuffer(long streamId, long dataOffset, ByteBuffer buf){
     this.streamId = streamId;
     this.dataOffset = dataOffset;
     this.buf = buf;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
similarity index 85%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
index 2417707..775833b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.ratis.netty.decoders;
+package org.apache.ratis.netty.client;
 
-import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
@@ -29,14 +29,14 @@ public class DataStreamReplyDecoder extends ByteToMessageDecoder {
 
   @Override
   protected void decode(ChannelHandlerContext channelHandlerContext,
-                        ByteBuf byteBuf, List<Object> list) throws Exception {
+                        ByteBuf byteBuf, List<Object> list) {
 
     if(byteBuf.readableBytes() >= 24){
       long streamId = byteBuf.readLong();
       long dataOffset = byteBuf.readLong();
       long dataLength = byteBuf.readLong();
       if(byteBuf.readableBytes() >= dataLength){
-        DataStreamReplyImpl reply = new DataStreamReplyImpl(streamId,
+        DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(streamId,
             dataOffset,
             byteBuf.slice(byteBuf.readerIndex(), (int) dataLength).nioBuffer());
         byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
@@ -44,10 +44,7 @@ public class DataStreamReplyDecoder extends ByteToMessageDecoder {
         list.add(reply);
       } else {
         byteBuf.resetReaderIndex();
-        return;
       }
-    } else{
-      return;
     }
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
similarity index 82%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
index 84bf570..6bafffa 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.ratis.netty.encoders;
+package org.apache.ratis.netty.client;
 
-import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -26,12 +26,11 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class DataStreamRequestEncoder
-    extends MessageToMessageEncoder<DataStreamRequestClient> {
+public class DataStreamRequestEncoder extends MessageToMessageEncoder<DataStreamRequestByteBuffer> {
 
   @Override
   protected void encode(ChannelHandlerContext channelHandlerContext,
-                        DataStreamRequestClient requestData, List<Object> list) throws Exception {
+                        DataStreamRequestByteBuffer requestData, List<Object> list) {
     ByteBuffer bb = ByteBuffer.allocateDirect(24);
     bb.putLong(requestData.getStreamId());
     bb.putLong(requestData.getDataOffset());
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index c59a928..7053ca9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,10 +20,7 @@ package org.apache.ratis.netty.client;
 
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.netty.decoders.DataStreamReplyDecoder;
-import org.apache.ratis.netty.encoders.DataStreamRequestEncoder;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamReplyImpl;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
@@ -44,7 +41,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
 
   private RaftPeer server;
-  private RaftProperties raftProperties;
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
   private Channel channel;
   private Queue<CompletableFuture<DataStreamReply>> replies
@@ -52,7 +48,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
   public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
     this.server = server;
-    this.raftProperties = properties;
   }
 
   synchronized CompletableFuture<DataStreamReply> pollReply() {
@@ -63,9 +58,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
-        final DataStreamReplyImpl reply = (DataStreamReplyImpl) msg;
-        CompletableFuture<DataStreamReply> f = pollReply();
-        f.complete(reply);
+        final DataStreamReply reply = (DataStreamReply) msg;
+        pollReply().complete(reply);
       }
     };
   }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
similarity index 82%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
index 2d7957d..71132d9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
@@ -16,9 +16,9 @@
  *  limitations under the License.
  */
 
-package org.apache.ratis.netty.encoders;
+package org.apache.ratis.netty.server;
 
-import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -26,13 +26,11 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
 import java.nio.ByteBuffer;
 import java.util.List;
 
-public class DataStreamReplyEncoder extends
-    MessageToMessageEncoder<DataStreamReplyImpl> {
-
+public class DataStreamReplyEncoder extends MessageToMessageEncoder<DataStreamReply> {
   @Override
   protected void encode(ChannelHandlerContext channelHandlerContext,
-                        DataStreamReplyImpl dataStreamReply,
-                        List<Object> list) throws Exception {
+                        DataStreamReply dataStreamReply,
+                        List<Object> list) {
 
     ByteBuffer bb = ByteBuffer.allocateDirect(24);
     bb.putLong(dataStreamReply.getStreamId());
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
similarity index 85%
rename from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 8906700..6d5e4f6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -16,17 +16,17 @@
  *  limitations under the License.
  */
 
-package org.apache.ratis.protocol;
+package org.apache.ratis.netty.server;
 
+import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 
-public class DataStreamRequestServer implements DataStreamRequest {
+class DataStreamRequestByteBuf implements DataStreamRequest {
   private long streamId;
   private long dataOffset;
   private ByteBuf buf;
 
-  public DataStreamRequestServer(long streamId, long dataOffset,
-                                 ByteBuf buf){
+  DataStreamRequestByteBuf(long streamId, long dataOffset, ByteBuf buf) {
     this.streamId = streamId;
     this.dataOffset = dataOffset;
     this.buf = buf;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
similarity index 83%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
index b0aca36..249512c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
@@ -16,9 +16,8 @@
  *  limitations under the License.
  */
 
-package org.apache.ratis.netty.decoders;
+package org.apache.ratis.netty.server;
 
-import org.apache.ratis.protocol.DataStreamRequestServer;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
@@ -32,9 +31,7 @@ public class DataStreamRequestDecoder extends ByteToMessageDecoder {
   }
 
   @Override
-  protected void decode(ChannelHandlerContext channelHandlerContext,
-                        ByteBuf byteBuf,
-                        List<Object> list) throws Exception {
+  protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
     if(byteBuf.readableBytes() >= 24){
       long streamId = byteBuf.readLong();
       long dataOffset = byteBuf.readLong();
@@ -42,18 +39,13 @@ public class DataStreamRequestDecoder extends ByteToMessageDecoder {
       if(byteBuf.readableBytes() >= dataLength){
         ByteBuf bf = byteBuf.slice(byteBuf.readerIndex(), (int)dataLength);
         bf.retain();
-        DataStreamRequestServer req = new DataStreamRequestServer(streamId,
-            dataOffset,
-            bf);
+        final DataStreamRequestByteBuf req = new DataStreamRequestByteBuf(streamId, dataOffset, bf);
         byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
         byteBuf.markReaderIndex();
         list.add(req);
       } else {
         byteBuf.resetReaderIndex();
-        return;
       }
-    } else{
-      return;
     }
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 1c051c4..bf0e46a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -18,11 +18,8 @@
 
 package org.apache.ratis.netty.server;
 
-import org.apache.ratis.netty.decoders.DataStreamRequestDecoder;
-import org.apache.ratis.netty.encoders.DataStreamReplyEncoder;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamReplyImpl;
-import org.apache.ratis.protocol.DataStreamRequestServer;
+import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
@@ -65,13 +62,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-        DataStreamRequestServer req = (DataStreamRequestServer)msg;
+        final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
         req.getBuf().release();
-        DataStreamReply reply = new DataStreamReplyImpl(req.getStreamId(),
+        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
                                                         req.getDataOffset(),
                                                         ByteBuffer.wrap("OK".getBytes()));
         ctx.writeAndFlush(reply);

Reply via email to