This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af0980  RATIS-1030: Implement ratis streaming using netty - Client 
code (#179). Contributed by Ansh Khanna
1af0980 is described below

commit 1af09806518c40a401948af5352a3192f183c8d3
Author: anshkhannasbu <[email protected]>
AuthorDate: Thu Aug 13 15:56:42 2020 -0400

    RATIS-1030: Implement ratis streaming using netty - Client code (#179). 
Contributed by Ansh Khanna
---
 .../org/apache/ratis/client/DataStreamClient.java  |  46 ++----
 .../ratis/client/DataStreamClientFactory.java      |   4 +-
 .../apache/ratis/client/DataStreamClientRpc.java   |   3 +
 .../ratis/client/impl/DataStreamClientImpl.java    |  94 +++++++++---
 .../ratis/client/impl/OrderedStreamAsync.java      | 160 +++++++++++++++++++++
 .../apache/ratis/protocol/DataStreamMessage.java   |   2 +-
 ...StreamMessage.java => DataStreamReplyImpl.java} |  40 +++++-
 ...amMessage.java => DataStreamRequestClient.java} |  38 ++++-
 .../ratis/protocol/DataStreamRequestServer.java    |  53 +++++++
 .../apache/ratis/netty/NettyDataStreamFactory.java |  10 +-
 .../ratis/netty/client/NettyClientStreamRpc.java   | 116 +++++++++++++++
 .../netty/decoders/DataStreamReplyDecoder.java     |  53 +++++++
 .../netty/decoders/DataStreamRequestDecoder.java   |  59 ++++++++
 .../netty/encoders/DataStreamReplyEncoder.java     |  44 ++++++
 .../netty/encoders/DataStreamRequestEncoder.java   |  42 ++++++
 .../org/apache/ratis/server/DataStreamServer.java  |  18 +--
 .../ratis/server/DataStreamServerFactory.java      |  14 +-
 .../apache/ratis/server/DataStreamServerRpc.java   |  10 ++
 18 files changed, 728 insertions(+), 78 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
index 8e3f7ae..ae94aab 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
@@ -17,13 +17,11 @@
  */
 package org.apache.ratis.client;
 
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.api.DataStreamApi;
+import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.client.impl.DataStreamClientImpl;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.SupportedDataStreamType;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,42 +33,32 @@ public interface DataStreamClient {
 
   Logger LOG = LoggerFactory.getLogger(DataStreamClient.class);
 
-  /** Return Client id. */
-  ClientId getId();
+  /** Return the rpc client instance **/
+  DataStreamClientRpc getClientRpc();
 
   /** Return Streamer Api instance. */
-  DataStreamApi getDataStreamApi();
+  DataStreamOutput stream();
 
-  /**
-   * send to server via streaming.
-   * Return a completable future.
-   */
+  /** add information of the raft peers to communicate with */
+  void addPeers(Iterable<RaftPeer> peers);
+
+  /** close the client */
+  void close();
 
   /** To build {@link DataStreamClient} objects */
   class Builder {
-    private ClientId clientId;
-    private DataStreamClientRpc dataStreamClientRpc;
+    private RaftPeer raftServer;
     private RaftProperties properties;
     private Parameters parameters;
 
     private Builder() {}
 
     public DataStreamClientImpl build(){
-      if (clientId == null) {
-        clientId = ClientId.randomId();
-      }
-      if (properties != null) {
-        if (dataStreamClientRpc == null) {
-          final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
-          dataStreamClientRpc = 
DataStreamClientFactory.cast(type.newFactory(parameters))
-              .newDataStreamClientRpc(clientId, properties);
-        }
-      }
-      return new DataStreamClientImpl(clientId, properties, 
dataStreamClientRpc);
+      return new DataStreamClientImpl(raftServer, properties, parameters);
     }
 
-    public Builder setClientId(ClientId clientId) {
-      this.clientId = clientId;
+    public Builder setRaftServer(RaftPeer peer) {
+      this.raftServer = peer;
       return this;
     }
 
@@ -79,15 +67,9 @@ public interface DataStreamClient {
       return this;
     }
 
-    public Builder setDataStreamClientRpc(DataStreamClientRpc 
dataStreamClientRpc){
-      this.dataStreamClientRpc = dataStreamClientRpc;
-      return this;
-    }
-
     public Builder setProperties(RaftProperties properties) {
       this.properties = properties;
       return this;
     }
   }
-
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
index f5b00e0..c77fb7a 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientFactory.java
@@ -20,7 +20,7 @@ package org.apache.ratis.client;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamFactory;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftPeer;
 
 /**
  * A factory to create streaming client.
@@ -36,5 +36,5 @@ public interface DataStreamClientFactory extends 
DataStreamFactory {
         + "; stream type is " + dataStreamFactory.getDataStreamType());
   }
 
-  DataStreamClientRpc newDataStreamClientRpc(ClientId clientId, RaftProperties 
properties);
+  DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, RaftProperties 
properties);
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java 
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
index 7021de9..4db1c55 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClientRpc.java
@@ -35,4 +35,7 @@ public interface DataStreamClientRpc {
         + JavaUtils.getCurrentStackTraceElement().getMethodName());
   }
 
+  void startClient();
+
+  void closeClient();
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index fff816c..2afc42a 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -14,38 +14,100 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package org.apache.ratis.client.impl;
 
-import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.DataStreamClient;
-import org.apache.ratis.client.api.DataStreamApi;
+import org.apache.ratis.client.DataStreamClientFactory;
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Streaming client implementation
+ * allows client to create streams and send asynchronously.
+ */
 
 public class DataStreamClientImpl implements DataStreamClient {
+  public static final Logger LOG = 
LoggerFactory.getLogger(DataStreamClientImpl.class);
 
-  private final ClientId clientId;
-  private final RaftProperties properties;
-  private final DataStreamClientRpc dataStreamClientRpc;
-  private final DataStreamApi dataStreamApi = null; //TODO need an impl
+  private DataStreamClientRpc dataStreamClientRpc;
+  private OrderedStreamAsync orderedStreamAsync;
+  private RaftPeer raftServer;
+  private RaftProperties properties;
+  private Parameters parameters;
+  private long streamId = 0;
 
-  public DataStreamClientImpl(ClientId clientId,
+  public DataStreamClientImpl(RaftPeer raftServer,
                               RaftProperties properties,
-                              DataStreamClientRpc dataStreamClientRpc){
-    this.clientId = clientId;
+                              Parameters parameters) {
+    this.raftServer = Objects.requireNonNull(raftServer,
+                                          "peer == null");
     this.properties = properties;
-    this.dataStreamClientRpc = dataStreamClientRpc;
+    this.parameters = parameters;
+
+    final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
+    this.dataStreamClientRpc = 
DataStreamClientFactory.cast(type.newFactory(parameters))
+                               .newDataStreamClientRpc(raftServer, properties);
+
+    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, 
properties);
+  }
+
+  class DataStreamOutputImpl implements DataStreamOutput {
+    private long streamId = 0;
+    private long messageId = 0;
+
+    public DataStreamOutputImpl(long id){
+      this.streamId = id;
+    }
+
+    // send to the attached dataStreamClientRpc
+    @Override
+    public CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf) {
+      messageId++;
+      return orderedStreamAsync.sendRequest(streamId, messageId, buf);
+    }
+
+    // should wait for attached sliding window to terminate
+    @Override
+    public CompletableFuture<DataStreamReply> closeAsync() {
+      return null;
+    }
+  }
+
+  @Override
+  public DataStreamClientRpc getClientRpc() {
+    return dataStreamClientRpc;
   }
 
   @Override
-  public ClientId getId() {
-    return clientId;
+  public DataStreamOutput stream() {
+    streamId++;
+    return new DataStreamOutputImpl(streamId);
   }
 
   @Override
-  public DataStreamApi getDataStreamApi(){
-    return dataStreamApi;
+  public void addPeers(Iterable<RaftPeer> peers) {
+    return;
   }
 
+  @Override
+  public void close(){
+    dataStreamClientRpc.closeClient();
+  }
+
+  public void start(){
+    dataStreamClientRpc.startClient();
+  }
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
new file mode 100644
index 0000000..34ba99e
--- /dev/null
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SlidingWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.function.LongFunction;
+
+public class OrderedStreamAsync {
+  public static final Logger LOG = 
LoggerFactory.getLogger(OrderedStreamAsync.class);
+
+  public static class DataStreamWindowRequest implements DataStreamRequest,
+      SlidingWindow.ClientSideRequest<DataStreamReply> {
+    private final long streamId;
+    private final long messageId;
+    private final long seqNum;
+    private final ByteBuffer data;
+    private boolean isFirst = false;
+    private CompletableFuture<DataStreamReply> replyFuture = new 
CompletableFuture<>();
+
+    public DataStreamRequestClient newDataStreamRequest(){
+      return new DataStreamRequestClient(streamId, messageId, data.slice());
+    }
+
+    @Override
+    public long getStreamId() {
+      return streamId;
+    }
+
+    @Override
+    public long getDataOffset() {
+      return messageId;
+    }
+
+    @Override
+    public long getDataLength() {
+      return data.capacity();
+    }
+
+    @Override
+    public void setFirstRequest() {
+      isFirst = true;
+    }
+
+    @Override
+    public long getSeqNum() {
+      return seqNum;
+    }
+
+    @Override
+    public void setReply(DataStreamReply dataStreamReply) {
+      replyFuture.complete(dataStreamReply);
+    }
+
+    @Override
+    public boolean hasReply() {
+      return replyFuture.isDone();
+    }
+
+    @Override
+    public void fail(Throwable e) {
+      replyFuture.completeExceptionally(e);
+    }
+
+    public CompletableFuture<DataStreamReply> getReplyFuture(){
+      return replyFuture;
+    }
+
+    public DataStreamWindowRequest(long streamId, long messageId,
+                                 long seqNum, ByteBuffer data){
+      this.streamId = streamId;
+      this.messageId = messageId;
+      this.data = data.slice();
+      this.seqNum = seqNum;
+    }
+  }
+
+  private SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> 
slidingWindow;
+  private Semaphore requestSemaphore;
+  private DataStreamClientRpc dataStreamClientRpc;
+
+  public OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc,
+                            RaftProperties properties){
+    this.dataStreamClientRpc = dataStreamClientRpc;
+    this.requestSemaphore = new 
Semaphore(RaftClientConfigKeys.Async.outstandingRequestsMax(properties)*2);
+    this.slidingWindow = new SlidingWindow.Client<>("sliding");
+  }
+
+  private void resetSlidingWindow(RaftClientRequest request) {
+    slidingWindow.resetFirstSeqNum();
+  }
+
+  public CompletableFuture<DataStreamReply> sendRequest(long streamId,
+                                                        long messageId,
+                                                        ByteBuffer data){
+    try {
+      requestSemaphore.acquire();
+    } catch (InterruptedException e){
+      return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
+          "Interrupted when sending streamId=" + streamId + ", messageId= " + 
messageId, e));
+    }
+    final LongFunction<DataStreamWindowRequest> constructor = seqNum -> new 
DataStreamWindowRequest(streamId,
+                                                                          
messageId, seqNum, data);
+    return slidingWindow.submitNewRequest(constructor, 
this::sendRequestToNetwork).
+           getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
+  }
+
+  private void sendRequestToNetwork(DataStreamWindowRequest request){
+    CompletableFuture<DataStreamReply> f = request.getReplyFuture();
+    if(f.isDone()) {
+      return;
+    }
+    if(slidingWindow.isFirst(request.getSeqNum())){
+      request.setFirstRequest();
+    }
+    DataStreamRequestClient rpcRequest = request.newDataStreamRequest();
+    CompletableFuture<DataStreamReply> requestFuture = 
dataStreamClientRpc.streamAsync(rpcRequest);
+    requestFuture.thenApply(reply -> {
+      slidingWindow.receiveReply(
+          request.getSeqNum(), reply, this::sendRequestToNetwork);
+      return reply;
+    }).thenAccept(reply -> {
+      if (f.isDone()) {
+        return;
+      }
+      f.complete(reply);
+    });
+  }
+
+
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
index eae8137..b05694c 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
@@ -18,7 +18,7 @@
 
 package org.apache.ratis.protocol;
 
-public interface DataStreamMessage extends RaftRpcMessage {
+public interface DataStreamMessage{
   long getStreamId();
 
   long getDataOffset();
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
similarity index 54%
copy from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
copy to 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
index eae8137..fba60f0 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyImpl.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,10 +18,38 @@
 
 package org.apache.ratis.protocol;
 
-public interface DataStreamMessage extends RaftRpcMessage {
-  long getStreamId();
+import java.nio.ByteBuffer;
 
-  long getDataOffset();
+public class DataStreamReplyImpl implements DataStreamReply {
+  private long streamId;
+  private long dataOffset;
+  private ByteBuffer response;
 
-  long getDataLength();
-}
\ No newline at end of file
+  public DataStreamReplyImpl(long streamId,
+                             long dataOffset,
+                             ByteBuffer bf){
+    this.streamId = streamId;
+    this.dataOffset = dataOffset;
+    this.response = bf;
+  }
+
+  @Override
+  public ByteBuffer getResponse() {
+    return response;
+  }
+
+  @Override
+  public long getStreamId() {
+    return streamId;
+  }
+
+  @Override
+  public long getDataOffset() {
+    return dataOffset;
+  }
+
+  @Override
+  public long getDataLength() {
+    return response.capacity();
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
similarity index 56%
copy from 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
copy to 
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
index eae8137..a34809c 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestClient.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,10 +18,36 @@
 
 package org.apache.ratis.protocol;
 
-public interface DataStreamMessage extends RaftRpcMessage {
-  long getStreamId();
+import java.nio.ByteBuffer;
 
-  long getDataOffset();
+public class DataStreamRequestClient implements DataStreamRequest{
+  private long streamId;
+  private long dataOffset;
+  private ByteBuffer buf;
 
-  long getDataLength();
-}
\ No newline at end of file
+  public DataStreamRequestClient(long streamId, long dataOffset,
+                                 ByteBuffer buf){
+    this.streamId = streamId;
+    this.dataOffset = dataOffset;
+    this.buf = buf;
+  }
+
+  @Override
+  public long getStreamId() {
+    return streamId;
+  }
+
+  @Override
+  public long getDataOffset() {
+    return dataOffset;
+  }
+
+  @Override
+  public long getDataLength() {
+    return buf.capacity();
+  }
+
+  public ByteBuffer getBuf() {
+    return buf;
+  }
+}
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
new file mode 100644
index 0000000..8906700
--- /dev/null
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestServer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+
+public class DataStreamRequestServer implements DataStreamRequest {
+  private long streamId;
+  private long dataOffset;
+  private ByteBuf buf;
+
+  public DataStreamRequestServer(long streamId, long dataOffset,
+                                 ByteBuf buf){
+    this.streamId = streamId;
+    this.dataOffset = dataOffset;
+    this.buf = buf;
+  }
+
+  @Override
+  public long getStreamId() {
+    return streamId;
+  }
+
+  @Override
+  public long getDataOffset() {
+    return dataOffset;
+  }
+
+  @Override
+  public long getDataLength() {
+    return buf.capacity();
+  }
+
+  public ByteBuf getBuf() {
+    return buf;
+  }
+}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 66c360a..2480188 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -22,8 +22,8 @@ import org.apache.ratis.client.DataStreamClientFactory;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.netty.client.NettyClientStreamRpc;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.DataStreamServerFactory;
 
@@ -36,12 +36,12 @@ public class NettyDataStreamFactory implements 
DataStreamServerFactory, DataStre
   }
 
   @Override
-  public DataStreamClientRpc newDataStreamClientRpc(ClientId clientId, 
RaftProperties properties) {
-    return null;
+  public DataStreamClientRpc newDataStreamClientRpc(RaftPeer server, 
RaftProperties properties) {
+    return new NettyClientStreamRpc(server, properties);
   }
 
   @Override
-  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
+  public DataStreamServerRpc newDataStreamServerRpc(RaftPeer server) {
     return null;
   }
 }
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
new file mode 100644
index 0000000..c59a928
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.decoders.DataStreamReplyDecoder;
+import org.apache.ratis.netty.encoders.DataStreamRequestEncoder;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.ratis.thirdparty.io.netty.channel.*;
+import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
+import 
org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.ratis.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+public class NettyClientStreamRpc implements DataStreamClientRpc {
+  public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientStreamRpc.class);
+
+  private RaftPeer server;
+  private RaftProperties raftProperties;
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private Channel channel;
+  private Queue<CompletableFuture<DataStreamReply>> replies
+      = new LinkedList<>();
+
+  public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
+    this.server = server;
+    this.raftProperties = properties;
+  }
+
+  synchronized CompletableFuture<DataStreamReply> pollReply() {
+    return replies.poll();
+  }
+
+  private ChannelInboundHandler getClientHandler(){
+    return new ChannelInboundHandlerAdapter(){
+      @Override
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
InterruptedException {
+        final DataStreamReplyImpl reply = (DataStreamReplyImpl) msg;
+        CompletableFuture<DataStreamReply> f = pollReply();
+        f.complete(reply);
+      }
+    };
+  }
+
+  private ChannelInitializer<SocketChannel> getInitializer(){
+    return new ChannelInitializer<SocketChannel>(){
+      @Override
+      public void initChannel(SocketChannel ch)
+          throws Exception {
+        ChannelPipeline p = ch.pipeline();
+        p.addLast(new DataStreamRequestEncoder());
+        p.addLast(new DataStreamReplyDecoder());
+        p.addLast(getClientHandler());
+      }
+    };
+  }
+
+  @Override
+  public synchronized CompletableFuture<DataStreamReply> 
streamAsync(DataStreamRequest request) {
+    CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
+    replies.offer(f);
+    channel.writeAndFlush(request);
+    return f;
+  }
+
+  @Override
+  public void startClient() {
+    final InetSocketAddress address = 
NetUtils.createSocketAddr(server.getAddress());
+    try {
+      channel = (new Bootstrap())
+          .group(workerGroup)
+          .channel(NioSocketChannel.class)
+          .handler(getInitializer())
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .connect(address)
+          .sync()
+          .channel();
+      System.out.println(channel);
+    } catch (Exception e){
+      LOG.info("Exception {}", e.getCause());
+    }
+  }
+
+  @Override
+  public void closeClient(){
+    channel.close().syncUninterruptibly();
+  }
+}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
new file mode 100644
index 0000000..2417707
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamReplyDecoder.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.decoders;
+
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+public class DataStreamReplyDecoder extends ByteToMessageDecoder {
+
+  @Override
+  protected void decode(ChannelHandlerContext channelHandlerContext,
+                        ByteBuf byteBuf, List<Object> list) throws Exception {
+
+    if(byteBuf.readableBytes() >= 24){
+      long streamId = byteBuf.readLong();
+      long dataOffset = byteBuf.readLong();
+      long dataLength = byteBuf.readLong();
+      if(byteBuf.readableBytes() >= dataLength){
+        DataStreamReplyImpl reply = new DataStreamReplyImpl(streamId,
+            dataOffset,
+            byteBuf.slice(byteBuf.readerIndex(), (int) 
dataLength).nioBuffer());
+        byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
+        byteBuf.markReaderIndex();
+        list.add(reply);
+      } else {
+        byteBuf.resetReaderIndex();
+        return;
+      }
+    } else{
+      return;
+    }
+  }
+}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
new file mode 100644
index 0000000..b0aca36
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/decoders/DataStreamRequestDecoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ratis.netty.decoders;
+
+import org.apache.ratis.protocol.DataStreamRequestServer;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+public class DataStreamRequestDecoder extends ByteToMessageDecoder {
+
+  public DataStreamRequestDecoder() {
+    this.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
+  }
+
+  @Override
+  protected void decode(ChannelHandlerContext channelHandlerContext,
+                        ByteBuf byteBuf,
+                        List<Object> list) throws Exception {
+    if(byteBuf.readableBytes() >= 24){
+      long streamId = byteBuf.readLong();
+      long dataOffset = byteBuf.readLong();
+      long dataLength = byteBuf.readLong();
+      if(byteBuf.readableBytes() >= dataLength){
+        ByteBuf bf = byteBuf.slice(byteBuf.readerIndex(), (int)dataLength);
+        bf.retain();
+        DataStreamRequestServer req = new DataStreamRequestServer(streamId,
+            dataOffset,
+            bf);
+        byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
+        byteBuf.markReaderIndex();
+        list.add(req);
+      } else {
+        byteBuf.resetReaderIndex();
+        return;
+      }
+    } else{
+      return;
+    }
+  }
+}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
new file mode 100644
index 0000000..2d7957d
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamReplyEncoder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ratis.netty.encoders;
+
+import org.apache.ratis.protocol.DataStreamReplyImpl;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class DataStreamReplyEncoder extends
+    MessageToMessageEncoder<DataStreamReplyImpl> {
+
+  @Override
+  protected void encode(ChannelHandlerContext channelHandlerContext,
+                        DataStreamReplyImpl dataStreamReply,
+                        List<Object> list) throws Exception {
+
+    ByteBuffer bb = ByteBuffer.allocateDirect(24);
+    bb.putLong(dataStreamReply.getStreamId());
+    bb.putLong(dataStreamReply.getDataOffset());
+    bb.putLong(dataStreamReply.getDataLength());
+    bb.flip();
+    list.add(Unpooled.wrappedBuffer(bb, dataStreamReply.getResponse()));
+  }
+}
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
new file mode 100644
index 0000000..84bf570
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/encoders/DataStreamRequestEncoder.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.netty.encoders;
+
+import org.apache.ratis.protocol.DataStreamRequestClient;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class DataStreamRequestEncoder
+    extends MessageToMessageEncoder<DataStreamRequestClient> {
+
+  @Override
+  protected void encode(ChannelHandlerContext channelHandlerContext,
+                        DataStreamRequestClient requestData, List<Object> 
list) throws Exception {
+    ByteBuffer bb = ByteBuffer.allocateDirect(24);
+    bb.putLong(requestData.getStreamId());
+    bb.putLong(requestData.getDataOffset());
+    bb.putLong(requestData.getDataLength());
+    bb.flip();
+    list.add(Unpooled.wrappedBuffer(bb, requestData.getBuf()));
+  }
+}
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java 
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServer.java
similarity index 73%
rename from 
ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
rename to 
ratis-server/src/main/java/org/apache/ratis/server/DataStreamServer.java
index 03fd4b3..434aee6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServer.java
@@ -15,19 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.client.api;
+package org.apache.ratis.server;
 
 /**
- * An interface for streaming data.
- * Associated with it's implementation will be a client.
+ * Interface for streaming server.
  */
-
-public interface DataStreamApi {
-
+public interface DataStreamServer {
   /**
-   * Create a new stream for a new streamToRatis invocation
-   * allows multiple stream from a single client.
+   * Get network interface for server.
    */
-  DataStreamOutput stream();
+  DataStreamServerRpc getServerRpc();
 
+  /**
+   * close server.
+   */
+  void close();
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index f0c3bd9..f22e48f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -18,10 +18,22 @@
 package org.apache.ratis.server;
 
 import org.apache.ratis.datastream.DataStreamFactory;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.ServerFactory;
 
 public interface DataStreamServerFactory extends DataStreamFactory {
+
+  static DataStreamServerFactory cast(DataStreamFactory dataStreamFactory) {
+    if (dataStreamFactory instanceof DataStreamFactory) {
+      return (DataStreamServerFactory)dataStreamFactory;
+    }
+    throw new ClassCastException("Cannot cast " + dataStreamFactory.getClass()
+        + " to " + ServerFactory.class
+        + "; rpc type is " + dataStreamFactory.getDataStreamType());
+  }
+
   /**
    * Server implementation for streaming in Raft group
    */
-  DataStreamServerRpc newDataStreamServerRpc(RaftServer server);
+  DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java 
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
index c5a074b..f8a270e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
@@ -22,4 +22,14 @@ package org.apache.ratis.server;
  * Relays those streams to other servers after persisting
  */
 public interface DataStreamServerRpc {
+  /**
+   * start server
+   */
+  void startServer();
+
+  /**
+   * shutdown server
+   */
+  void closeServer();
+
 }

Reply via email to