This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 88327da RATIS-1097. DataStreamOutputImpl should use streamOffset
instead of messageId (#224)
88327da is described below
commit 88327daff3261cb3e2929d7fbcbba1ec51d67a34
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 15 15:16:21 2020 +0800
RATIS-1097. DataStreamOutputImpl should use streamOffset instead of
messageId (#224)
* RATIS-1097. DataStreamOutputImpl should use streamOffset instead of
messageId.
* Add review comments.
* Remove the unused import.
* Fix testAsyncConfiguration.
---
.../apache/ratis/client/RaftClientConfigKeys.java | 18 ++++-
.../ratis/client/impl/DataStreamClientImpl.java | 42 ++++++------
.../ratis/client/impl/MessageStreamImpl.java | 2 +-
.../ratis/client/impl/OrderedStreamAsync.java | 77 ++++++----------------
.../impl/DataStreamPacketByteBuffer.java} | 39 +++++------
.../impl/DataStreamPacketImpl.java} | 41 ++++++------
.../impl/DataStreamReplyByteBuffer.java} | 16 +++--
.../impl/DataStreamRequestByteBuffer.java} | 16 +++--
...ataStreamMessage.java => DataStreamPacket.java} | 4 +-
.../org/apache/ratis/protocol/DataStreamReply.java | 5 +-
.../apache/ratis/protocol/DataStreamRequest.java | 2 +-
.../ratis/netty/client/DataStreamReplyDecoder.java | 2 +-
.../netty/client/DataStreamRequestEncoder.java | 13 ++--
.../ratis/netty/server/DataStreamReplyEncoder.java | 16 ++---
.../netty/server/DataStreamRequestByteBuf.java | 35 ++++------
.../ratis/netty/server/NettyServerStreamRpc.java | 31 ++++-----
.../org/apache/ratis/MessageStreamApiTests.java | 4 +-
17 files changed, 167 insertions(+), 196 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 7fc01c2..746a4ab 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -90,8 +90,22 @@ public interface RaftClientConfigKeys {
}
}
- interface Stream {
- String PREFIX = RaftClientConfigKeys.PREFIX + ".stream";
+ interface DataStream {
+ String PREFIX = RaftClientConfigKeys.PREFIX + ".data-stream";
+
+ String OUTSTANDING_REQUESTS_MAX_KEY = PREFIX + ".outstanding-requests.max";
+ int OUTSTANDING_REQUESTS_MAX_DEFAULT = 100;
+ static int outstandingRequestsMax(RaftProperties properties) {
+ return getInt(properties::getInt, OUTSTANDING_REQUESTS_MAX_KEY,
+ OUTSTANDING_REQUESTS_MAX_DEFAULT, getDefaultLog(), requireMin(2));
+ }
+ static void setOutstandingRequestsMax(RaftProperties properties, int
outstandingRequests) {
+ setInt(properties::setInt, OUTSTANDING_REQUESTS_MAX_KEY,
outstandingRequests);
+ }
+ }
+
+ interface MessageStream {
+ String PREFIX = RaftClientConfigKeys.PREFIX + ".message-stream";
String SUBMESSAGE_SIZE_KEY = PREFIX + ".submessage-size";
SizeInBytes SUBMESSAGE_SIZE_DEFAULT = SizeInBytes.valueOf("1MB");
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2c9d175..96c0cf0 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -36,59 +36,56 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Streaming client implementation
* allows client to create streams and send asynchronously.
*/
-
public class DataStreamClientImpl implements DataStreamClient {
public static final Logger LOG =
LoggerFactory.getLogger(DataStreamClientImpl.class);
- private DataStreamClientRpc dataStreamClientRpc;
- private OrderedStreamAsync orderedStreamAsync;
// TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup
in constructor.
private final ClientId clientId = ClientId.randomId();
private final RaftGroupId groupId = RaftGroupId.randomId();
- private RaftPeer raftServer;
- private RaftProperties properties;
- private Parameters parameters;
- private long streamId = 0;
-
- public DataStreamClientImpl(RaftPeer raftServer,
- RaftProperties properties,
- Parameters parameters) {
- this.raftServer = Objects.requireNonNull(raftServer,
- "peer == null");
- this.properties = properties;
- this.parameters = parameters;
+
+ private final RaftPeer raftServer;
+ private final DataStreamClientRpc dataStreamClientRpc;
+ private final OrderedStreamAsync orderedStreamAsync;
+
+ private final AtomicInteger streamId = new AtomicInteger();
+
+ public DataStreamClientImpl(RaftPeer server, RaftProperties properties,
Parameters parameters) {
+ this.raftServer = Objects.requireNonNull(server, "server == null");
final SupportedDataStreamType type =
RaftConfigKeys.DataStream.type(properties, LOG::info);
this.dataStreamClientRpc =
DataStreamClientFactory.cast(type.newFactory(parameters))
.newDataStreamClientRpc(raftServer, properties);
- this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
+ this.orderedStreamAsync = new OrderedStreamAsync(clientId,
dataStreamClientRpc, properties);
}
public class DataStreamOutputImpl implements DataStreamOutput {
- private long streamId = 0;
- private long messageId = 0;
+ private final long streamId;
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
+ private long streamOffset = 0;
+
public DataStreamOutputImpl(long id){
this.streamId = id;
this.header = new RaftClientRequest(clientId, raftServer.getId(),
groupId, RaftClientImpl.nextCallId(),
RaftClientRequest.writeRequestType());
- this.headerFuture = orderedStreamAsync.sendRequest(streamId, messageId,
+ this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer());
}
// send to the attached dataStreamClientRpc
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
- messageId++;
- return orderedStreamAsync.sendRequest(streamId, messageId, buf);
+ final CompletableFuture<DataStreamReply> f =
orderedStreamAsync.sendRequest(streamId, streamOffset, buf);
+ streamOffset += buf.remaining();
+ return f;
}
// should wait for attached sliding window to terminate
@@ -113,8 +110,7 @@ public class DataStreamClientImpl implements
DataStreamClient {
@Override
public DataStreamOutput stream() {
- streamId++;
- return new DataStreamOutputImpl(streamId);
+ return new DataStreamOutputImpl(streamId.incrementAndGet());
}
@Override
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index e2971d6..14779a1 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -71,7 +71,7 @@ public final class MessageStreamImpl implements
MessageStreamApi {
private MessageStreamImpl(RaftClientImpl client, RaftProperties properties) {
this.client = Objects.requireNonNull(client, "client == null");
- this.submessageSize =
RaftClientConfigKeys.Stream.submessageSize(properties);
+ this.submessageSize =
RaftClientConfigKeys.MessageStream.submessageSize(properties);
}
@Override
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 093190b..fb4a7d9 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -20,10 +20,9 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
-import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
@@ -38,37 +37,18 @@ import java.util.function.LongFunction;
public class OrderedStreamAsync {
public static final Logger LOG =
LoggerFactory.getLogger(OrderedStreamAsync.class);
- public static class DataStreamWindowRequest implements DataStreamRequest,
- SlidingWindow.ClientSideRequest<DataStreamReply> {
- private final long streamId;
- private final long messageId;
+ static class DataStreamWindowRequest extends DataStreamRequestByteBuffer
+ implements SlidingWindow.ClientSideRequest<DataStreamReply> {
private final long seqNum;
- private final ByteBuffer data;
- private boolean isFirst = false;
- private CompletableFuture<DataStreamReply> replyFuture = new
CompletableFuture<>();
+ private final CompletableFuture<DataStreamReply> replyFuture = new
CompletableFuture<>();
- public DataStreamRequestByteBuffer newDataStreamRequest(){
- return new DataStreamRequestByteBuffer(streamId, messageId,
data.slice());
- }
-
- @Override
- public long getStreamId() {
- return streamId;
- }
-
- @Override
- public long getDataOffset() {
- return messageId;
- }
-
- @Override
- public long getDataLength() {
- return data.capacity();
+ DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long
seqNum){
+ super(streamId, offset, data);
+ this.seqNum = seqNum;
}
@Override
public void setFirstRequest() {
- isFirst = true;
}
@Override
@@ -94,42 +74,28 @@ public class OrderedStreamAsync {
public CompletableFuture<DataStreamReply> getReplyFuture(){
return replyFuture;
}
-
- public DataStreamWindowRequest(long streamId, long messageId,
- long seqNum, ByteBuffer data){
- this.streamId = streamId;
- this.messageId = messageId;
- this.data = data.slice();
- this.seqNum = seqNum;
- }
}
- private SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply>
slidingWindow;
- private Semaphore requestSemaphore;
- private DataStreamClientRpc dataStreamClientRpc;
+ private final DataStreamClientRpc dataStreamClientRpc;
+ private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply>
slidingWindow;
+ private final Semaphore requestSemaphore;
- public OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc,
- RaftProperties properties){
+ OrderedStreamAsync(ClientId clientId, DataStreamClientRpc
dataStreamClientRpc, RaftProperties properties){
this.dataStreamClientRpc = dataStreamClientRpc;
- this.requestSemaphore = new
Semaphore(RaftClientConfigKeys.Async.outstandingRequestsMax(properties)*2);
- this.slidingWindow = new SlidingWindow.Client<>("sliding");
+ this.slidingWindow = new SlidingWindow.Client<>(clientId);
+ this.requestSemaphore = new
Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties)*2);
}
- private void resetSlidingWindow(RaftClientRequest request) {
- slidingWindow.resetFirstSeqNum();
- }
-
- public CompletableFuture<DataStreamReply> sendRequest(long streamId,
- long messageId,
- ByteBuffer data){
+ CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset,
ByteBuffer data){
+ final int length = data.remaining();
try {
requestSemaphore.acquire();
} catch (InterruptedException e){
return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
- "Interrupted when sending streamId=" + streamId + ", messageId= " +
messageId, e));
+ "Interrupted when sending streamId=" + streamId + ", offset= " +
offset + ", length=" + length, e));
}
- final LongFunction<DataStreamWindowRequest> constructor = seqNum -> new
DataStreamWindowRequest(streamId,
-
messageId, seqNum, data);
+ final LongFunction<DataStreamWindowRequest> constructor
+ = seqNum -> new DataStreamWindowRequest(streamId, offset,
data.slice(), seqNum);
return slidingWindow.submitNewRequest(constructor,
this::sendRequestToNetwork).
getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
}
@@ -142,8 +108,7 @@ public class OrderedStreamAsync {
if(slidingWindow.isFirst(request.getSeqNum())){
request.setFirstRequest();
}
- DataStreamRequestByteBuffer rpcRequest = request.newDataStreamRequest();
- CompletableFuture<DataStreamReply> requestFuture =
dataStreamClientRpc.streamAsync(rpcRequest);
+ final CompletableFuture<DataStreamReply> requestFuture =
dataStreamClientRpc.streamAsync(request);
requestFuture.thenApply(reply -> {
slidingWindow.receiveReply(
request.getSeqNum(), reply, this::sendRequestToNetwork);
@@ -155,6 +120,4 @@ public class OrderedStreamAsync {
f.complete(reply);
});
}
-
-
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
similarity index 61%
rename from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
rename to
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index fee5a04..02fc04a 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,38 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.ratis.protocol;
+package org.apache.ratis.datastream.impl;
import java.nio.ByteBuffer;
-public class DataStreamRequestByteBuffer implements DataStreamRequest{
- private long streamId;
- private long dataOffset;
- private ByteBuffer buf;
-
- public DataStreamRequestByteBuffer(long streamId, long dataOffset,
ByteBuffer buf){
- this.streamId = streamId;
- this.dataOffset = dataOffset;
- this.buf = buf;
- }
-
- @Override
- public long getStreamId() {
- return streamId;
- }
+/**
+ * Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link
ByteBuffer}.
+ *
+ * This class is immutable.
+ */
+public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
+ private final ByteBuffer buffer;
- @Override
- public long getDataOffset() {
- return dataOffset;
+ public DataStreamPacketByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
+ super(streamId, streamOffset);
+ this.buffer = buffer.asReadOnlyBuffer();
}
@Override
public long getDataLength() {
- return buf.capacity();
+ return buffer.remaining();
}
- public ByteBuffer getBuf() {
- return buf;
+ public ByteBuffer slice() {
+ return buffer.slice();
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
similarity index 56%
rename from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
rename to
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index f868bf8..7f76042 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,25 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.ratis.datastream.impl;
-package org.apache.ratis.protocol;
-
-import java.nio.ByteBuffer;
+import org.apache.ratis.protocol.DataStreamPacket;
-public class DataStreamReplyByteBuffer implements DataStreamReply {
- private long streamId;
- private long dataOffset;
- private ByteBuffer response;
+/**
+ * This is an abstract implementation of {@link DataStreamPacket}.
+ *
+ * This class is immutable.
+ */
+public abstract class DataStreamPacketImpl implements DataStreamPacket {
+ private final long streamId;
+ private final long streamOffset;
- public DataStreamReplyByteBuffer(long streamId, long dataOffset, ByteBuffer
bf){
+ public DataStreamPacketImpl(long streamId, long streamOffset) {
this.streamId = streamId;
- this.dataOffset = dataOffset;
- this.response = bf;
- }
-
- @Override
- public ByteBuffer getResponse() {
- return response;
+ this.streamOffset = streamOffset;
}
@Override
@@ -42,12 +39,16 @@ public class DataStreamReplyByteBuffer implements
DataStreamReply {
}
@Override
- public long getDataOffset() {
- return dataOffset;
+ public long getStreamOffset() {
+ return streamOffset;
}
@Override
- public long getDataLength() {
- return response.capacity();
+ public String toString() {
+ return getClass().getSimpleName() + "{"
+ + "streamId=" + getStreamId()
+ + ", streamOffset=" + getStreamOffset()
+ + ", dataLength=" + getDataLength()
+ + '}';
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
similarity index 65%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
copy to
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 856eed1..8370b25 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -15,11 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.ratis.datastream.impl;
-package org.apache.ratis.protocol;
+import org.apache.ratis.protocol.DataStreamReply;
import java.nio.ByteBuffer;
-public interface DataStreamReply extends DataStreamMessage {
- ByteBuffer getResponse();
-}
\ No newline at end of file
+/**
+ * Implements {@link DataStreamReply} with {@link ByteBuffer}.
+ *
+ * This class is immutable.
+ */
+public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
implements DataStreamReply {
+ public DataStreamReplyByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
+ super(streamId, streamOffset, buffer);
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
similarity index 65%
copy from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
copy to
ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 856eed1..fb0f3ec 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -15,11 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.ratis.datastream.impl;
-package org.apache.ratis.protocol;
+import org.apache.ratis.protocol.DataStreamRequest;
import java.nio.ByteBuffer;
-public interface DataStreamReply extends DataStreamMessage {
- ByteBuffer getResponse();
-}
\ No newline at end of file
+/**
+ * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
+ *
+ * This class is immutable.
+ */
+public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer
implements DataStreamRequest {
+ public DataStreamRequestByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
+ super(streamId, streamOffset, buffer);
+ }
+}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
similarity index 93%
rename from
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
rename to
ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index b05694c..10ca737 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamMessage.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -18,10 +18,10 @@
package org.apache.ratis.protocol;
-public interface DataStreamMessage{
+public interface DataStreamPacket {
long getStreamId();
- long getDataOffset();
+ long getStreamOffset();
long getDataLength();
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 856eed1..6ac5253 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,8 +18,5 @@
package org.apache.ratis.protocol;
-import java.nio.ByteBuffer;
-
-public interface DataStreamReply extends DataStreamMessage {
- ByteBuffer getResponse();
+public interface DataStreamReply extends DataStreamPacket {
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index b2431d6..8db10fe 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -18,5 +18,5 @@
package org.apache.ratis.protocol;
-public interface DataStreamRequest extends DataStreamMessage {
+public interface DataStreamRequest extends DataStreamPacket {
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
index 775833b..e91f606 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
@@ -18,7 +18,7 @@
package org.apache.ratis.netty.client;
-import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
index 6bafffa..0864b12 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamRequestEncoder.java
@@ -18,7 +18,7 @@
package org.apache.ratis.netty.client;
-import org.apache.ratis.protocol.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -30,12 +30,13 @@ public class DataStreamRequestEncoder extends
MessageToMessageEncoder<DataStream
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
- DataStreamRequestByteBuffer requestData, List<Object>
list) {
+ DataStreamRequestByteBuffer request, List<Object> list) {
+ final ByteBuffer data = request.slice();
ByteBuffer bb = ByteBuffer.allocateDirect(24);
- bb.putLong(requestData.getStreamId());
- bb.putLong(requestData.getDataOffset());
- bb.putLong(requestData.getDataLength());
+ bb.putLong(request.getStreamId());
+ bb.putLong(request.getStreamOffset());
+ bb.putLong(data.remaining());
bb.flip();
- list.add(Unpooled.wrappedBuffer(bb, requestData.getBuf()));
+ list.add(Unpooled.wrappedBuffer(bb, data));
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
index 71132d9..52ab54d 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
@@ -18,7 +18,7 @@
package org.apache.ratis.netty.server;
-import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -26,17 +26,15 @@ import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncode
import java.nio.ByteBuffer;
import java.util.List;
-public class DataStreamReplyEncoder extends
MessageToMessageEncoder<DataStreamReply> {
+public class DataStreamReplyEncoder extends
MessageToMessageEncoder<DataStreamReplyByteBuffer> {
@Override
- protected void encode(ChannelHandlerContext channelHandlerContext,
- DataStreamReply dataStreamReply,
- List<Object> list) {
+ protected void encode(ChannelHandlerContext context,
DataStreamReplyByteBuffer reply, List<Object> list) {
ByteBuffer bb = ByteBuffer.allocateDirect(24);
- bb.putLong(dataStreamReply.getStreamId());
- bb.putLong(dataStreamReply.getDataOffset());
- bb.putLong(dataStreamReply.getDataLength());
+ bb.putLong(reply.getStreamId());
+ bb.putLong(reply.getStreamOffset());
+ bb.putLong(reply.getDataLength());
bb.flip();
- list.add(Unpooled.wrappedBuffer(bb, dataStreamReply.getResponse()));
+ list.add(Unpooled.wrappedBuffer(bb, reply.slice()));
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 6d5e4f6..b782481 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -18,36 +18,29 @@
package org.apache.ratis.netty.server;
+import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
-class DataStreamRequestByteBuf implements DataStreamRequest {
- private long streamId;
- private long dataOffset;
- private ByteBuf buf;
-
- DataStreamRequestByteBuf(long streamId, long dataOffset, ByteBuf buf) {
- this.streamId = streamId;
- this.dataOffset = dataOffset;
- this.buf = buf;
- }
-
- @Override
- public long getStreamId() {
- return streamId;
- }
+/**
+ * Implements {@link DataStreamRequest} with {@link ByteBuf}.
+ *
+ * This class is immutable.
+ */
+class DataStreamRequestByteBuf extends DataStreamPacketImpl implements
DataStreamRequest {
+ private final ByteBuf buf;
- @Override
- public long getDataOffset() {
- return dataOffset;
+ DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf buf) {
+ super(streamId, streamOffset);
+ this.buf = buf.asReadOnly();
}
@Override
public long getDataLength() {
- return buf.capacity();
+ return buf.readableBytes();
}
- public ByteBuf getBuf() {
- return buf;
+ public ByteBuf slice() {
+ return buf.slice();
}
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 6ccabbc..c8092ef 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -20,8 +20,7 @@ package org.apache.ratis.netty.server;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
@@ -79,31 +78,34 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
}
}
- private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
+ private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
if (released) {
- return;
+ return 0;
}
try {
if (stream == null) {
- return;
+ return 0;
}
final WritableByteChannel channel = stream.getWritableByteChannel();
+ long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
try {
- channel.write(buffer);
+ byteWritten += channel.write(buffer);
} catch (Throwable t) {
throw new CompletionException(t);
}
}
+ return byteWritten;
} finally {
buf.release();
}
}
- private void sendReply(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx) {
- final DataStreamReply reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getDataOffset(),
ByteBuffer.wrap("OK".getBytes()));
+ private void sendReply(DataStreamRequestByteBuf request, long byteWritten,
ChannelHandlerContext ctx) {
+ // TODO RATIS-1098: include byteWritten and isSuccess in the reply
+ final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+ request.getStreamId(), request.getStreamOffset(),
ByteBuffer.wrap("OK".getBytes()));
ctx.writeAndFlush(reply);
}
@@ -111,13 +113,12 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
- final long streamId = req.getStreamId();
- final ByteBuf buf = req.getBuf();
+ final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
+ final ByteBuf buf = request.slice();
final AtomicBoolean released = new AtomicBoolean();
- streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf,
released))
- .thenAccept(stream -> writeTo(buf, stream, released.get()))
- .thenAccept(dummy -> sendReply(req, ctx));
+ streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf, released))
+ .thenApply(stream -> writeTo(buf, stream, released.get()))
+ .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
}
};
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
index f49564c..f37035c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
@@ -87,12 +87,12 @@ public abstract class MessageStreamApiTests<CLUSTER extends
MiniRaftCluster> ext
@Test
public void testStreamAsync() throws Exception {
final RaftProperties p = getProperties();
- RaftClientConfigKeys.Stream.setSubmessageSize(p, SUBMESSAGE_SIZE);
+ RaftClientConfigKeys.MessageStream.setSubmessageSize(p, SUBMESSAGE_SIZE);
p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
runWithNewCluster(NUM_SERVERS, this::runTestStreamAsync);
- RaftClientConfigKeys.Stream.setSubmessageSize(p);
+ RaftClientConfigKeys.MessageStream.setSubmessageSize(p);
}
void runTestStreamAsync(CLUSTER cluster) throws Exception {