This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d72fbf  RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine. (#212)
2d72fbf is described below

commit 2d72fbf1a3e6ea044ced410d60d5fcc20595ee77
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 6 08:42:52 2020 +0800

    RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine. (#212)
    
    * RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.
    
    * Rewrote TestDataStream.
    
    * Some minor changes in TestDataStream.
    
    * Send reply after write.
---
 .../org/apache/ratis/client/DataStreamClient.java  |   2 +-
 .../apache/ratis/netty/NettyDataStreamFactory.java |   5 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   | 109 +++++++++++-------
 .../ratis/server/DataStreamServerFactory.java      |   3 +-
 .../ratis/server/impl/DataStreamServerImpl.java    |  14 +--
 .../apache/ratis/datastream/TestDataStream.java    | 127 +++++++++++++++++----
 6 files changed, 185 insertions(+), 75 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index ae94aab..5762ca0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -36,7 +36,7 @@ public interface DataStreamClient {
   /** Return the rpc client instance **/
   DataStreamClientRpc getClientRpc();
 
-  /** Return Streamer Api instance. */
+  /** @return a new output stream. */
   DataStreamOutput stream();
 
   /** add information of the raft peers to communicate with */
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 588b7e7..565148a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -27,6 +27,7 @@ import org.apache.ratis.netty.server.NettyServerStreamRpc;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.DataStreamServerFactory;
+import org.apache.ratis.statemachine.StateMachine;
 
 public class NettyDataStreamFactory implements DataStreamServerFactory, 
DataStreamClientFactory {
   public NettyDataStreamFactory(Parameters parameters){}
@@ -42,7 +43,7 @@ public class NettyDataStreamFactory implements 
DataStreamServerFactory, DataStre
   }
 
   @Override
-  public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server) {
-    return new NettyServerStreamRpc(server);
+  public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, 
StateMachine stateMachine) {
+    return new NettyServerStreamRpc(server, stateMachine);
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index bf0e46a..3caaafd 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,9 +20,13 @@ package org.apache.ratis.netty.server;
 
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.*;
 import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
@@ -33,45 +37,82 @@ import org.apache.ratis.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = 
LoggerFactory.getLogger(NettyServerStreamRpc.class);
 
-  private RaftPeer raftServer;
-  private EventLoopGroup bossGroup = new NioEventLoopGroup();
-  private EventLoopGroup workerGroup = new NioEventLoopGroup();
-  private ChannelFuture channelFuture;
-  private RandomAccessFile stream;
-  private FileChannel fileChannel;
-  private File file = new File("client-data-stream");
+  private final RaftPeer raftServer;
+  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private final ChannelFuture channelFuture;
 
+  private final StateMachine stateMachine;
+  private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = 
new ConcurrentHashMap<>();
 
-  public NettyServerStreamRpc(RaftPeer server){
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
-    setupServer();
+    this.stateMachine = stateMachine;
+    this.channelFuture = buildChannel();
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, 
AtomicBoolean released) {
+    try {
+      // TODO RATIS-1085: read the request from buf
+      final RaftClientRequest request = null;
+      return stateMachine.data().stream(request);
+    } finally {
+      buf.release();
+      released.set(true);
+    }
+  }
+
+  private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
+    if (released) {
+      return;
+    }
+    try {
+      if (stream == null) {
+        return;
+      }
+
+      final WritableByteChannel channel = stream.getWritableByteChannel();
+      for (ByteBuffer buffer : buf.nioBuffers()) {
+        try {
+          channel.write(buffer);
+        } catch (Throwable t) {
+          throw new CompletionException(t);
+        }
+      }
+    } finally {
+      buf.release();
+    }
+  }
+
+  private void sendReply(DataStreamRequestByteBuf request, 
ChannelHandlerContext ctx) {
+    final DataStreamReply reply = new DataStreamReplyByteBuffer(
+        request.getStreamId(), request.getDataOffset(), 
ByteBuffer.wrap("OK".getBytes()));
+    ctx.writeAndFlush(reply);
   }
 
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+      public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
-        ByteBuffer[] bfs = req.getBuf().nioBuffers();
-        for(int i = 0; i < bfs.length; i++){
-          fileChannel.write(bfs[i]);
-        }
-        req.getBuf().release();
-        final DataStreamReply reply = new 
DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        
ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+        final long streamId = req.getStreamId();
+        final ByteBuf buf = req.getBuf();
+        final AtomicBoolean released = new AtomicBoolean();
+        streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, 
released))
+            .thenAccept(stream -> writeTo(buf, stream, released.get()))
+            .thenAccept(dummy -> sendReply(req, ctx));
       }
     };
   }
@@ -79,8 +120,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
   private ChannelInitializer<SocketChannel> getInitializer(){
     return new ChannelInitializer<SocketChannel>(){
       @Override
-      public void initChannel(SocketChannel ch)
-          throws Exception {
+      public void initChannel(SocketChannel ch) {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new DataStreamRequestDecoder());
         p.addLast(new DataStreamReplyEncoder());
@@ -89,8 +129,8 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     };
   }
 
-  public void setupServer(){
-    channelFuture = new ServerBootstrap()
+  ChannelFuture buildChannel() {
+    return new ServerBootstrap()
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
@@ -98,12 +138,6 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .localAddress(NetUtils.createSocketAddr(raftServer.getAddress()))
         .bind();
-    try {
-      stream = new RandomAccessFile(file, "rw");
-      fileChannel = stream.getChannel();
-    } catch (FileNotFoundException e){
-      LOG.info("exception cause is {}", e.getCause());
-    }
   }
 
   private Channel getChannel() {
@@ -118,13 +152,6 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
   @Override
   public void closeServer() {
     final ChannelFuture f = getChannel().close();
-    try {
-      stream.close();
-      file.delete();
-      fileChannel.close();
-    } catch (IOException e){
-      LOG.info("Unable to close file on server");
-    }
     f.syncUninterruptibly();
     bossGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
     workerGroup.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index f22e48f..f2800d9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server;
 import org.apache.ratis.datastream.DataStreamFactory;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.statemachine.StateMachine;
 
 public interface DataStreamServerFactory extends DataStreamFactory {
 
@@ -35,5 +36,5 @@ public interface DataStreamServerFactory extends 
DataStreamFactory {
   /**
    * Server implementation for streaming in Raft group
    */
-  DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);
+  DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, StateMachine 
stateMachine);
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 38a0bc2..bfa3fe7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServer;
 import org.apache.ratis.server.DataStreamServerFactory;
 import org.apache.ratis.server.DataStreamServerRpc;
+import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,20 +35,17 @@ public class DataStreamServerImpl implements 
DataStreamServer {
 
   private DataStreamServerRpc serverRpc;
   private RaftPeer raftServer;
-  private Parameters parameters;
-  private RaftProperties properties;
+  private final StateMachine stateMachine;
 
-  public DataStreamServerImpl(RaftPeer server,
-                      RaftProperties properties,
-                       Parameters parameters){
+  public DataStreamServerImpl(RaftPeer server, StateMachine stateMachine,
+      RaftProperties properties, Parameters parameters){
     this.raftServer = server;
-    this.parameters = parameters;
-    this.properties = properties;
+    this.stateMachine = stateMachine;
 
     final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
 
     this.serverRpc = DataStreamServerFactory.cast(type.newFactory(parameters))
-                      .newDataStreamServerRpc(server);
+        .newDataStreamServerRpc(raftServer, stateMachine);
   }
 
   @Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index ecf7c50..3b5426c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,27 +24,90 @@ import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.client.impl.DataStreamClientImpl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 
 public class TestDataStream extends BaseTest {
+  static final int MODULUS = 23;
+
+  static byte pos2byte(int pos) {
+    return (byte) ('A' + pos%MODULUS);
+  }
+
+  class SingleDataStreamStateMachine extends BaseStateMachine {
+    final WritableByteChannel channel = new WritableByteChannel() {
+      private volatile boolean open = true;
+
+      @Override
+      public int write(ByteBuffer src) {
+        if (!open) {
+          throw new IllegalStateException("Already closed");
+        }
+        final int remaining = src.remaining();
+        for(; src.remaining() > 0; ) {
+          Assert.assertEquals(pos2byte(byteWritten), src.get());
+          byteWritten++;
+        }
+        return remaining;
+      }
+
+      @Override
+      public boolean isOpen() {
+        return open;
+      }
+
+      @Override
+      public void close() {
+        open = false;
+      }
+    };
+
+    final DataStream stream = new DataStream() {
+      @Override
+      public WritableByteChannel getWritableByteChannel() {
+        return channel;
+      }
+
+      @Override
+      public CompletableFuture<?> cleanUp() {
+        try {
+          channel.close();
+        } catch (Throwable t) {
+          return JavaUtils.completeExceptionally(t);
+        }
+        return CompletableFuture.completedFuture(null);
+      }
+    };
+
+    @Override
+    public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+      return CompletableFuture.completedFuture(stream);
+    }
+  }
 
   private RaftPeer[] peers;
   private RaftProperties properties;
   private DataStreamServerImpl server;
   private DataStreamClientImpl client;
-  private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+  private int byteWritten = 0;
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);
+    server = new DataStreamServerImpl(peers[0], new 
SingleDataStreamStateMachine(), properties, null);
     server.getServerRpc().startServer();
   }
 
@@ -61,7 +124,7 @@ public class TestDataStream extends BaseTest {
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
-    peers = Arrays.asList(MiniRaftCluster.generateIds(1, 0)).stream()
+    peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
                        .map(id -> new RaftPeer(id, 
NetUtils.createLocalServerAddress()))
                        .toArray(RaftPeer[]::new);
@@ -72,27 +135,47 @@ public class TestDataStream extends BaseTest {
   }
 
   public void runTestDataStream(){
-    DataStreamOutput stream = client.stream();
-    ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
-    for (int i = 0; i < bf.capacity(); i++) {
-      bf.put((byte)'a');
-    }
-    bf.flip();
-    int i = 0;
+    final int bufferSize = 1024*1024;
+    final int bufferNum = 10;
+    final DataStreamOutput out = client.stream();
 
-    while(i < 1000){
-      bf.position(0).limit(bf.capacity());
-      futures.add(stream.streamAsync(bf));
-      i++;
+    //send request
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    futures.add(sendRequest(out, 1024));
+
+    //send data
+    final int halfBufferSize = bufferSize/2;
+    int dataSize = 0;
+    for(int i = 0; i < bufferNum; i++) {
+      final int size = halfBufferSize + 
ThreadLocalRandom.current().nextInt(halfBufferSize);
+      final ByteBuffer bf = initBuffer(dataSize, size);
+      futures.add(out.streamAsync(bf));
+      dataSize += size;
     }
-    try {
-      Thread.sleep(1000*3);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+
+    //join all requests
+    for(CompletableFuture<DataStreamReply> f : futures) {
+      f.join();
     }
+    Assert.assertEquals(dataSize, byteWritten);
     shutDownSetup();
-    for(i = 0; i < futures.size(); i++){
-      Assert.assertTrue(futures.get(i).isDone());
+  }
+
+  CompletableFuture<DataStreamReply> sendRequest(DataStreamOutput out, int 
size) {
+    // TODO RATIS-1085: create a RaftClientRequest and put it in the buffer
+    final ByteBuffer buffer = initBuffer(0, size);
+    return out.streamAsync(buffer);
+  }
+
+  static ByteBuffer initBuffer(int offset, int size) {
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+    final int length = buffer.capacity();
+    buffer.position(0).limit(length);
+    for (int j = 0; j < length; j++) {
+      buffer.put(pos2byte(offset + j));
     }
+    buffer.flip();
+    Assert.assertEquals(length, buffer.remaining());
+    return buffer;
   }
 }

Reply via email to