This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 72b3ab5  RATIS-1105. Refactor Netty streaming encoder and decoder. 
(#231)
72b3ab5 is described below

commit 72b3ab5c2e8a03bc9a5c5ea6b0bb1b736c0d1a20
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Oct 24 09:37:52 2020 +0800

    RATIS-1105. Refactor Netty streaming encoder and decoder. (#231)
    
    * RATIS-1105. Refactor Netty streaming encoder and decoder.
    
    * Some minor changes.
    
    * Shutdown workerGroup in NettyClientStreamRpc
    
    * Enforce replies ordering.
    
    * Fix checkstyle
---
 .../impl/DataStreamPacketByteBuffer.java           |  4 +-
 .../apache/ratis/protocol/DataStreamPacket.java    |  4 +-
 .../ratis/protocol/DataStreamPacketHeader.java     |  5 +++
 .../org/apache/ratis/protocol/DataStreamReply.java |  9 +++-
 .../apache/ratis/protocol/DataStreamRequest.java   |  4 ++
 ...ketHeader.java => DataStreamRequestHeader.java} | 30 +++++++------
 .../apache/ratis/netty/NettyDataStreamUtils.java   | 51 +++++++++++++++++-----
 .../ratis/netty/client/DataStreamReplyDecoder.java | 34 ---------------
 .../netty/client/DataStreamRequestEncoder.java     | 42 ------------------
 .../ratis/netty/client/NettyClientStreamRpc.java   | 34 ++++++++++++++-
 .../ratis/netty/server/DataStreamReplyEncoder.java | 38 ----------------
 .../netty/server/DataStreamRequestByteBuf.java     | 12 +++--
 .../netty/server/DataStreamRequestDecoder.java     | 51 ----------------------
 .../ratis/netty/server/NettyServerStreamRpc.java   | 39 +++++++++++++++--
 14 files changed, 153 insertions(+), 204 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 15870a5..6eea669 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -21,10 +21,8 @@ import java.nio.ByteBuffer;
 
 /**
  * Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link 
ByteBuffer}.
- *
- * This class is immutable.
  */
-public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
+public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
   private static final ByteBuffer EMPTY = 
ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
 
   private final ByteBuffer buffer;
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index faf4648..fa3f706 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -27,7 +27,9 @@ public interface DataStreamPacket {
 
   long getDataLength();
 
-  default void putTo(LongConsumer putLong) {
+  int getHeaderSize();
+
+  default void writeHeaderTo(LongConsumer putLong) {
     putLong.accept(getStreamId());
     putLong.accept(getStreamOffset());
     putLong.accept(getDataLength());
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index b34216b..56274b6 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -49,4 +49,9 @@ public class DataStreamPacketHeader extends 
DataStreamPacketImpl {
   public long getDataLength() {
     return dataLength;
   }
+
+  @Override
+  public int getHeaderSize() {
+    return getSize();
+  }
 }
\ No newline at end of file
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 44a6ea0..9fb8fa7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -34,8 +34,13 @@ public interface DataStreamReply extends DataStreamPacket {
   long getBytesWritten();
 
   @Override
-  default void putTo(LongConsumer putLong) {
-    DataStreamPacket.super.putTo(putLong);
+  default int getHeaderSize() {
+    return DataStreamReplyHeader.getSize();
+  }
+
+  @Override
+  default void writeHeaderTo(LongConsumer putLong) {
+    DataStreamPacket.super.writeHeaderTo(putLong);
     putLong.accept(getBytesWritten());
     putLong.accept(toFlags(isSuccess()));
   }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index 8db10fe..797ddf4 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -19,4 +19,8 @@
 package org.apache.ratis.protocol;
 
 public interface DataStreamRequest extends DataStreamPacket {
+  @Override
+  default int getHeaderSize() {
+    return DataStreamRequestHeader.getSize();
+  }
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
similarity index 54%
copy from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
copy to 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index b34216b..d6646df 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,35 +18,37 @@
 
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.util.SizeInBytes;
 
 import java.util.function.LongSupplier;
 
-/** The header format is streamId, streamOffset, dataLength. */
-public class DataStreamPacketHeader extends DataStreamPacketImpl {
-  private static final SizeInBytes SIZE = SizeInBytes.valueOf(24);
+/**
+ * The header format is the same {@link DataStreamPacketHeader}
+ * since there are no additional fields.
+ */
+public class DataStreamRequestHeader extends DataStreamPacketHeader implements 
DataStreamRequest {
+  private static final SizeInBytes SIZE = 
SizeInBytes.valueOf(DataStreamPacketHeader.getSize());
 
   public static int getSize() {
     return SIZE.getSizeInt();
   }
 
-  public static DataStreamPacketHeader read(LongSupplier readLong, int 
readableBytes) {
+  public static DataStreamRequestHeader read(LongSupplier readLong, int 
readableBytes) {
     if (readableBytes < getSize()) {
       return null;
     }
-    return new DataStreamPacketHeader(readLong.getAsLong(), 
readLong.getAsLong(), readLong.getAsLong());
+    final DataStreamPacketHeader packerHeader = 
DataStreamPacketHeader.read(readLong, readableBytes);
+    if (packerHeader == null) {
+      return null;
+    }
+    return new DataStreamRequestHeader(packerHeader);
   }
 
-  private final long dataLength;
-
-  public DataStreamPacketHeader(long streamId, long streamOffset, long 
dataLength) {
-    super(streamId, streamOffset);
-    this.dataLength = dataLength;
+  public DataStreamRequestHeader(long streamId, long streamOffset, long 
dataLength) {
+    super(streamId, streamOffset, dataLength);
   }
 
-  @Override
-  public long getDataLength() {
-    return dataLength;
+  public DataStreamRequestHeader(DataStreamPacketHeader header) {
+    this(header.getStreamId(), header.getStreamOffset(), 
header.getDataLength());
   }
 }
\ No newline at end of file
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 21f8d04..757c179 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -17,34 +17,63 @@
  */
 package org.apache.ratis.netty;
 
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.protocol.DataStreamPacketHeader;
 import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
-import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 public interface NettyDataStreamUtils {
-  static DataStreamReplyByteBuffer decode(ByteBuf buf) {
-    final DataStreamReplyHeader header = 
DataStreamReplyHeader.read(buf::readLong, buf.readableBytes());
+  static void encodeDataStreamPacketByteBuffer(DataStreamPacketByteBuffer 
packet, Consumer<ByteBuf> out) {
+    final ByteBuf buf = 
PooledByteBufAllocator.DEFAULT.directBuffer(packet.getHeaderSize());
+    packet.writeHeaderTo(buf::writeLong);
+    out.accept(buf);
+    out.accept(Unpooled.wrappedBuffer(packet.slice()));
+  }
+
+  static DataStreamRequestByteBuf decodeDataStreamRequestByteBuf(ByteBuf buf) {
+    return Optional.ofNullable(DataStreamRequestHeader.read(buf::readLong, 
buf.readableBytes()))
+        .map(header -> checkHeader(header, buf))
+        .map(header -> new DataStreamRequestByteBuf(header, decodeData(buf, 
header, ByteBuf::retain)))
+        .orElse(null);
+  }
+
+  static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf 
buf) {
+    return Optional.ofNullable(DataStreamReplyHeader.read(buf::readLong, 
buf.readableBytes()))
+        .map(header -> checkHeader(header, buf))
+        .map(header -> new DataStreamReplyByteBuffer(header, decodeData(buf, 
header, ByteBuf::nioBuffer)))
+        .orElse(null);
+  }
+
+  static <HEADER extends DataStreamPacketHeader> HEADER checkHeader(HEADER 
header, ByteBuf buf) {
     if (header == null) {
       return null;
     }
-    final int dataLength = Math.toIntExact(header.getDataLength());
-
-    if (buf.readableBytes() < dataLength) {
+    if (buf.readableBytes() < header.getDataLength()) {
       buf.resetReaderIndex();
       return null;
     }
+    return header;
+  }
 
-    final ByteBuffer buffer;
+  static <DATA> DATA decodeData(ByteBuf buf, DataStreamPacketHeader header, 
Function<ByteBuf, DATA> toData) {
+    final int dataLength = Math.toIntExact(header.getDataLength());
+    final DATA data;
     if (dataLength > 0) {
-      buffer = buf.slice(buf.readerIndex(), dataLength).nioBuffer();
+      data = toData.apply(buf.slice(buf.readerIndex(), dataLength));
       buf.readerIndex(buf.readerIndex() + dataLength);
     } else {
-      buffer = null;
+      data = null;
     }
     buf.markReaderIndex();
-
-    return new DataStreamReplyByteBuffer(header, buffer);
+    return data;
   }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
deleted file mode 100644
index 0e92b9f..0000000
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.netty.client;
-
-import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-import java.util.Optional;
-
-public class DataStreamReplyDecoder extends ByteToMessageDecoder {
-  @Override
-  protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf 
buf, List<Object> list) {
-    Optional.ofNullable(NettyDataStreamUtils.decode(buf)).ifPresent(list::add);
-  }
-}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
deleted file mode 100644
index 0864b12..0000000
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.netty.client;
-
-import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class DataStreamRequestEncoder extends 
MessageToMessageEncoder<DataStreamRequestByteBuffer> {
-
-  @Override
-  protected void encode(ChannelHandlerContext channelHandlerContext,
-      DataStreamRequestByteBuffer request, List<Object> list) {
-    final ByteBuffer data = request.slice();
-    ByteBuffer bb = ByteBuffer.allocateDirect(24);
-    bb.putLong(request.getStreamId());
-    bb.putLong(request.getStreamOffset());
-    bb.putLong(data.remaining());
-    bb.flip();
-    list.add(Unpooled.wrappedBuffer(bb, data));
-  }
-}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 7053ca9..c6b6266 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -20,20 +20,27 @@ package org.apache.ratis.netty.client;
 
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.*;
 import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import 
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.ratis.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 
@@ -70,13 +77,35 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
       public void initChannel(SocketChannel ch)
           throws Exception {
         ChannelPipeline p = ch.pipeline();
-        p.addLast(new DataStreamRequestEncoder());
-        p.addLast(new DataStreamReplyDecoder());
+        p.addLast(newEncoder());
+        p.addLast(newDecoder());
         p.addLast(getClientHandler());
       }
     };
   }
 
+  MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
+    return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
+      @Override
+      protected void encode(ChannelHandlerContext context, 
DataStreamRequestByteBuffer request, List<Object> out) {
+        NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(request, 
out::add);
+      }
+    };
+  }
+
+  ByteToMessageDecoder newDecoder() {
+    return new ByteToMessageDecoder() {
+      {
+        this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
+      }
+
+      @Override
+      protected void decode(ChannelHandlerContext context, ByteBuf buf, 
List<Object> out) {
+        
Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamReplyByteBuffer(buf)).ifPresent(out::add);
+      }
+    };
+  }
+
   @Override
   public synchronized CompletableFuture<DataStreamReply> 
streamAsync(DataStreamRequest request) {
     CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
@@ -106,5 +135,6 @@ public class NettyClientStreamRpc implements 
DataStreamClientRpc {
   @Override
   public void closeClient(){
     channel.close().syncUninterruptibly();
+    workerGroup.shutdownGracefully();
   }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
deleted file mode 100644
index 58ac232..0000000
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.ratis.netty.server;
-
-import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.protocol.DataStreamReplyHeader;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class DataStreamReplyEncoder extends 
MessageToMessageEncoder<DataStreamReplyByteBuffer> {
-  @Override
-  protected void encode(ChannelHandlerContext context, 
DataStreamReplyByteBuffer reply, List<Object> list) {
-    final ByteBuffer buffer = 
ByteBuffer.allocateDirect(DataStreamReplyHeader.getSize());
-    reply.putTo(buffer::putLong);
-    buffer.flip();
-    list.add(Unpooled.wrappedBuffer(buffer, reply.slice()));
-  }
-}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index b782481..9f67c75 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -20,19 +20,25 @@ package org.apache.ratis.netty.server;
 
 import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
 /**
  * Implements {@link DataStreamRequest} with {@link ByteBuf}.
  *
  * This class is immutable.
  */
-class DataStreamRequestByteBuf extends DataStreamPacketImpl implements 
DataStreamRequest {
+public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements 
DataStreamRequest {
   private final ByteBuf buf;
 
-  DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf buf) {
+  public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf 
buf) {
     super(streamId, streamOffset);
-    this.buf = buf.asReadOnly();
+    this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
+  }
+
+  public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) 
{
+    this(header.getStreamId(), header.getStreamOffset(), buf);
   }
 
   @Override
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
deleted file mode 100644
index 249512c..0000000
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.ratis.netty.server;
-
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
-public class DataStreamRequestDecoder extends ByteToMessageDecoder {
-
-  public DataStreamRequestDecoder() {
-    this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
-  }
-
-  @Override
-  protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf 
byteBuf, List<Object> list) {
-    if(byteBuf.readableBytes() >= 24){
-      long streamId = byteBuf.readLong();
-      long dataOffset = byteBuf.readLong();
-      long dataLength = byteBuf.readLong();
-      if(byteBuf.readableBytes() >= dataLength){
-        ByteBuf bf = byteBuf.slice(byteBuf.readerIndex(), (int)dataLength);
-        bf.retain();
-        final DataStreamRequestByteBuf req = new 
DataStreamRequestByteBuf(streamId, dataOffset, bf);
-        byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
-        byteBuf.markReaderIndex();
-        list.add(req);
-      } else {
-        byteBuf.resetReaderIndex();
-      }
-    }
-  }
-}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 54baea6..b248572 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -38,6 +39,8 @@ import org.apache.ratis.thirdparty.io.netty.channel.*;
 import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
 import 
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
 import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
 import org.apache.ratis.util.IOUtils;
@@ -53,6 +56,7 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -61,6 +65,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = 
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -205,6 +210,9 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
 
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
+      private final AtomicReference<CompletableFuture<?>> previous
+          = new AtomicReference<>(CompletableFuture.completedFuture(null));
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf) 
msg;
@@ -230,11 +238,14 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
           }
         }
 
-        JavaUtils.allOf(remoteWrites).thenCombine(localWrite, (v, 
bytesWritten) -> {
+        final CompletableFuture<?> current = previous.get()
+            .thenCombine(JavaUtils.allOf(remoteWrites), (u, v) -> null)
+            .thenCombine(localWrite, (v, bytesWritten) -> {
               buf.release();
               sendReply(remoteWrites, request, bytesWritten, ctx);
               return null;
         });
+        previous.set(current);
       }
     };
   }
@@ -256,13 +267,35 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
       @Override
       public void initChannel(SocketChannel ch) {
         ChannelPipeline p = ch.pipeline();
-        p.addLast(new DataStreamRequestDecoder());
-        p.addLast(new DataStreamReplyEncoder());
+        p.addLast(newDecoder());
+        p.addLast(newEncoder());
         p.addLast(getServerHandler());
       }
     };
   }
 
+  ByteToMessageDecoder newDecoder() {
+    return new ByteToMessageDecoder() {
+      {
+        this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
+      }
+
+      @Override
+      protected void decode(ChannelHandlerContext context, ByteBuf buf, 
List<Object> out) {
+        
Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamRequestByteBuf(buf)).ifPresent(out::add);
+      }
+    };
+  }
+
+  MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
+    return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
+      @Override
+      protected void encode(ChannelHandlerContext context, 
DataStreamReplyByteBuffer reply, List<Object> out) {
+        NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(reply, out::add);
+      }
+    };
+  }
+
   private Channel getChannel() {
     return channelFuture.awaitUninterruptibly().channel();
   }

Reply via email to