This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 11689cd RATIS-1078. Refactor DataStream classes. (#210)
11689cd is described below
commit 11689cd536b05bde323c9c6c6b57a8e82a60b680
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Sep 29 09:07:26 2020 +0800
RATIS-1078. Refactor DataStream classes. (#210)
Co-authored-by: Tsz Wo Sze <[email protected]>
---
.../org/apache/ratis/client/impl/OrderedStreamAsync.java | 8 ++++----
...StreamReplyImpl.java => DataStreamReplyByteBuffer.java} | 6 ++----
...RequestClient.java => DataStreamRequestByteBuffer.java} | 5 ++---
.../netty/{decoders => client}/DataStreamReplyDecoder.java | 13 +++++--------
.../{encoders => client}/DataStreamRequestEncoder.java | 11 +++++------
.../apache/ratis/netty/client/NettyClientStreamRpc.java | 12 +++---------
.../netty/{encoders => server}/DataStreamReplyEncoder.java | 12 +++++-------
.../ratis/netty/server/DataStreamRequestByteBuf.java | 8 ++++----
.../{decoders => server}/DataStreamRequestDecoder.java | 14 +++-----------
.../apache/ratis/netty/server/NettyServerStreamRpc.java | 9 +++------
10 files changed, 36 insertions(+), 62 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 34ba99e..093190b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -22,7 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -47,8 +47,8 @@ public class OrderedStreamAsync {
private boolean isFirst = false;
private CompletableFuture<DataStreamReply> replyFuture = new
CompletableFuture<>();
- public DataStreamRequestClient newDataStreamRequest(){
- return new DataStreamRequestClient(streamId, messageId, data.slice());
+ public DataStreamRequestByteBuffer newDataStreamRequest(){
+ return new DataStreamRequestByteBuffer(streamId, messageId, data.slice());
}
@Override
@@ -142,7 +142,7 @@ public class OrderedStreamAsync {
if(slidingWindow.isFirst(request.getSeqNum())){
request.setFirstRequest();
}
- DataStreamRequestClient rpcRequest = request.newDataStreamRequest();
+ DataStreamRequestByteBuffer rpcRequest = request.newDataStreamRequest();
CompletableFuture<DataStreamReply> requestFuture =
dataStreamClientRpc.streamAsync(rpcRequest);
requestFuture.thenApply(reply -> {
slidingWindow.receiveReply(
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
similarity index 87%
rename from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
rename to ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
index fba60f0..f868bf8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
@@ -20,14 +20,12 @@ package org.apache.ratis.protocol;
import java.nio.ByteBuffer;
-public class DataStreamReplyImpl implements DataStreamReply {
+public class DataStreamReplyByteBuffer implements DataStreamReply {
private long streamId;
private long dataOffset;
private ByteBuffer response;
- public DataStreamReplyImpl(long streamId,
- long dataOffset,
- ByteBuffer bf){
+ public DataStreamReplyByteBuffer(long streamId, long dataOffset, ByteBuffer bf){
this.streamId = streamId;
this.dataOffset = dataOffset;
this.response = bf;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
similarity index 87%
rename from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
rename to ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
index a34809c..fee5a04 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
@@ -20,13 +20,12 @@ package org.apache.ratis.protocol;
import java.nio.ByteBuffer;
-public class DataStreamRequestClient implements DataStreamRequest{
+public class DataStreamRequestByteBuffer implements DataStreamRequest{
private long streamId;
private long dataOffset;
private ByteBuffer buf;
- public DataStreamRequestClient(long streamId, long dataOffset,
- ByteBuffer buf){
+ public DataStreamRequestByteBuffer(long streamId, long dataOffset, ByteBuffer buf){
this.streamId = streamId;
this.dataOffset = dataOffset;
this.buf = buf;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
similarity index 85%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
index 2417707..775833b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +16,9 @@
* limitations under the License.
*/
-package org.apache.ratis.netty.decoders;
+package org.apache.ratis.netty.client;
-import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
@@ -29,14 +29,14 @@ public class DataStreamReplyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
- ByteBuf byteBuf, List<Object> list) throws Exception {
+ ByteBuf byteBuf, List<Object> list) {
if(byteBuf.readableBytes() >= 24){
long streamId = byteBuf.readLong();
long dataOffset = byteBuf.readLong();
long dataLength = byteBuf.readLong();
if(byteBuf.readableBytes() >= dataLength){
- DataStreamReplyImpl reply = new DataStreamReplyImpl(streamId,
+ DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(streamId,
dataOffset,
byteBuf.slice(byteBuf.readerIndex(), (int)
dataLength).nioBuffer());
byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
@@ -44,10 +44,7 @@ public class DataStreamReplyDecoder extends ByteToMessageDecoder {
list.add(reply);
} else {
byteBuf.resetReaderIndex();
- return;
}
- } else{
- return;
}
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
similarity index 82%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
index 84bf570..6bafffa 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +16,9 @@
* limitations under the License.
*/
-package org.apache.ratis.netty.encoders;
+package org.apache.ratis.netty.client;
-import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -26,12 +26,11 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
import java.nio.ByteBuffer;
import java.util.List;
-public class DataStreamRequestEncoder
- extends MessageToMessageEncoder<DataStreamRequestClient> {
+public class DataStreamRequestEncoder extends MessageToMessageEncoder<DataStreamRequestByteBuffer> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
- DataStreamRequestClient requestData, List<Object> list) throws Exception {
+ DataStreamRequestByteBuffer requestData, List<Object> list) {
ByteBuffer bb = ByteBuffer.allocateDirect(24);
bb.putLong(requestData.getStreamId());
bb.putLong(requestData.getDataOffset());
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index c59a928..7053ca9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,10 +20,7 @@ package org.apache.ratis.netty.client;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.netty.decoders.DataStreamReplyDecoder;
-import org.apache.ratis.netty.encoders.DataStreamRequestEncoder;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamReplyImpl;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
@@ -44,7 +41,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyClientStreamRpc.class);
private RaftPeer server;
- private RaftProperties raftProperties;
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private Channel channel;
private Queue<CompletableFuture<DataStreamReply>> replies
@@ -52,7 +48,6 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
this.server = server;
- this.raftProperties = properties;
}
synchronized CompletableFuture<DataStreamReply> pollReply() {
@@ -63,9 +58,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
InterruptedException {
- final DataStreamReplyImpl reply = (DataStreamReplyImpl) msg;
- CompletableFuture<DataStreamReply> f = pollReply();
- f.complete(reply);
+ final DataStreamReply reply = (DataStreamReply) msg;
+ pollReply().complete(reply);
}
};
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
similarity index 82%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
index 2d7957d..71132d9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
@@ -16,9 +16,9 @@
* limitations under the License.
*/
-package org.apache.ratis.netty.encoders;
+package org.apache.ratis.netty.server;
-import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -26,13 +26,11 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
import java.nio.ByteBuffer;
import java.util.List;
-public class DataStreamReplyEncoder extends
- MessageToMessageEncoder<DataStreamReplyImpl> {
-
+public class DataStreamReplyEncoder extends MessageToMessageEncoder<DataStreamReply> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
- DataStreamReplyImpl dataStreamReply,
- List<Object> list) throws Exception {
+ DataStreamReply dataStreamReply,
+ List<Object> list) {
ByteBuffer bb = ByteBuffer.allocateDirect(24);
bb.putLong(dataStreamReply.getStreamId());
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
similarity index 85%
rename from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 8906700..6d5e4f6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -16,17 +16,17 @@
* limitations under the License.
*/
-package org.apache.ratis.protocol;
+package org.apache.ratis.netty.server;
+import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-public class DataStreamRequestServer implements DataStreamRequest {
+class DataStreamRequestByteBuf implements DataStreamRequest {
private long streamId;
private long dataOffset;
private ByteBuf buf;
- public DataStreamRequestServer(long streamId, long dataOffset,
- ByteBuf buf){
+ DataStreamRequestByteBuf(long streamId, long dataOffset, ByteBuf buf) {
this.streamId = streamId;
this.dataOffset = dataOffset;
this.buf = buf;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
similarity index 83%
rename from ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
rename to ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
index b0aca36..249512c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestDecoder.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.ratis.netty.decoders;
+package org.apache.ratis.netty.server;
-import org.apache.ratis.protocol.DataStreamRequestServer;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
@@ -32,9 +31,7 @@ public class DataStreamRequestDecoder extends ByteToMessageDecoder {
}
@Override
- protected void decode(ChannelHandlerContext channelHandlerContext,
- ByteBuf byteBuf,
- List<Object> list) throws Exception {
+ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
if(byteBuf.readableBytes() >= 24){
long streamId = byteBuf.readLong();
long dataOffset = byteBuf.readLong();
@@ -42,18 +39,13 @@ public class DataStreamRequestDecoder extends ByteToMessageDecoder {
if(byteBuf.readableBytes() >= dataLength){
ByteBuf bf = byteBuf.slice(byteBuf.readerIndex(), (int)dataLength);
bf.retain();
- DataStreamRequestServer req = new DataStreamRequestServer(streamId,
- dataOffset,
- bf);
+ final DataStreamRequestByteBuf req = new DataStreamRequestByteBuf(streamId, dataOffset, bf);
byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
byteBuf.markReaderIndex();
list.add(req);
} else {
byteBuf.resetReaderIndex();
- return;
}
- } else{
- return;
}
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 1c051c4..bf0e46a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -18,11 +18,8 @@
package org.apache.ratis.netty.server;
-import org.apache.ratis.netty.decoders.DataStreamRequestDecoder;
-import org.apache.ratis.netty.encoders.DataStreamReplyEncoder;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamReplyImpl;
-import org.apache.ratis.protocol.DataStreamRequestServer;
+import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
@@ -65,13 +62,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
- DataStreamRequestServer req = (DataStreamRequestServer)msg;
+ final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
ByteBuffer[] bfs = req.getBuf().nioBuffers();
for(int i = 0; i < bfs.length; i++){
fileChannel.write(bfs[i]);
}
req.getBuf().release();
- DataStreamReply reply = new DataStreamReplyImpl(req.getStreamId(),
+ final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
req.getDataOffset(),
ByteBuffer.wrap("OK".getBytes()));
ctx.writeAndFlush(reply);