szetszwo commented on a change in pull request #258:
URL: https://github.com/apache/incubator-ratis/pull/258#discussion_r518506478
##########
File path:
ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
##########
@@ -474,6 +474,7 @@ void assertHeader(Server server, RaftClientRequest header,
int dataSize) throws
Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
Assert.assertEquals(dataSize, stream.getByteWritten());
Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
+ Assert.assertEquals(writeRequest.getClientId(), header.getClientId());
final Server primary = getPrimaryServer();
if (server == primary) {
Review comment:
We may remove the if-statement and check all servers.
```
Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
```
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -70,13 +70,13 @@ public DataStreamClientImpl(
private long streamOffset = 0;
public DataStreamOutputImpl(RaftGroupId groupId) {
- this(groupId, RaftClientImpl.nextCallId());
+ this(new RaftClientRequest(clientId, raftServer.getId(), groupId,
RaftClientImpl.nextCallId(),
+ RaftClientRequest.writeRequestType()));
}
- public DataStreamOutputImpl(RaftGroupId groupId, long streamId) {
- this.header = new RaftClientRequest(clientId, raftServer.getId(),
groupId, streamId,
- RaftClientRequest.writeRequestType());
- this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
+ public DataStreamOutputImpl(RaftClientRequest request) {
Review comment:
We should use private.
##########
File path:
ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -70,13 +70,13 @@ public DataStreamClientImpl(
private long streamOffset = 0;
public DataStreamOutputImpl(RaftGroupId groupId) {
- this(groupId, RaftClientImpl.nextCallId());
+ this(new RaftClientRequest(clientId, raftServer.getId(), groupId,
RaftClientImpl.nextCallId(),
+ RaftClientRequest.writeRequestType()));
}
Review comment:
Let's remove this constructor and move the new RaftClientRequest(..) to
stream(RaftGroupId gid).
```
@Override
public DataStreamOutputRpc stream(RaftGroupId gid) {
return stream(new RaftClientRequest(clientId, raftServer.getId(),
groupId, RaftClientImpl.nextCallId(),
RaftClientRequest.writeRequestType()));
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]