This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 694fe6f RATIS-1082. Netty stream server should forward the data to
the other servers in the group (#213)
694fe6f is described below
commit 694fe6f5985db3a2ffb1608f08b78769f0ba5abf
Author: Rui Wang <[email protected]>
AuthorDate: Mon Oct 19 19:23:36 2020 -0700
RATIS-1082. Netty stream server should forward the data to the other
servers in the group (#213)
* RATIS-1082. Netty stream server should forward the data to other servers
in the group
* update
* update
* fixup! address comments
* fixup! update
* fixup! address comments
* fixup! fix checkstyle
* fixup! address comments
* fixup! address comments
* address comments
* address comments
* trigger new CI check
---
.../org/apache/ratis/client/DataStreamClient.java | 7 ++
.../apache/ratis/client/api/DataStreamOutput.java | 3 +
.../ratis/client/impl/DataStreamClientImpl.java | 2 +
.../apache/ratis/netty/NettyDataStreamFactory.java | 7 ++
.../ratis/netty/server/NettyServerStreamRpc.java | 108 +++++++++++++++------
.../ratis/server/DataStreamServerFactory.java | 9 ++
.../ratis/server/impl/DataStreamServerImpl.java | 14 +++
.../apache/ratis/datastream/TestDataStream.java | 88 +++++++++++++----
8 files changed, 192 insertions(+), 46 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index f49dc5f..201f3a8 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -42,6 +42,13 @@ public interface DataStreamClient extends DataStreamApi {
/** close the client */
void close();
+ /** start the client */
+ void start();
+
+ static Builder newBuilder() {
+ return new Builder();
+ }
+
/** To build {@link DataStreamClient} objects */
class Builder {
private RaftPeer raftServer;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index dd86569..39f62ca 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -27,4 +27,7 @@ import java.util.concurrent.CompletableFuture;
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
/** Send out the data in the buffer asynchronously */
CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
+
+ /** Get the future of the header request. */
+ CompletableFuture<DataStreamReply> getHeaderFuture();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 96c0cf0..9149b74 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -98,6 +98,7 @@ public class DataStreamClientImpl implements DataStreamClient
{
return header;
}
+ @Override
public CompletableFuture<DataStreamReply> getHeaderFuture() {
return headerFuture;
}
@@ -123,6 +124,7 @@ public class DataStreamClientImpl implements
DataStreamClient {
dataStreamClientRpc.closeClient();
}
+ @Override
public void start(){
dataStreamClientRpc.startClient();
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 565148a..f9af46e 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.netty;
+import java.util.List;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamClientFactory;
import org.apache.ratis.conf.Parameters;
@@ -46,4 +47,10 @@ public class NettyDataStreamFactory implements
DataStreamServerFactory, DataStre
public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server,
StateMachine stateMachine) {
return new NettyServerStreamRpc(server, stateMachine);
}
+
+ @Override
+ public DataStreamServerRpc newDataStreamServerRpc(
+ RaftPeer server, List<RaftPeer> peers, StateMachine stateMachine,
RaftProperties properties) {
+ return new NettyServerStreamRpc(server, peers, stateMachine, properties);
+ }
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index c8092ef..18a4695 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -18,9 +18,12 @@
package org.apache.ratis.netty.server;
+import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
@@ -39,14 +42,16 @@ import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.nio.ByteBuffer;
+import java.util.List;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -58,6 +63,9 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private final StateMachine stateMachine;
private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams =
new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput
= new ConcurrentHashMap<>();
+
+ private List<DataStreamClient> clients = new ArrayList<>();
public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
this.raftServer = server;
@@ -65,44 +73,45 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
this.channelFuture = buildChannel();
}
- private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf,
AtomicBoolean released) {
+ public NettyServerStreamRpc(
+ RaftPeer server, List<RaftPeer> otherPeers,
+ StateMachine stateMachine, RaftProperties properties){
+ this(server, stateMachine);
+ setupClient(otherPeers, properties);
+ }
+
+ private List<DataStreamOutput> getDataStreamOutput() {
+ return clients.stream().map(client ->
client.stream()).collect(Collectors.toList());
+ }
+
+ private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
try {
final RaftClientRequest request =
ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
return stateMachine.data().stream(request);
} catch (InvalidProtocolBufferException e) {
throw new CompletionException(e);
- } finally {
- buf.release();
- released.set(true);
}
}
- private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
- if (released) {
+ private long writeTo(ByteBuf buf, DataStream stream) {
+ if (stream == null) {
return 0;
}
- try {
- if (stream == null) {
- return 0;
- }
- final WritableByteChannel channel = stream.getWritableByteChannel();
- long byteWritten = 0;
- for (ByteBuffer buffer : buf.nioBuffers()) {
- try {
- byteWritten += channel.write(buffer);
- } catch (Throwable t) {
- throw new CompletionException(t);
- }
+ final WritableByteChannel channel = stream.getWritableByteChannel();
+ long byteWritten = 0;
+ for (ByteBuffer buffer : buf.nioBuffers()) {
+ try {
+ byteWritten += channel.write(buffer);
+ } catch (Throwable t) {
+ throw new CompletionException(t);
}
- return byteWritten;
- } finally {
- buf.release();
}
+ return byteWritten;
}
- private void sendReply(DataStreamRequestByteBuf request, long byteWritten,
ChannelHandlerContext ctx) {
+ private void sendReply(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx) {
// TODO RATIS-1098: include byteWritten and isSuccess in the reply
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
request.getStreamId(), request.getStreamOffset(),
ByteBuffer.wrap("OK".getBytes()));
@@ -115,10 +124,32 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
final ByteBuf buf = request.slice();
- final AtomicBoolean released = new AtomicBoolean();
- streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf, released))
- .thenApply(stream -> writeTo(buf, stream, released.get()))
- .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+ final boolean isHeader = request.getStreamOffset() == -1;
+
+ CompletableFuture<?>[] parallelWrites = new
CompletableFuture<?>[clients.size() + 1];
+
+ final CompletableFuture<?> localWrites = isHeader?
+ streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf))
+ : streams.get(request.getStreamId()).thenApply(stream ->
writeTo(buf, stream));
+ parallelWrites[0] = localWrites;
+ peersStreamOutput.putIfAbsent(request.getStreamId(),
getDataStreamOutput());
+
+ // do not need to forward header request
+ if (isHeader) {
+ for (int i = 0; i <
peersStreamOutput.get(request.getStreamId()).size(); i++) {
+ parallelWrites[i + 1] =
peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+ }
+ } else {
+ // body
+ for (int i = 0; i < clients.size(); i++) {
+ parallelWrites[i + 1] =
+
peersStreamOutput.get(request.getStreamId()).get(i).writeAsync(request.slice().nioBuffer());
+ }
+ }
+ CompletableFuture.allOf(parallelWrites).whenComplete((t, r) -> {
+ buf.release();
+ sendReply(request, ctx);
+ });
}
};
}
@@ -146,6 +177,16 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
.bind();
}
+ private void setupClient(List<RaftPeer> otherPeers, RaftProperties
properties) {
+ for (RaftPeer peer : otherPeers) {
+ clients.add(DataStreamClient.newBuilder()
+ .setParameters(null)
+ .setRaftServer(peer)
+ .setProperties(properties)
+ .build());
+ }
+ }
+
private Channel getChannel() {
return channelFuture.awaitUninterruptibly().channel();
}
@@ -155,6 +196,13 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
channelFuture.syncUninterruptibly();
}
+ // TODO: RATIS-1099 build connection with other server automatically.
+ public void startClientToPeers() {
+ for (DataStreamClient client : clients) {
+ client.start();
+ }
+ }
+
@Override
public void closeServer() {
final ChannelFuture f = getChannel().close();
@@ -167,5 +215,9 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
} catch (InterruptedException e) {
LOG.error("Interrupt EventLoopGroup terminate", e);
}
+
+ for (DataStreamClient client : clients) {
+ client.close();
+ }
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index f2800d9..b9390ee 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.server;
+import java.util.List;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.ServerFactory;
@@ -37,4 +39,11 @@ public interface DataStreamServerFactory extends
DataStreamFactory {
* Server implementation for streaming in Raft group
*/
DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine
stateMachine);
+
+ /**
+ * Server implementation for streaming in Raft group. The server will
forward requests
+ * to peers.
+ */
+ DataStreamServerRpc newDataStreamServerRpc(
+ RaftPeer server, List<RaftPeer> peers, StateMachine stateMachine,
RaftProperties properties);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index bfa3fe7..4a3cdc7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
+import java.util.List;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
@@ -48,6 +49,19 @@ public class DataStreamServerImpl implements
DataStreamServer {
.newDataStreamServerRpc(raftServer, stateMachine);
}
+ public DataStreamServerImpl(RaftPeer server,
+ RaftProperties properties,
+ Parameters parameters,
+ StateMachine stateMachine,
+ List<RaftPeer> peers){
+ this.raftServer = server;
+ this.stateMachine = stateMachine;
+ final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
+
+ this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+ .newDataStreamServerRpc(server, peers, stateMachine, properties);
+ }
+
@Override
public DataStreamServerRpc getServerRpc() {
return serverRpc;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index cca6c94..84bbc79 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -18,11 +18,13 @@
package org.apache.ratis.datastream;
+import java.util.stream.Collectors;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.server.NettyServerStreamRpc;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -50,6 +52,9 @@ public class TestDataStream extends BaseTest {
}
class SingleDataStreamStateMachine extends BaseStateMachine {
+ private int byteWritten = 0;
+ private RaftClientRequest writeRequest;
+
final WritableByteChannel channel = new WritableByteChannel() {
private volatile boolean open = true;
@@ -61,7 +66,7 @@ public class TestDataStream extends BaseTest {
final int remaining = src.remaining();
for(; src.remaining() > 0; ) {
Assert.assertEquals(pos2byte(byteWritten), src.get());
- byteWritten++;
+ byteWritten += 1;
}
return remaining;
}
@@ -99,28 +104,58 @@ public class TestDataStream extends BaseTest {
writeRequest = request;
return CompletableFuture.completedFuture(stream);
}
+
+ public int getByteWritten() {
+ return byteWritten;
+ }
+
+ public RaftClientRequest getWriteRequest() {
+ return writeRequest;
+ }
}
- private RaftPeer[] peers;
+ private List<RaftPeer> peers;
private RaftProperties properties;
- private DataStreamServerImpl server;
+ private List<DataStreamServerImpl> servers;
private DataStreamClientImpl client;
- private int byteWritten = 0;
- private RaftClientRequest writeRequest;
+ private List<SingleDataStreamStateMachine> singleDataStreamStateMachines;
+
+ private void setupServer(){
+ servers = new ArrayList<>(peers.size());
+ singleDataStreamStateMachines = new ArrayList<>(peers.size());
+ // start stream servers on raft peers.
+ for (int i = 0; i < peers.size(); i++) {
+ SingleDataStreamStateMachine singleDataStreamStateMachine = new
SingleDataStreamStateMachine();
+ singleDataStreamStateMachines.add(singleDataStreamStateMachine);
+ DataStreamServerImpl streamServer;
+ if (i == 0) {
+ // only the first server routes requests to peers.
+ List<RaftPeer> otherPeers = new ArrayList<>(peers);
+ otherPeers.remove(peers.get(i));
+ streamServer = new DataStreamServerImpl(
+ peers.get(i), properties, null, singleDataStreamStateMachine,
otherPeers);
+ } else {
+ streamServer = new DataStreamServerImpl(
+ peers.get(i), singleDataStreamStateMachine, properties, null);
+ }
+ servers.add(streamServer);
+ streamServer.getServerRpc().startServer();
+ }
- public void setupServer(){
- server = new DataStreamServerImpl(peers[0], new
SingleDataStreamStateMachine(), properties, null);
- server.getServerRpc().startServer();
+ // start peer clients on stream servers
+ for (DataStreamServerImpl streamServer : servers) {
+ ((NettyServerStreamRpc)
streamServer.getServerRpc()).startClientToPeers();
+ }
}
- public void setupClient(){
- client = new DataStreamClientImpl(peers[0], properties, null);
+ private void setupClient(){
+ client = new DataStreamClientImpl(peers.get(0), properties, null);
client.start();
}
public void shutDownSetup(){
client.close();
- server.close();
+ servers.stream().forEach(s -> s.close());
}
@Test
@@ -128,8 +163,21 @@ public class TestDataStream extends BaseTest {
properties = new RaftProperties();
peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
.map(RaftPeerId::valueOf)
- .map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress()))
- .toArray(RaftPeer[]::new);
+ .map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress())).collect(
+ Collectors.toList());
+
+ setupServer();
+ setupClient();
+ runTestDataStream();
+ }
+
+ @Test
+ public void testDataStreamMultipleServer(){
+ properties = new RaftProperties();
+ peers = Arrays.asList(MiniRaftCluster.generateIds(3, 0)).stream()
+ .map(RaftPeerId::valueOf)
+ .map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress())).collect(
+ Collectors.toList());
setupServer();
setupClient();
@@ -162,12 +210,16 @@ public class TestDataStream extends BaseTest {
f.join();
}
- Assert.assertEquals(writeRequest.getClientId(),
impl.getHeader().getClientId());
- Assert.assertEquals(writeRequest.getCallId(),
impl.getHeader().getCallId());
- Assert.assertEquals(writeRequest.getRaftGroupId(),
impl.getHeader().getRaftGroupId());
- Assert.assertEquals(writeRequest.getServerId(),
impl.getHeader().getServerId());
+ for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
+ RaftClientRequest writeRequest = s.getWriteRequest();
+ if (writeRequest.getClientId().equals(impl.getHeader().getClientId())) {
+ Assert.assertEquals(writeRequest.getCallId(),
impl.getHeader().getCallId());
+ Assert.assertEquals(writeRequest.getRaftGroupId(),
impl.getHeader().getRaftGroupId());
+ Assert.assertEquals(writeRequest.getServerId(),
impl.getHeader().getServerId());
+ }
+ Assert.assertEquals(dataSize, s.getByteWritten());
+ }
- Assert.assertEquals(dataSize, byteWritten);
shutDownSetup();
}