This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2d72fbf RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine. (#212)
2d72fbf is described below
commit 2d72fbf1a3e6ea044ced410d60d5fcc20595ee77
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 6 08:42:52 2020 +0800
RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine. (#212)
* RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.
* Rewrote TestDataStream.
* Some minor changes in TestDataStream.
* Send reply after write.
---
.../org/apache/ratis/client/DataStreamClient.java | 2 +-
.../apache/ratis/netty/NettyDataStreamFactory.java | 5 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 109 +++++++++++-------
.../ratis/server/DataStreamServerFactory.java | 3 +-
.../ratis/server/impl/DataStreamServerImpl.java | 14 +--
.../apache/ratis/datastream/TestDataStream.java | 127 +++++++++++++++++----
6 files changed, 185 insertions(+), 75 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index ae94aab..5762ca0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -36,7 +36,7 @@ public interface DataStreamClient {
/** Return the rpc client instance **/
DataStreamClientRpc getClientRpc();
- /** Return Streamer Api instance. */
+ /** @return a new output stream. */
DataStreamOutput stream();
/** add information of the raft peers to communicate with */
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 588b7e7..565148a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -27,6 +27,7 @@ import org.apache.ratis.netty.server.NettyServerStreamRpc;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.statemachine.StateMachine;
public class NettyDataStreamFactory implements DataStreamServerFactory, DataStreamClientFactory {
public NettyDataStreamFactory(Parameters parameters){}
@@ -42,7 +43,7 @@ public class NettyDataStreamFactory implements DataStreamServerFactory, DataStre
}
@Override
- public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server) {
- return new NettyServerStreamRpc(server);
+ public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine stateMachine) {
+ return new NettyServerStreamRpc(server, stateMachine);
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index bf0e46a..3caaafd 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,9 +20,13 @@ package org.apache.ratis.netty.server;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.*;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
@@ -33,45 +37,82 @@ import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG =
LoggerFactory.getLogger(NettyServerStreamRpc.class);
- private RaftPeer raftServer;
- private EventLoopGroup bossGroup = new NioEventLoopGroup();
- private EventLoopGroup workerGroup = new NioEventLoopGroup();
- private ChannelFuture channelFuture;
- private RandomAccessFile stream;
- private FileChannel fileChannel;
- private File file = new File("client-data-stream");
+ private final RaftPeer raftServer;
+ private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+ private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+ private final ChannelFuture channelFuture;
+ private final StateMachine stateMachine;
+ private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
- public NettyServerStreamRpc(RaftPeer server){
+ public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
this.raftServer = server;
- setupServer();
+ this.stateMachine = stateMachine;
+ this.channelFuture = buildChannel();
+ }
+
+ private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+ try {
+ // TODO RATIS-1085: read the request from buf
+ final RaftClientRequest request = null;
+ return stateMachine.data().stream(request);
+ } finally {
+ buf.release();
+ released.set(true);
+ }
+ }
+
+ private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
+ if (released) {
+ return;
+ }
+ try {
+ if (stream == null) {
+ return;
+ }
+
+ final WritableByteChannel channel = stream.getWritableByteChannel();
+ for (ByteBuffer buffer : buf.nioBuffers()) {
+ try {
+ channel.write(buffer);
+ } catch (Throwable t) {
+ throw new CompletionException(t);
+ }
+ }
+ } finally {
+ buf.release();
+ }
+ }
+
+ private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+ final DataStreamReply reply = new DataStreamReplyByteBuffer(
+ request.getStreamId(), request.getDataOffset(), ByteBuffer.wrap("OK".getBytes()));
+ ctx.writeAndFlush(reply);
}
private ChannelInboundHandler getServerHandler(){
return new ChannelInboundHandlerAdapter(){
@Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
- ByteBuffer[] bfs = req.getBuf().nioBuffers();
- for(int i = 0; i < bfs.length; i++){
- fileChannel.write(bfs[i]);
- }
- req.getBuf().release();
- final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
- req.getDataOffset(),
- ByteBuffer.wrap("OK".getBytes()));
- ctx.writeAndFlush(reply);
+ final long streamId = req.getStreamId();
+ final ByteBuf buf = req.getBuf();
+ final AtomicBoolean released = new AtomicBoolean();
+ streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
+ .thenAccept(stream -> writeTo(buf, stream, released.get()))
+ .thenAccept(dummy -> sendReply(req, ctx));
}
};
}
@@ -79,8 +120,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
private ChannelInitializer<SocketChannel> getInitializer(){
return new ChannelInitializer<SocketChannel>(){
@Override
- public void initChannel(SocketChannel ch)
- throws Exception {
+ public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new DataStreamRequestDecoder());
p.addLast(new DataStreamReplyEncoder());
@@ -89,8 +129,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
};
}
- public void setupServer(){
- channelFuture = new ServerBootstrap()
+ ChannelFuture buildChannel() {
+ return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
@@ -98,12 +138,6 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
.childOption(ChannelOption.SO_KEEPALIVE, true)
.localAddress(NetUtils.createSocketAddr(raftServer.getAddress()))
.bind();
- try {
- stream = new RandomAccessFile(file, "rw");
- fileChannel = stream.getChannel();
- } catch (FileNotFoundException e){
- LOG.info("exception cause is {}", e.getCause());
- }
}
private Channel getChannel() {
@@ -118,13 +152,6 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
@Override
public void closeServer() {
final ChannelFuture f = getChannel().close();
- try {
- stream.close();
- file.delete();
- fileChannel.close();
- } catch (IOException e){
- LOG.info("Unable to close file on server");
- }
f.syncUninterruptibly();
bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index f22e48f..f2800d9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server;
import org.apache.ratis.datastream.DataStreamFactory;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.statemachine.StateMachine;
public interface DataStreamServerFactory extends DataStreamFactory {
@@ -35,5 +36,5 @@ public interface DataStreamServerFactory extends DataStreamFactory {
/**
* Server implementation for streaming in Raft group
*/
- DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);
+ DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine stateMachine);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 38a0bc2..bfa3fe7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DataStreamServerFactory;
import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,20 +35,17 @@ public class DataStreamServerImpl implements DataStreamServer {
private DataStreamServerRpc serverRpc;
private RaftPeer raftServer;
- private Parameters parameters;
- private RaftProperties properties;
+ private final StateMachine stateMachine;
- public DataStreamServerImpl(RaftPeer server,
- RaftProperties properties,
- Parameters parameters){
+ public DataStreamServerImpl(RaftPeer server, StateMachine stateMachine,
+ RaftProperties properties, Parameters parameters){
this.raftServer = server;
- this.parameters = parameters;
- this.properties = properties;
+ this.stateMachine = stateMachine;
final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
- .newDataStreamServerRpc(server);
+ .newDataStreamServerRpc(raftServer, stateMachine);
}
@Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index ecf7c50..3b5426c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,27 +24,90 @@ import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
public class TestDataStream extends BaseTest {
+ static final int MODULUS = 23;
+
+ static byte pos2byte(int pos) {
+ return (byte) ('A' + pos%MODULUS);
+ }
+
+ class SingleDataStreamStateMachine extends BaseStateMachine {
+ final WritableByteChannel channel = new WritableByteChannel() {
+ private volatile boolean open = true;
+
+ @Override
+ public int write(ByteBuffer src) {
+ if (!open) {
+ throw new IllegalStateException("Already closed");
+ }
+ final int remaining = src.remaining();
+ for(; src.remaining() > 0; ) {
+ Assert.assertEquals(pos2byte(byteWritten), src.get());
+ byteWritten++;
+ }
+ return remaining;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public void close() {
+ open = false;
+ }
+ };
+
+ final DataStream stream = new DataStream() {
+ @Override
+ public WritableByteChannel getWritableByteChannel() {
+ return channel;
+ }
+
+ @Override
+ public CompletableFuture<?> cleanUp() {
+ try {
+ channel.close();
+ } catch (Throwable t) {
+ return JavaUtils.completeExceptionally(t);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+
+ @Override
+ public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+ return CompletableFuture.completedFuture(stream);
+ }
+ }
private RaftPeer[] peers;
private RaftProperties properties;
private DataStreamServerImpl server;
private DataStreamClientImpl client;
- private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ private int byteWritten = 0;
public void setupServer(){
- server = new DataStreamServerImpl(peers[0], properties, null);
+ server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
server.getServerRpc().startServer();
}
@@ -61,7 +124,7 @@ public class TestDataStream extends BaseTest {
@Test
public void testDataStream(){
properties = new RaftProperties();
- peers = Arrays.asList(MiniRaftCluster.generateIds(1, 0)).stream()
+ peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
.map(RaftPeerId::valueOf)
.map(id -> new RaftPeer(id,
NetUtils.createLocalServerAddress()))
.toArray(RaftPeer[]::new);
@@ -72,27 +135,47 @@ public class TestDataStream extends BaseTest {
}
public void runTestDataStream(){
- DataStreamOutput stream = client.stream();
- ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
- for (int i = 0; i < bf.capacity(); i++) {
- bf.put((byte)'a');
- }
- bf.flip();
- int i = 0;
+ final int bufferSize = 1024*1024;
+ final int bufferNum = 10;
+ final DataStreamOutput out = client.stream();
- while(i < 1000){
- bf.position(0).limit(bf.capacity());
- futures.add(stream.streamAsync(bf));
- i++;
+ //send request
+ final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ futures.add(sendRequest(out, 1024));
+
+ //send data
+ final int halfBufferSize = bufferSize/2;
+ int dataSize = 0;
+ for(int i = 0; i < bufferNum; i++) {
+ final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+ final ByteBuffer bf = initBuffer(dataSize, size);
+ futures.add(out.streamAsync(bf));
+ dataSize += size;
}
- try {
- Thread.sleep(1000*3);
- } catch (InterruptedException e) {
- e.printStackTrace();
+
+ //join all requests
+ for(CompletableFuture<DataStreamReply> f : futures) {
+ f.join();
}
+ Assert.assertEquals(dataSize, byteWritten);
shutDownSetup();
- for(i = 0; i < futures.size(); i++){
- Assert.assertTrue(futures.get(i).isDone());
+ }
+
+ CompletableFuture<DataStreamReply> sendRequest(DataStreamOutput out, int size) {
+ // TODO RATIS-1085: create a RaftClientRequest and put it in the buffer
+ final ByteBuffer buffer = initBuffer(0, size);
+ return out.streamAsync(buffer);
+ }
+
+ static ByteBuffer initBuffer(int offset, int size) {
+ final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+ final int length = buffer.capacity();
+ buffer.position(0).limit(length);
+ for (int j = 0; j < length; j++) {
+ buffer.put(pos2byte(offset + j));
}
+ buffer.flip();
+ Assert.assertEquals(length, buffer.remaining());
+ return buffer;
}
}