This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 72b3ab5 RATIS-1105. Refactor Netty streaming encoder and decoder.
(#231)
72b3ab5 is described below
commit 72b3ab5c2e8a03bc9a5c5ea6b0bb1b736c0d1a20
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Oct 24 09:37:52 2020 +0800
RATIS-1105. Refactor Netty streaming encoder and decoder. (#231)
* RATIS-1105. Refactor Netty streaming encoder and decoder.
* Some minor changes.
* Shutdown workerGroup in NettyClientStreamRpc
* Enforce replies ordering.
* Fix checkstyle
---
.../impl/DataStreamPacketByteBuffer.java | 4 +-
.../apache/ratis/protocol/DataStreamPacket.java | 4 +-
.../ratis/protocol/DataStreamPacketHeader.java | 5 +++
.../org/apache/ratis/protocol/DataStreamReply.java | 9 +++-
.../apache/ratis/protocol/DataStreamRequest.java | 4 ++
...ketHeader.java => DataStreamRequestHeader.java} | 30 +++++++------
.../apache/ratis/netty/NettyDataStreamUtils.java | 51 +++++++++++++++++-----
.../ratis/netty/client/DataStreamReplyDecoder.java | 34 ---------------
.../netty/client/DataStreamRequestEncoder.java | 42 ------------------
.../ratis/netty/client/NettyClientStreamRpc.java | 34 ++++++++++++++-
.../ratis/netty/server/DataStreamReplyEncoder.java | 38 ----------------
.../netty/server/DataStreamRequestByteBuf.java | 12 +++--
.../netty/server/DataStreamRequestDecoder.java | 51 ----------------------
.../ratis/netty/server/NettyServerStreamRpc.java | 39 +++++++++++++++--
14 files changed, 153 insertions(+), 204 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 15870a5..6eea669 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -21,10 +21,8 @@ import java.nio.ByteBuffer;
/**
* Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link
ByteBuffer}.
- *
- * This class is immutable.
*/
-public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
+public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
private static final ByteBuffer EMPTY =
ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
private final ByteBuffer buffer;
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index faf4648..fa3f706 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -27,7 +27,9 @@ public interface DataStreamPacket {
long getDataLength();
- default void putTo(LongConsumer putLong) {
+ int getHeaderSize();
+
+ default void writeHeaderTo(LongConsumer putLong) {
putLong.accept(getStreamId());
putLong.accept(getStreamOffset());
putLong.accept(getDataLength());
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index b34216b..56274b6 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -49,4 +49,9 @@ public class DataStreamPacketHeader extends
DataStreamPacketImpl {
public long getDataLength() {
return dataLength;
}
+
+ @Override
+ public int getHeaderSize() {
+ return getSize();
+ }
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 44a6ea0..9fb8fa7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -34,8 +34,13 @@ public interface DataStreamReply extends DataStreamPacket {
long getBytesWritten();
@Override
- default void putTo(LongConsumer putLong) {
- DataStreamPacket.super.putTo(putLong);
+ default int getHeaderSize() {
+ return DataStreamReplyHeader.getSize();
+ }
+
+ @Override
+ default void writeHeaderTo(LongConsumer putLong) {
+ DataStreamPacket.super.writeHeaderTo(putLong);
putLong.accept(getBytesWritten());
putLong.accept(toFlags(isSuccess()));
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index 8db10fe..797ddf4 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -19,4 +19,8 @@
package org.apache.ratis.protocol;
public interface DataStreamRequest extends DataStreamPacket {
+ @Override
+ default int getHeaderSize() {
+ return DataStreamRequestHeader.getSize();
+ }
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
similarity index 54%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
copy to
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index b34216b..d6646df 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,35 +18,37 @@
package org.apache.ratis.protocol;
-import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.util.SizeInBytes;
import java.util.function.LongSupplier;
-/** The header format is streamId, streamOffset, dataLength. */
-public class DataStreamPacketHeader extends DataStreamPacketImpl {
- private static final SizeInBytes SIZE = SizeInBytes.valueOf(24);
+/**
+ * The header format is the same {@link DataStreamPacketHeader}
+ * since there are no additional fields.
+ */
+public class DataStreamRequestHeader extends DataStreamPacketHeader implements
DataStreamRequest {
+ private static final SizeInBytes SIZE =
SizeInBytes.valueOf(DataStreamPacketHeader.getSize());
public static int getSize() {
return SIZE.getSizeInt();
}
- public static DataStreamPacketHeader read(LongSupplier readLong, int
readableBytes) {
+ public static DataStreamRequestHeader read(LongSupplier readLong, int
readableBytes) {
if (readableBytes < getSize()) {
return null;
}
- return new DataStreamPacketHeader(readLong.getAsLong(),
readLong.getAsLong(), readLong.getAsLong());
+ final DataStreamPacketHeader packerHeader =
DataStreamPacketHeader.read(readLong, readableBytes);
+ if (packerHeader == null) {
+ return null;
+ }
+ return new DataStreamRequestHeader(packerHeader);
}
- private final long dataLength;
-
- public DataStreamPacketHeader(long streamId, long streamOffset, long
dataLength) {
- super(streamId, streamOffset);
- this.dataLength = dataLength;
+ public DataStreamRequestHeader(long streamId, long streamOffset, long
dataLength) {
+ super(streamId, streamOffset, dataLength);
}
- @Override
- public long getDataLength() {
- return dataLength;
+ public DataStreamRequestHeader(DataStreamPacketHeader header) {
+ this(header.getStreamId(), header.getStreamOffset(),
header.getDataLength());
}
}
\ No newline at end of file
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 21f8d04..757c179 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -17,34 +17,63 @@
*/
package org.apache.ratis.netty;
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.protocol.DataStreamPacketHeader;
import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
-import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
public interface NettyDataStreamUtils {
- static DataStreamReplyByteBuffer decode(ByteBuf buf) {
- final DataStreamReplyHeader header =
DataStreamReplyHeader.read(buf::readLong, buf.readableBytes());
+ static void encodeDataStreamPacketByteBuffer(DataStreamPacketByteBuffer
packet, Consumer<ByteBuf> out) {
+ final ByteBuf buf =
PooledByteBufAllocator.DEFAULT.directBuffer(packet.getHeaderSize());
+ packet.writeHeaderTo(buf::writeLong);
+ out.accept(buf);
+ out.accept(Unpooled.wrappedBuffer(packet.slice()));
+ }
+
+ static DataStreamRequestByteBuf decodeDataStreamRequestByteBuf(ByteBuf buf) {
+ return Optional.ofNullable(DataStreamRequestHeader.read(buf::readLong,
buf.readableBytes()))
+ .map(header -> checkHeader(header, buf))
+ .map(header -> new DataStreamRequestByteBuf(header, decodeData(buf,
header, ByteBuf::retain)))
+ .orElse(null);
+ }
+
+ static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf
buf) {
+ return Optional.ofNullable(DataStreamReplyHeader.read(buf::readLong,
buf.readableBytes()))
+ .map(header -> checkHeader(header, buf))
+ .map(header -> new DataStreamReplyByteBuffer(header, decodeData(buf,
header, ByteBuf::nioBuffer)))
+ .orElse(null);
+ }
+
+ static <HEADER extends DataStreamPacketHeader> HEADER checkHeader(HEADER
header, ByteBuf buf) {
if (header == null) {
return null;
}
- final int dataLength = Math.toIntExact(header.getDataLength());
-
- if (buf.readableBytes() < dataLength) {
+ if (buf.readableBytes() < header.getDataLength()) {
buf.resetReaderIndex();
return null;
}
+ return header;
+ }
- final ByteBuffer buffer;
+ static <DATA> DATA decodeData(ByteBuf buf, DataStreamPacketHeader header,
Function<ByteBuf, DATA> toData) {
+ final int dataLength = Math.toIntExact(header.getDataLength());
+ final DATA data;
if (dataLength > 0) {
- buffer = buf.slice(buf.readerIndex(), dataLength).nioBuffer();
+ data = toData.apply(buf.slice(buf.readerIndex(), dataLength));
buf.readerIndex(buf.readerIndex() + dataLength);
} else {
- buffer = null;
+ data = null;
}
buf.markReaderIndex();
-
- return new DataStreamReplyByteBuffer(header, buffer);
+ return data;
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
deleted file mode 100644
index 0e92b9f..0000000
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.netty.client;
-
-import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-import java.util.Optional;
-
-public class DataStreamReplyDecoder extends ByteToMessageDecoder {
- @Override
- protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf
buf, List<Object> list) {
- Optional.ofNullable(NettyDataStreamUtils.decode(buf)).ifPresent(list::add);
- }
-}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
deleted file mode 100644
index 0864b12..0000000
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.netty.client;
-
-import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class DataStreamRequestEncoder extends
MessageToMessageEncoder<DataStreamRequestByteBuffer> {
-
- @Override
- protected void encode(ChannelHandlerContext channelHandlerContext,
- DataStreamRequestByteBuffer request, List<Object> list) {
- final ByteBuffer data = request.slice();
- ByteBuffer bb = ByteBuffer.allocateDirect(24);
- bb.putLong(request.getStreamId());
- bb.putLong(request.getStreamOffset());
- bb.putLong(data.remaining());
- bb.flip();
- list.add(Unpooled.wrappedBuffer(bb, data));
- }
-}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 7053ca9..c6b6266 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -20,20 +20,27 @@ package org.apache.ratis.netty.client;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.*;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -70,13 +77,35 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
public void initChannel(SocketChannel ch)
throws Exception {
ChannelPipeline p = ch.pipeline();
- p.addLast(new DataStreamRequestEncoder());
- p.addLast(new DataStreamReplyDecoder());
+ p.addLast(newEncoder());
+ p.addLast(newDecoder());
p.addLast(getClientHandler());
}
};
}
+ MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
+ return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
+ @Override
+ protected void encode(ChannelHandlerContext context,
DataStreamRequestByteBuffer request, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(request,
out::add);
+ }
+ };
+ }
+
+ ByteToMessageDecoder newDecoder() {
+ return new ByteToMessageDecoder() {
+ {
+ this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext context, ByteBuf buf,
List<Object> out) {
+
Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamReplyByteBuffer(buf)).ifPresent(out::add);
+ }
+ };
+ }
+
@Override
public synchronized CompletableFuture<DataStreamReply>
streamAsync(DataStreamRequest request) {
CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
@@ -106,5 +135,6 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
@Override
public void closeClient(){
channel.close().syncUninterruptibly();
+ workerGroup.shutdownGracefully();
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
deleted file mode 100644
index 58ac232..0000000
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.netty.server;
-
-import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.protocol.DataStreamReplyHeader;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class DataStreamReplyEncoder extends
MessageToMessageEncoder<DataStreamReplyByteBuffer> {
- @Override
- protected void encode(ChannelHandlerContext context,
DataStreamReplyByteBuffer reply, List<Object> list) {
- final ByteBuffer buffer =
ByteBuffer.allocateDirect(DataStreamReplyHeader.getSize());
- reply.putTo(buffer::putLong);
- buffer.flip();
- list.add(Unpooled.wrappedBuffer(buffer, reply.slice()));
- }
-}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index b782481..9f67c75 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -20,19 +20,25 @@ package org.apache.ratis.netty.server;
import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
/**
* Implements {@link DataStreamRequest} with {@link ByteBuf}.
*
* This class is immutable.
*/
-class DataStreamRequestByteBuf extends DataStreamPacketImpl implements
DataStreamRequest {
+public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements
DataStreamRequest {
private final ByteBuf buf;
- DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf buf) {
+ public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf
buf) {
super(streamId, streamOffset);
- this.buf = buf.asReadOnly();
+ this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
+ }
+
+ public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf)
{
+ this(header.getStreamId(), header.getStreamOffset(), buf);
}
@Override
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
deleted file mode 100644
index 249512c..0000000
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ratis.netty.server;
-
-import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
-public class DataStreamRequestDecoder extends ByteToMessageDecoder {
-
- public DataStreamRequestDecoder() {
- this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
- }
-
- @Override
- protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf
byteBuf, List<Object> list) {
- if(byteBuf.readableBytes() >= 24){
- long streamId = byteBuf.readLong();
- long dataOffset = byteBuf.readLong();
- long dataLength = byteBuf.readLong();
- if(byteBuf.readableBytes() >= dataLength){
- ByteBuf bf = byteBuf.slice(byteBuf.readerIndex(), (int)dataLength);
- bf.retain();
- final DataStreamRequestByteBuf req = new
DataStreamRequestByteBuf(streamId, dataOffset, bf);
- byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
- byteBuf.markReaderIndex();
- list.add(req);
- } else {
- byteBuf.resetReaderIndex();
- }
- }
- }
-}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 54baea6..b248572 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -38,6 +39,8 @@ import org.apache.ratis.thirdparty.io.netty.channel.*;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.util.IOUtils;
@@ -53,6 +56,7 @@ import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -61,6 +65,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -205,6 +210,9 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private ChannelInboundHandler getServerHandler(){
return new ChannelInboundHandlerAdapter(){
+ private final AtomicReference<CompletableFuture<?>> previous
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
+
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
IOException {
final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)
msg;
@@ -230,11 +238,14 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
}
- JavaUtils.allOf(remoteWrites).thenCombine(localWrite, (v,
bytesWritten) -> {
+ final CompletableFuture<?> current = previous.get()
+ .thenCombine(JavaUtils.allOf(remoteWrites), (u, v) -> null)
+ .thenCombine(localWrite, (v, bytesWritten) -> {
buf.release();
sendReply(remoteWrites, request, bytesWritten, ctx);
return null;
});
+ previous.set(current);
}
};
}
@@ -256,13 +267,35 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
- p.addLast(new DataStreamRequestDecoder());
- p.addLast(new DataStreamReplyEncoder());
+ p.addLast(newDecoder());
+ p.addLast(newEncoder());
p.addLast(getServerHandler());
}
};
}
+ ByteToMessageDecoder newDecoder() {
+ return new ByteToMessageDecoder() {
+ {
+ this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext context, ByteBuf buf,
List<Object> out) {
+
Optional.ofNullable(NettyDataStreamUtils.decodeDataStreamRequestByteBuf(buf)).ifPresent(out::add);
+ }
+ };
+ }
+
+ MessageToMessageEncoder<DataStreamReplyByteBuffer> newEncoder() {
+ return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
+ @Override
+ protected void encode(ChannelHandlerContext context,
DataStreamReplyByteBuffer reply, List<Object> out) {
+ NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(reply, out::add);
+ }
+ };
+ }
+
private Channel getChannel() {
return channelFuture.awaitUninterruptibly().channel();
}