This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f55f386 RATIS-1031. Implement ratis streaming using netty - Server
code (#177). Contributed by Ansh Khanna
f55f386 is described below
commit f55f3863f8523d50f2782ce3eb7b6477cf5f02f5
Author: anshkhannasbu <[email protected]>
AuthorDate: Mon Aug 17 14:46:31 2020 -0400
RATIS-1031. Implement ratis streaming using netty - Server code (#177).
Contributed by Ansh Khanna
---
.../decoders/RequestDecoderComposite.java | 1 +
.../nettyzerocopy/server/NettyServer.java | 1 +
.../apache/ratis/netty/NettyDataStreamFactory.java | 3 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 141 +++++++++++++++++++++
.../ratis/server/impl/DataStreamServerImpl.java | 62 +++++++++
.../apache/ratis/datastream/TestDataStream.java | 97 ++++++++++++++
6 files changed, 304 insertions(+), 1 deletion(-)
diff --git
a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java
b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java
index 9199b7f..4e4be3c 100644
---
a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java
+++
b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java
@@ -50,6 +50,7 @@ public class RequestDecoderComposite extends RequestDecoder {
//System.out.printf("msg id and buflen %d and %d bytes\n", id, buflen,
msg.readableBytes());
try {
ByteBuf bf = msg.slice(msg.readerIndex(), buflen);
+ bf.retain();
req.setBuff(bf);
} catch (Exception e) {
System.out.println(e);
diff --git
a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java
b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java
index bbde62d..60a7409 100644
---
a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java
+++
b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java
@@ -56,6 +56,7 @@ public class NettyServer {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
final ResponseData reply = new ResponseData();
RequestDataComposite req = (RequestDataComposite)msg;
+ req.getBuff().release();
reply.setId(req.getDataId());
ctx.writeAndFlush(reply);
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 2480188..588b7e7 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.netty.client.NettyClientStreamRpc;
+import org.apache.ratis.netty.server.NettyServerStreamRpc;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DataStreamServerFactory;
@@ -42,6 +43,6 @@ public class NettyDataStreamFactory implements
DataStreamServerFactory, DataStre
@Override
public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server) {
- return null;
+ return new NettyServerStreamRpc(server);
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
new file mode 100644
index 0000000..1c051c4
--- /dev/null
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.netty.decoders.DataStreamRequestDecoder;
+import org.apache.ratis.netty.encoders.DataStreamReplyEncoder;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamRequestServer;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.ratis.thirdparty.io.netty.channel.*;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
+import
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
+import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
+import org.apache.ratis.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Netty based {@link DataStreamServerRpc}.
+ *
+ * Binds a server socket on the address of the given peer and writes the
+ * payload of every received {@link DataStreamRequestServer} to a local file,
+ * answering each request with an "OK" reply.
+ */
+public class NettyServerStreamRpc implements DataStreamServerRpc {
+  public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
+
+  private final RaftPeer raftServer;
+  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private ChannelFuture channelFuture;
+  private RandomAccessFile stream;
+  private FileChannel fileChannel;
+  private final File file = new File("client-data-stream");
+
+  /**
+   * Creates the server for the given peer and immediately starts binding.
+   *
+   * @param server the peer whose address this server listens on.
+   */
+  public NettyServerStreamRpc(RaftPeer server){
+    this.raftServer = server;
+    setupServer();
+  }
+
+  /**
+   * @return a handler that writes each request payload to the file and then
+   *         sends a reply carrying the stream id and data offset of the request.
+   */
+  private ChannelInboundHandler getServerHandler(){
+    return new ChannelInboundHandlerAdapter(){
+      @Override
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        final DataStreamRequestServer req = (DataStreamRequestServer)msg;
+        try {
+          for (ByteBuffer bf : req.getBuf().nioBuffers()) {
+            // A single write() is not guaranteed to drain the buffer.
+            while (bf.hasRemaining()) {
+              fileChannel.write(bf);
+            }
+          }
+        } finally {
+          // Release the buffer even when the write fails; otherwise it leaks.
+          req.getBuf().release();
+        }
+        final DataStreamReply reply = new DataStreamReplyImpl(req.getStreamId(),
+            req.getDataOffset(),
+            ByteBuffer.wrap("OK".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+        ctx.writeAndFlush(reply);
+      }
+    };
+  }
+
+  /**
+   * @return the initializer installing the request decoder, the reply encoder
+   *         and the server handler on each accepted channel.
+   */
+  private ChannelInitializer<SocketChannel> getInitializer(){
+    return new ChannelInitializer<SocketChannel>(){
+      @Override
+      public void initChannel(SocketChannel ch)
+          throws Exception {
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(new DataStreamRequestDecoder());
+        p.addLast(new DataStreamReplyEncoder());
+        p.addLast(getServerHandler());
+      }
+    };
+  }
+
+  /**
+   * Opens the backing file and starts binding the server socket.
+   *
+   * The file is opened before bind() is invoked so that no request can reach
+   * the handler while {@code fileChannel} is still unset.
+   */
+  public void setupServer(){
+    try {
+      stream = new RandomAccessFile(file, "rw");
+      fileChannel = stream.getChannel();
+    } catch (FileNotFoundException e){
+      // Log the exception itself; its cause is usually null.
+      LOG.error("Failed to open {} for writing", file, e);
+    }
+    channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(raftServer.getAddress()))
+        .bind();
+  }
+
+  /** @return the server channel, after waiting for the bind attempt to finish. */
+  private Channel getChannel() {
+    return channelFuture.awaitUninterruptibly().channel();
+  }
+
+  /** Blocks until the bind started by {@link #setupServer()} has completed. */
+  @Override
+  public void startServer() {
+    channelFuture.syncUninterruptibly();
+  }
+
+  /**
+   * Closes the server channel, shuts down both event loop groups and finally
+   * closes and deletes the backing file.
+   *
+   * The file is released last so that handlers still running on the worker
+   * group do not write to an already closed channel, and the event loop groups
+   * are shut down even when closing the server channel fails.
+   */
+  @Override
+  public void closeServer() {
+    try {
+      getChannel().close().syncUninterruptibly();
+    } finally {
+      bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
+      workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
+      try {
+        bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupt EventLoopGroup terminate", e);
+        // Preserve the interrupt status for the caller.
+        Thread.currentThread().interrupt();
+      } finally {
+        closeFile();
+      }
+    }
+  }
+
+  /** Closes the backing file, if it was opened, and removes it from disk. */
+  private void closeFile() {
+    if (stream != null) {
+      try {
+        // Closing the RandomAccessFile also closes its FileChannel.
+        stream.close();
+      } catch (IOException e){
+        LOG.warn("Unable to close file on server", e);
+      }
+    }
+    if (file.exists() && !file.delete()) {
+      LOG.warn("Unable to delete {}", file);
+    }
+  }
+}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
new file mode 100644
index 0000000..38a0bc2
--- /dev/null
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.DataStreamServer;
+import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.server.DataStreamServerRpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataStreamServer} whose RPC layer is created by the factory of the
+ * data stream type read from the given {@link RaftProperties}.
+ */
+public class DataStreamServerImpl implements DataStreamServer {
+  public static final Logger LOG = LoggerFactory.getLogger(DataStreamServerImpl.class);
+
+  private DataStreamServerRpc serverRpc;
+  private RaftPeer raftServer;
+  private Parameters parameters;
+  private RaftProperties properties;
+
+  /**
+   * @param server the peer handed to the factory when creating the RPC.
+   * @param properties the properties from which the data stream type is read.
+   * @param parameters the parameters passed to the factory of that type.
+   */
+  public DataStreamServerImpl(RaftPeer server, RaftProperties properties, Parameters parameters) {
+    this.properties = properties;
+    this.parameters = parameters;
+    this.raftServer = server;
+
+    // Resolve the configured type first, then let its factory build the RPC.
+    final SupportedDataStreamType streamType = RaftConfigKeys.DataStream.type(properties, LOG::info);
+    this.serverRpc = DataStreamServerFactory
+        .cast(streamType.newFactory(parameters))
+        .newDataStreamServerRpc(server);
+  }
+
+  /** @return the RPC created in the constructor. */
+  @Override
+  public DataStreamServerRpc getServerRpc() {
+    return serverRpc;
+  }
+
+  /** Closes the underlying RPC server. */
+  @Override
+  public void close() {
+    serverRpc.closeServer();
+  }
+}
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
new file mode 100644
index 0000000..f773216
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.impl.DataStreamClientImpl;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.util.NetUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Starts one data stream server and one client for a single generated peer,
+ * streams the same 1024*1024 byte buffer twice and checks that both returned
+ * futures are done.
+ */
+public class TestDataStream extends BaseTest {
+
+  private RaftPeer[] peers;
+  private RaftProperties properties;
+  private DataStreamServerImpl server;
+  private DataStreamClientImpl client;
+  private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+
+  /** Creates the server for the first peer and waits for it to start. */
+  public void setupServer(){
+    server = new DataStreamServerImpl(peers[0], properties, null);
+    server.getServerRpc().startServer();
+  }
+
+  /** Creates and starts the client talking to the first peer. */
+  public void setupClient(){
+    client = new DataStreamClientImpl(peers[0], properties, null);
+    client.start();
+  }
+
+  /**
+   * Closes the client and then the server.
+   *
+   * Safe to call more than once and when only part of the setup succeeded:
+   * the fields are cleared first, and the server is closed even when closing
+   * the client throws.
+   */
+  public void shutDownSetup(){
+    final DataStreamClientImpl c = client;
+    final DataStreamServerImpl s = server;
+    client = null;
+    server = null;
+    try {
+      if (c != null) {
+        c.close();
+      }
+    } finally {
+      if (s != null) {
+        s.close();
+      }
+    }
+  }
+
+  @Test
+  public void testDataStream(){
+    properties = new RaftProperties();
+    peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
+        .map(RaftPeerId::valueOf)
+        .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
+        .toArray(RaftPeer[]::new);
+
+    try {
+      setupServer();
+      setupClient();
+      runTestDataStream();
+    } finally {
+      // No-op when runTestDataStream() already shut everything down.
+      shutDownSetup();
+    }
+  }
+
+  /**
+   * Streams the buffer twice, waits three seconds, shuts down the client and
+   * the server, and then asserts that every collected future is done.
+   */
+  public void runTestDataStream(){
+    try {
+      DataStreamOutput stream = client.stream();
+      ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
+      for (int i = 0; i < bf.capacity(); i++) {
+        bf.put((byte)'a');
+      }
+      bf.flip();
+      for (int i = 0; i < 2; i++) {
+        // Rewind so that the whole buffer is sent again.
+        bf.position(0).limit(bf.capacity());
+        futures.add(stream.streamAsync(bf));
+      }
+      try {
+        Thread.sleep(1000*3);
+      } catch (InterruptedException e) {
+        // Keep the interrupt status instead of just printing the stack trace.
+        Thread.currentThread().interrupt();
+      }
+    } finally {
+      shutDownSetup();
+    }
+    for (CompletableFuture<DataStreamReply> future : futures) {
+      Assert.assertTrue(future.isDone());
+    }
+  }
+}