szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507109304
##########
File path:
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
##########
@@ -40,4 +40,4 @@ public long getDataLength() {
public ByteBuffer slice() {
return buffer.slice();
}
-}
+}
Review comment:
Please revert whitespace like this
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -166,8 +213,11 @@ public void runTestDataStream(){
Assert.assertEquals(writeRequest.getCallId(),
impl.getHeader().getCallId());
Assert.assertEquals(writeRequest.getRaftGroupId(),
impl.getHeader().getRaftGroupId());
Assert.assertEquals(writeRequest.getServerId(),
impl.getHeader().getServerId());
-
- Assert.assertEquals(dataSize, byteWritten);
+ int actualBytesWritten = 0;
+ for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
+ actualBytesWritten += s.getByteWritten();
+ }
+ Assert.assertEquals(dataSize * peers.size(), actualBytesWritten);
Review comment:
Check individual sizes.
```
for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
Assert.assertEquals(dataSize, s.getByteWritten());
}
```
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
final ByteBuf buf = request.slice();
- final AtomicBoolean released = new AtomicBoolean();
- streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf, released))
- .thenApply(stream -> writeTo(buf, stream, released.get()))
- .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+ final AtomicBoolean shouldRelease = new AtomicBoolean();
+
+ CompletableFuture<?>[] parallelWrites = new
CompletableFuture<?>[clients.size() + 1];
+ CompletableFuture<?> localWrites =
+ streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf, shouldRelease))
+ .thenApply(stream -> writeTo(buf, stream, shouldRelease.get()));
+ parallelWrites[0] = localWrites;
Review comment:
We may use isHeader to determine the cases.
```
final CompletableFuture<?> localWrites = isHeader?
streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf))
: streams.get(request.getStreamId()).thenApply(stream ->
writeTo(buf, stream));
parallelWrites[parallelWrites.length - 1] = localWrites;
```
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,31 +65,44 @@
private final StateMachine stateMachine;
private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams =
new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput
= new ConcurrentHashMap<>();
+
+ private List<DataStreamClientImpl> clients = new ArrayList<>();
Review comment:
NettyServerStreamRpc should use the public internal api but not the
impls. Therefore, it should use DataStreamClient instead of
DataStreamClientImpl.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -146,6 +190,13 @@ ChannelFuture buildChannel() {
.bind();
}
+ public void setupClient(List<RaftPeer> otherPeers, RaftProperties
properties) {
+ for (RaftPeer peer : otherPeers) {
+ DataStreamClientImpl impl = new DataStreamClientImpl(peer, properties,
null);
Review comment:
Use DataStreamClient.Builder
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
final ByteBuf buf = request.slice();
- final AtomicBoolean released = new AtomicBoolean();
- streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf, released))
- .thenApply(stream -> writeTo(buf, stream, released.get()))
- .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+ final AtomicBoolean shouldRelease = new AtomicBoolean();
Review comment:
Change it to
```
final boolean isHeader = request.getStreamOffset() == -1;
```
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
final ByteBuf buf = request.slice();
- final AtomicBoolean released = new AtomicBoolean();
- streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf, released))
- .thenApply(stream -> writeTo(buf, stream, released.get()))
- .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+ final AtomicBoolean shouldRelease = new AtomicBoolean();
+
+ CompletableFuture<?>[] parallelWrites = new
CompletableFuture<?>[clients.size() + 1];
+ CompletableFuture<?> localWrites =
+ streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf, shouldRelease))
+ .thenApply(stream -> writeTo(buf, stream, shouldRelease.get()));
+ parallelWrites[0] = localWrites;
+ peersStreamOutput.putIfAbsent(request.getStreamId(),
getDataStreamOutput());
+
+ for (int i = 0; i < clients.size(); i++) {
+ // do not need to forward header request
+ if (request.getStreamOffset() == -1) {
+ DataStreamClientImpl.DataStreamOutputImpl impl =
+ (DataStreamClientImpl.DataStreamOutputImpl)
peersStreamOutput.get(request.getStreamId()).get(i);
+ parallelWrites[i + 1] = impl.getHeaderFuture();
+ } else {
+ CompletableFuture<?> remoteWrites;
+ if (shouldRelease.get()) {
+ remoteWrites = new CompletableFuture<>();
+ } else {
+ remoteWrites =
+
peersStreamOutput.get(request.getStreamId()).get(i).writeAsync(request.slice().nioBuffer());
+ }
+ parallelWrites[i + 1] = remoteWrites;
+ }
+ }
Review comment:
Similar, use isHeader for the remote writes
```
if (isHeader) {
// header
final List<DataStreamOutput> outs = getDataStreamOutput();
peersStreamOutput.put(request.getStreamId(), outs);
for (int i = 0; i < outs.size(); i++) {
parallelWrites[i] = outs.get(i).getHeaderFuture();
}
} else {
// body
final List<DataStreamOutput> outs =
peersStreamOutput.get(request.getStreamId());
for (int i = 0; i < clients.size(); i++) {
parallelWrites[i] =
outs.get(i).writeAsync(request.slice().nioBuffer());
}
}
```
Also, add getHeaderFuture() to DataStreamClient.
##########
File path:
ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
*/
void startServer();
+ /**
+ * start clients that used to forward requests to peers.
+ */
+ void startClientToPeers();
Review comment:
The correct way to make startClient to retry. In a distributed system,
we cannot assume that all the servers are already started.
It needs more works for retrying. I am fine to add
NettyServerStreamRpc.startClientToPeers as a work around in the mean time.
However, please remove DataStreamServerRpc.startClientToPeers.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -18,9 +18,17 @@
package org.apache.ratis.netty.server;
+import java.util.stream.Collectors;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.impl.DataStreamClientImpl;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
Review comment:
Please sort the new imports with the existing imports.
##########
File path:
ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -155,6 +206,13 @@ public void startServer() {
channelFuture.syncUninterruptibly();
}
+ @Override
+ public void startClientToPeers() {
+ for (DataStreamClientImpl client : clients) {
+ client.start();
Review comment:
Add start() to DataStreamClient.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]