amaliujia commented on a change in pull request #224:
URL: https://github.com/apache/incubator-ratis/pull/224#discussion_r505132543



##########
File path: 
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -36,59 +36,56 @@
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Streaming client implementation
  * allows client to create streams and send asynchronously.
  */
-
 public class DataStreamClientImpl implements DataStreamClient {
   public static final Logger LOG = 
LoggerFactory.getLogger(DataStreamClientImpl.class);
 
-  private DataStreamClientRpc dataStreamClientRpc;
-  private OrderedStreamAsync orderedStreamAsync;
   // TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup 
in constructor.
   private final ClientId clientId = ClientId.randomId();
   private final RaftGroupId groupId =  RaftGroupId.randomId();
-  private RaftPeer raftServer;
-  private RaftProperties properties;
-  private Parameters parameters;
-  private long streamId = 0;
-
-  public DataStreamClientImpl(RaftPeer raftServer,
-                              RaftProperties properties,
-                              Parameters parameters) {
-    this.raftServer = Objects.requireNonNull(raftServer,
-                                          "peer == null");
-    this.properties = properties;
-    this.parameters = parameters;
+
+  private final RaftPeer raftServer;
+  private final DataStreamClientRpc dataStreamClientRpc;
+  private final OrderedStreamAsync orderedStreamAsync;
+
+  private final AtomicInteger streamId = new AtomicInteger();
+
+  public DataStreamClientImpl(RaftPeer server, RaftProperties properties, 
Parameters parameters) {
+    this.raftServer = Objects.requireNonNull(server, "server == null");
 
     final SupportedDataStreamType type = 
RaftConfigKeys.DataStream.type(properties, LOG::info);
     this.dataStreamClientRpc = 
DataStreamClientFactory.cast(type.newFactory(parameters))
                                .newDataStreamClientRpc(raftServer, properties);
 
-    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, 
properties);
+    this.orderedStreamAsync = new OrderedStreamAsync(clientId, 
dataStreamClientRpc, properties);
   }
 
   public class DataStreamOutputImpl implements DataStreamOutput {
-    private long streamId = 0;
-    private long messageId = 0;
+    private final long streamId;
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
 
+    private long streamOffset = 0;
+
     public DataStreamOutputImpl(long id){
       this.streamId = id;
       this.header = new RaftClientRequest(clientId, raftServer.getId(), 
groupId, RaftClientImpl.nextCallId(),
           RaftClientRequest.writeRequestType());
-      this.headerFuture = orderedStreamAsync.sendRequest(streamId, messageId,
+      this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,

Review comment:
       Not sure on -1 here: so because this is just a header request so set 
streamOffset as `-1` does not matter? (e.g. no need to use 0).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to