This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new da56f4f RATIS-1114. Change NettyServerStreamRpc to use only one ConcurrentMap. (#236)
da56f4f is described below
commit da56f4f0b637d750e34ec0f3625c5e3b21409ee6
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 29 07:57:44 2020 +0800
RATIS-1114. Change NettyServerStreamRpc to use only one ConcurrentMap. (#236)
---
.../ratis/netty/server/NettyServerStreamRpc.java | 137 +++++++++++----------
1 file changed, 73 insertions(+), 64 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 3cf52e2..9797709 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -26,8 +26,8 @@ import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
-import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -36,7 +36,6 @@ import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
-import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.*;
@@ -47,7 +46,6 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
-import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.PeerProxyMap;
@@ -67,7 +65,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -120,6 +117,30 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}
+ static class StreamInfo {
+ private final CompletableFuture<DataStream> stream;
+ private final List<DataStreamOutput> outs;
+ private final AtomicReference<CompletableFuture<?>> previous
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
+
+ StreamInfo(CompletableFuture<DataStream> stream, List<DataStreamOutput>
outs) {
+ this.stream = stream;
+ this.outs = outs;
+ }
+
+ CompletableFuture<DataStream> getStream() {
+ return stream;
+ }
+
+ List<DataStreamOutput> getDataStreamOutputs() {
+ return outs;
+ }
+
+ AtomicReference<CompletableFuture<?>> getPrevious() {
+ return previous;
+ }
+ }
+
private final RaftServer server;
private final String name;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
@@ -127,8 +148,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
private final ChannelFuture channelFuture;
private final StateMachine stateMachine;
- private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams =
new ConcurrentHashMap<>();
- private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput
= new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, StreamInfo> streams = new
ConcurrentHashMap<>();
private final Proxies proxies;
@@ -170,12 +190,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
proxies.addPeers(newPeers);
}
- private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
+ private StreamInfo newStreamInfo(ByteBuf buf) {
try {
- final RaftClientRequest request =
-
ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
- return stateMachine.data().stream(request);
- } catch (InvalidProtocolBufferException e) {
+ final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
+ RaftClientRequestProto.parseFrom(buf.nioBuffer()));
+ return new StreamInfo(stateMachine.data().stream(request),
proxies.getDataStreamOutput());
+ } catch (Throwable e) {
throw new CompletionException(e);
}
}
@@ -211,64 +231,58 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
- try {
if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
sendReplyNotSuccess(request, ctx);
} else {
sendReplySuccess(request, bytesWritten, ctx);
}
- } catch (ExecutionException | InterruptedException e) {
- IOUtils.asIOException(e);
- }
}
- private ChannelInboundHandler getServerHandler(){
+ private ChannelInboundHandler newChannelInboundHandlerAdapter(){
return new ChannelInboundHandlerAdapter(){
- private final AtomicReference<CompletableFuture<?>> previous
- = new AtomicReference<>(CompletableFuture.completedFuture(null));
-
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws
IOException {
- final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)
msg;
- final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
-
- final CompletableFuture<Long> localWrite = isHeader ?
- streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf)).thenApply(stream -> 0L)
- : streams.get(request.getStreamId()).thenApply(stream ->
writeTo(buf, stream));
-
- final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
- if (isHeader) {
- // do not need to forward header request
- final List<DataStreamOutput> outs = proxies.getDataStreamOutput();
- peersStreamOutput.put(request.getStreamId(), outs);
- for (DataStreamOutput out : outs) {
- remoteWrites.add(out.getHeaderFuture());
- }
- } else {
- // body
- for (DataStreamOutput out :
peersStreamOutput.get(request.getStreamId())) {
- remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
- }
- }
-
- final CompletableFuture<?> current = previous.get()
- .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
- .thenCombineAsync(localWrite, (v, bytesWritten) -> {
- buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
- return null;
- });
- previous.set(current);
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ read(ctx, (DataStreamRequestByteBuf)msg);
}
};
}
+ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
+ final ByteBuf buf = request.slice();
+ final boolean isHeader = request.getType() == Type.STREAM_HEADER;
+
+ final StreamInfo info;
+ final CompletableFuture<Long> localWrite;
+ final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
+ if (isHeader) {
+ info = streams.computeIfAbsent(request.getStreamId(), id ->
newStreamInfo(buf));
+ localWrite = CompletableFuture.completedFuture(0L);
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.getHeaderFuture());
+ }
+ } else {
+ info = streams.get(request.getStreamId());
+ localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
+ }
+ }
+
+ final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
+ final CompletableFuture<?> current = previous.get()
+ .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
+ .thenCombineAsync(localWrite, (v, bytesWritten) -> {
+ buf.release();
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ return null;
+ });
+ previous.set(current);
+ }
+
private boolean checkSuccessRemoteWrite(
- List<CompletableFuture<DataStreamReply>> replyFutures, long
bytesWritten)
- throws ExecutionException, InterruptedException {
+ List<CompletableFuture<DataStreamReply>> replyFutures, long
bytesWritten) {
for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
- DataStreamReply reply = replyFuture.get();
+ final DataStreamReply reply = replyFuture.join();
if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
return false;
}
@@ -283,7 +297,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
ChannelPipeline p = ch.pipeline();
p.addLast(newDecoder());
p.addLast(newEncoder());
- p.addLast(getServerHandler());
+ p.addLast(newChannelInboundHandlerAdapter());
}
};
}
@@ -310,10 +324,6 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
};
}
- private Channel getChannel() {
- return channelFuture.awaitUninterruptibly().channel();
- }
-
@Override
public void start() {
channelFuture.syncUninterruptibly();
@@ -321,15 +331,14 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
@Override
public void close() {
- final ChannelFuture f = getChannel().close();
- f.syncUninterruptibly();
- bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
- workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
try {
+ channelFuture.channel().close().sync();
+ bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
+ workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- LOG.error("Interrupt EventLoopGroup terminate", e);
+ LOG.error(this + ": Interrupted close()", e);
}
proxies.close();