This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7878ddf  RATIS-1085. Encode a RaftClientRequest as the head of a stream request (#221)
7878ddf is described below

commit 7878ddf40b8c730f31ec7205b2a1b4fc64c6f0c7
Author: runzhiwang <[email protected]>
AuthorDate: Wed Oct 14 09:43:39 2020 +0800

    RATIS-1085. Encode a RaftClientRequest as the head of a stream request (#221)
---
 .../apache/ratis/client/api/DataStreamOutput.java  |  2 +-
 .../ratis/client/impl/DataStreamClientImpl.java    | 24 ++++++++++++++++++++--
 .../ratis/netty/server/NettyServerStreamRpc.java   |  9 ++++++--
 .../apache/ratis/datastream/TestDataStream.java    | 22 ++++++++++++--------
 4 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 936b9b6..dd86569 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -26,5 +26,5 @@ import java.util.concurrent.CompletableFuture;
 /** An asynchronous output stream supporting zero buffer copying. */
 public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
   /** Send out the data in the buffer asynchronously */
-  CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf);
+  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2afc42a..2c9d175 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -25,8 +25,11 @@ import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +47,9 @@ public class DataStreamClientImpl implements DataStreamClient {
 
   private DataStreamClientRpc dataStreamClientRpc;
   private OrderedStreamAsync orderedStreamAsync;
+  // TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup in constructor.
+  private final ClientId clientId = ClientId.randomId();
+  private final RaftGroupId groupId =  RaftGroupId.randomId();
   private RaftPeer raftServer;
   private RaftProperties properties;
   private Parameters parameters;
@@ -64,17 +70,23 @@ public class DataStreamClientImpl implements DataStreamClient {
     this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties);
   }
 
-  class DataStreamOutputImpl implements DataStreamOutput {
+  public class DataStreamOutputImpl implements DataStreamOutput {
     private long streamId = 0;
     private long messageId = 0;
+    private final RaftClientRequest header;
+    private final CompletableFuture<DataStreamReply> headerFuture;
 
     public DataStreamOutputImpl(long id){
       this.streamId = id;
+      this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(),
+          RaftClientRequest.writeRequestType());
+      this.headerFuture = orderedStreamAsync.sendRequest(streamId, messageId,
+          ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer());
     }
 
     // send to the attached dataStreamClientRpc
     @Override
-    public CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf) {
+    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
       messageId++;
       return orderedStreamAsync.sendRequest(streamId, messageId, buf);
     }
@@ -84,6 +96,14 @@ public class DataStreamClientImpl implements DataStreamClient {
     public CompletableFuture<DataStreamReply> closeAsync() {
       return null;
     }
+
+    public RaftClientRequest getHeader() {
+      return header;
+    }
+
+    public CompletableFuture<DataStreamReply> getHeaderFuture() {
+      return headerFuture;
+    }
   }
 
   @Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 3caaafd..6ccabbc 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -18,6 +18,8 @@
 
 package org.apache.ratis.netty.server;
 
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -25,6 +27,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.*;
@@ -65,9 +68,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
     try {
-      // TODO RATIS-1085: read the request from buf
-      final RaftClientRequest request = null;
+      final RaftClientRequest request =
+          ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
+    } catch (InvalidProtocolBufferException e) {
+      throw new CompletionException(e);
     } finally {
       buf.release();
       released.set(true);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 3b5426c..cca6c94 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -96,6 +96,7 @@ public class TestDataStream extends BaseTest {
 
     @Override
     public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+      writeRequest = request;
       return CompletableFuture.completedFuture(stream);
     }
   }
@@ -105,6 +106,7 @@ public class TestDataStream extends BaseTest {
   private DataStreamServerImpl server;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
+  private RaftClientRequest writeRequest;
 
   public void setupServer(){
     server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
@@ -138,10 +140,12 @@ public class TestDataStream extends BaseTest {
     final int bufferSize = 1024*1024;
     final int bufferNum = 10;
     final DataStreamOutput out = client.stream();
+    DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out;
 
-    //send request
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-    futures.add(sendRequest(out, 1024));
+
+    // add header
+    futures.add(impl.getHeaderFuture());
 
     //send data
     final int halfBufferSize = bufferSize/2;
@@ -149,7 +153,7 @@ public class TestDataStream extends BaseTest {
     for(int i = 0; i < bufferNum; i++) {
       final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
       final ByteBuffer bf = initBuffer(dataSize, size);
-      futures.add(out.streamAsync(bf));
+      futures.add(out.writeAsync(bf));
       dataSize += size;
     }
 
@@ -157,16 +161,16 @@ public class TestDataStream extends BaseTest {
     for(CompletableFuture<DataStreamReply> f : futures) {
       f.join();
     }
+
+    Assert.assertEquals(writeRequest.getClientId(), impl.getHeader().getClientId());
+    Assert.assertEquals(writeRequest.getCallId(), impl.getHeader().getCallId());
+    Assert.assertEquals(writeRequest.getRaftGroupId(), impl.getHeader().getRaftGroupId());
+    Assert.assertEquals(writeRequest.getServerId(), impl.getHeader().getServerId());
+
     Assert.assertEquals(dataSize, byteWritten);
     shutDownSetup();
   }
 
-  CompletableFuture<DataStreamReply> sendRequest(DataStreamOutput out, int size) {
-    // TODO RATIS-1085: create a RaftClientRequest and put it in the buffer
-    final ByteBuffer buffer = initBuffer(0, size);
-    return out.streamAsync(buffer);
-  }
-
   static ByteBuffer initBuffer(int offset, int size) {
     final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
     final int length = buffer.capacity();

Reply via email to