This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f55f386  RATIS-1031. Implement ratis streaming using netty - Server code (#177). Contributed by Ansh Khanna
f55f386 is described below

commit f55f3863f8523d50f2782ce3eb7b6477cf5f02f5
Author: anshkhannasbu <[email protected]>
AuthorDate: Mon Aug 17 14:46:31 2020 -0400

    RATIS-1031. Implement ratis streaming using netty - Server code (#177). Contributed by Ansh Khanna
---
 .../decoders/RequestDecoderComposite.java          |   1 +
 .../nettyzerocopy/server/NettyServer.java          |   1 +
 .../apache/ratis/netty/NettyDataStreamFactory.java |   3 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   | 141 +++++++++++++++++++++
 .../ratis/server/impl/DataStreamServerImpl.java    |  62 +++++++++
 .../apache/ratis/datastream/TestDataStream.java    |  97 ++++++++++++++
 6 files changed, 304 insertions(+), 1 deletion(-)

diff --git a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java
index 9199b7f..4e4be3c 100644
--- a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java
+++ b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/decoders/RequestDecoderComposite.java
@@ -50,6 +50,7 @@ public class RequestDecoderComposite extends RequestDecoder {
         //System.out.printf("msg id and buflen %d and %d bytes\n", id, buflen, msg.readableBytes());
         try {
           ByteBuf bf = msg.slice(msg.readerIndex(), buflen);
+          bf.retain();
           req.setBuff(bf);
         } catch (Exception e) {
           System.out.println(e);
diff --git a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java
index bbde62d..60a7409 100644
--- a/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java
+++ b/ratis-experiments/src/main/java/org/apache/ratis/experiments/nettyzerocopy/server/NettyServer.java
@@ -56,6 +56,7 @@ public class NettyServer {
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final ResponseData reply = new ResponseData();
         RequestDataComposite req = (RequestDataComposite)msg;
+        req.getBuff().release();
         reply.setId(req.getDataId());
         ctx.writeAndFlush(reply);
       }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 2480188..588b7e7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.netty.client.NettyClientStreamRpc;
+import org.apache.ratis.netty.server.NettyServerStreamRpc;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.DataStreamServerFactory;
@@ -42,6 +43,6 @@ public class NettyDataStreamFactory implements DataStreamServerFactory, DataStre
 
   @Override
   public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server) {
-    return null;
+    return new NettyServerStreamRpc(server);
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
new file mode 100644
index 0000000..1c051c4
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ratis.netty.server;
+
+import org.apache.ratis.netty.decoders.DataStreamRequestDecoder;
+import org.apache.ratis.netty.encoders.DataStreamReplyEncoder;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamRequestServer;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.ratis.thirdparty.io.netty.channel.*;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.ratis.thirdparty.io.netty.handler.logging.LogLevel;
+import org.apache.ratis.thirdparty.io.netty.handler.logging.LoggingHandler;
+import org.apache.ratis.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.TimeUnit;
+
+public class NettyServerStreamRpc implements DataStreamServerRpc {
+  public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
+
+  private RaftPeer raftServer;
+  private EventLoopGroup bossGroup = new NioEventLoopGroup();
+  private EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private ChannelFuture channelFuture;
+  private RandomAccessFile stream;
+  private FileChannel fileChannel;
+  private File file = new File("client-data-stream");
+
+
+  public NettyServerStreamRpc(RaftPeer server){
+    this.raftServer = server;
+    setupServer();
+  }
+
+  private ChannelInboundHandler getServerHandler(){
+    return new ChannelInboundHandlerAdapter(){
+      @Override
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        DataStreamRequestServer req = (DataStreamRequestServer)msg;
+        ByteBuffer[] bfs = req.getBuf().nioBuffers();
+        for(int i = 0; i < bfs.length; i++){
+          fileChannel.write(bfs[i]);
+        }
+        req.getBuf().release();
+        DataStreamReply reply = new DataStreamReplyImpl(req.getStreamId(),
+                                                        req.getDataOffset(),
+                                                        ByteBuffer.wrap("OK".getBytes()));
+        ctx.writeAndFlush(reply);
+      }
+    };
+  }
+
+  private ChannelInitializer<SocketChannel> getInitializer(){
+    return new ChannelInitializer<SocketChannel>(){
+      @Override
+      public void initChannel(SocketChannel ch)
+          throws Exception {
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(new DataStreamRequestDecoder());
+        p.addLast(new DataStreamReplyEncoder());
+        p.addLast(getServerHandler());
+      }
+    };
+  }
+
+  public void setupServer(){
+    channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(raftServer.getAddress()))
+        .bind();
+    try {
+      stream = new RandomAccessFile(file, "rw");
+      fileChannel = stream.getChannel();
+    } catch (FileNotFoundException e){
+      LOG.info("exception cause is {}", e.getCause());
+    }
+  }
+
+  private Channel getChannel() {
+    return channelFuture.awaitUninterruptibly().channel();
+  }
+
+  @Override
+  public void startServer() {
+    channelFuture.syncUninterruptibly();
+  }
+
+  @Override
+  public void closeServer() {
+    final ChannelFuture f = getChannel().close();
+    try {
+      stream.close();
+      file.delete();
+      fileChannel.close();
+    } catch (IOException e){
+      LOG.info("Unable to close file on server");
+    }
+    f.syncUninterruptibly();
+    bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
+    workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
+    try {
+      bossGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
+      workerGroup.awaitTermination(1000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupt EventLoopGroup terminate", e);
+    }
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
new file mode 100644
index 0000000..38a0bc2
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.DataStreamServer;
+import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.server.DataStreamServerRpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataStreamServerImpl implements DataStreamServer {
+  public static final Logger LOG = LoggerFactory.getLogger(DataStreamServerImpl.class);
+
+  private DataStreamServerRpc serverRpc;
+  private RaftPeer raftServer;
+  private Parameters parameters;
+  private RaftProperties properties;
+
+  public DataStreamServerImpl(RaftPeer server,
+                      RaftProperties properties,
+                       Parameters parameters){
+    this.raftServer = server;
+    this.parameters = parameters;
+    this.properties = properties;
+
+    final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
+
+    this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
+                      .newDataStreamServerRpc(server);
+  }
+
+  @Override
+  public DataStreamServerRpc getServerRpc() {
+    return serverRpc;
+  }
+
+  @Override
+  public void close(){
+    serverRpc.closeServer();
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
new file mode 100644
index 0000000..f773216
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.impl.DataStreamClientImpl;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.util.NetUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+public class TestDataStream extends BaseTest {
+
+  private RaftPeer[] peers;
+  private RaftProperties properties;
+  private DataStreamServerImpl server;
+  private DataStreamClientImpl client;
+  private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+
+  public void setupServer(){
+    server = new DataStreamServerImpl(peers[0], properties, null);
+    server.getServerRpc().startServer();
+  }
+
+  public void setupClient(){
+    client = new DataStreamClientImpl(peers[0], properties, null);
+    client.start();
+  }
+
+  public void shutDownSetup(){
+    client.close();
+    server.close();
+  }
+
+  @Test
+  public void testDataStream(){
+    properties = new RaftProperties();
+    peers = Arrays.asList(MiniRaftCluster.generateIds(1, 0)).stream()
+                       .map(RaftPeerId::valueOf)
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
+                       .toArray(RaftPeer[]::new);
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  public void runTestDataStream(){
+    DataStreamOutput stream = client.stream();
+    ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
+    for (int i = 0; i < bf.capacity(); i++) {
+      bf.put((byte)'a');
+    }
+    bf.flip();
+    int i = 0;
+    while(i < 2){
+      bf.position(0).limit(bf.capacity());
+      futures.add(stream.streamAsync(bf));
+      i++;
+    }
+    try {
+      Thread.sleep(1000*3);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    shutDownSetup();
+    for(i = 0; i < futures.size(); i++){
+      Assert.assertTrue(futures.get(i).isDone());
+    }
+  }
+}

Reply via email to