This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4e8940d RATIS-1106. Add type for DataStream (#232)
4e8940d is described below
commit 4e8940daf2f818c4e5d64c627bc971110edff62b
Author: runzhiwang <[email protected]>
AuthorDate: Tue Oct 27 11:56:51 2020 +0800
RATIS-1106. Add type for DataStream (#232)
---
.../ratis/client/impl/DataStreamClientImpl.java | 6 +-
.../ratis/client/impl/OrderedStreamAsync.java | 9 +--
.../impl/DataStreamPacketByteBuffer.java | 5 +-
.../datastream/impl/DataStreamPacketImpl.java | 11 +++-
.../datastream/impl/DataStreamReplyByteBuffer.java | 8 ++-
.../impl/DataStreamRequestByteBuffer.java | 5 +-
.../apache/ratis/protocol/DataStreamPacket.java | 10 +---
.../ratis/protocol/DataStreamPacketHeader.java | 25 ++------
.../org/apache/ratis/protocol/DataStreamReply.java | 21 -------
.../ratis/protocol/DataStreamReplyHeader.java | 57 ++++++++++++-------
.../apache/ratis/protocol/DataStreamRequest.java | 4 --
.../ratis/protocol/DataStreamRequestHeader.java | 53 +++++++++++------
.../apache/ratis/netty/NettyDataStreamUtils.java | 66 +++++++++++++++++++---
.../ratis/netty/client/NettyClientStreamRpc.java | 2 +-
.../netty/server/DataStreamRequestByteBuf.java | 7 ++-
.../ratis/netty/server/NettyServerStreamRpc.java | 9 +--
ratis-proto/src/main/proto/Raft.proto | 24 ++++++++
.../apache/ratis/datastream/TestDataStream.java | 3 +
18 files changed, 206 insertions(+), 119 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 0bfd662..51e9eb6 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -30,6 +30,7 @@ import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,13 +78,14 @@ public class DataStreamClientImpl implements
DataStreamClient {
this.header = new RaftClientRequest(clientId, raftServer.getId(),
groupId, RaftClientImpl.nextCallId(),
RaftClientRequest.writeRequestType());
this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
-
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer());
+
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(),
Type.STREAM_HEADER);
}
// send to the attached dataStreamClientRpc
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
- final CompletableFuture<DataStreamReply> f =
orderedStreamAsync.sendRequest(streamId, streamOffset, buf);
+ final CompletableFuture<DataStreamReply> f =
orderedStreamAsync.sendRequest(streamId, streamOffset, buf,
+ Type.STREAM_DATA);
streamOffset += buf.remaining();
return f;
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index fb4a7d9..6bfca51 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -42,8 +43,8 @@ public class OrderedStreamAsync {
private final long seqNum;
private final CompletableFuture<DataStreamReply> replyFuture = new
CompletableFuture<>();
- DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long
seqNum){
- super(streamId, offset, data);
+ DataStreamWindowRequest(long streamId, long offset, ByteBuffer data, long
seqNum, Type type){
+ super(streamId, offset, data, type);
this.seqNum = seqNum;
}
@@ -86,7 +87,7 @@ public class OrderedStreamAsync {
this.requestSemaphore = new
Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties)*2);
}
- CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset,
ByteBuffer data){
+ CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset,
ByteBuffer data, Type type){
final int length = data.remaining();
try {
requestSemaphore.acquire();
@@ -95,7 +96,7 @@ public class OrderedStreamAsync {
"Interrupted when sending streamId=" + streamId + ", offset= " +
offset + ", length=" + length, e));
}
final LongFunction<DataStreamWindowRequest> constructor
- = seqNum -> new DataStreamWindowRequest(streamId, offset,
data.slice(), seqNum);
+ = seqNum -> new DataStreamWindowRequest(streamId, offset,
data.slice(), seqNum, type);
return slidingWindow.submitNewRequest(constructor,
this::sendRequestToNetwork).
getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 6eea669..fcb420d 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.datastream.impl;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import java.nio.ByteBuffer;
/**
@@ -27,8 +28,8 @@ public abstract class DataStreamPacketByteBuffer extends
DataStreamPacketImpl {
private final ByteBuffer buffer;
- public DataStreamPacketByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
- super(streamId, streamOffset);
+ public DataStreamPacketByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer, Type type) {
+ super(streamId, streamOffset, type);
this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY;
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index 7f76042..f47caee 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream.impl;
import org.apache.ratis.protocol.DataStreamPacket;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
/**
* This is an abstract implementation of {@link DataStreamPacket}.
@@ -27,10 +28,12 @@ import org.apache.ratis.protocol.DataStreamPacket;
public abstract class DataStreamPacketImpl implements DataStreamPacket {
private final long streamId;
private final long streamOffset;
+ private final Type type;
- public DataStreamPacketImpl(long streamId, long streamOffset) {
+ public DataStreamPacketImpl(long streamId, long streamOffset, Type type) {
this.streamId = streamId;
this.streamOffset = streamOffset;
+ this.type = type;
}
@Override
@@ -44,11 +47,17 @@ public abstract class DataStreamPacketImpl implements
DataStreamPacket {
}
@Override
+ public Type getType() {
+ return type;
+ }
+
+ @Override
public String toString() {
return getClass().getSimpleName() + "{"
+ "streamId=" + getStreamId()
+ ", streamOffset=" + getStreamOffset()
+ ", dataLength=" + getDataLength()
+ + ", type=" + getType()
+ '}';
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 230c59b..3e0736e 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -19,6 +19,7 @@ package org.apache.ratis.datastream.impl;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import java.nio.ByteBuffer;
@@ -32,15 +33,16 @@ public class DataStreamReplyByteBuffer extends
DataStreamPacketByteBuffer implem
private final boolean success;
public DataStreamReplyByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer,
- long bytesWritten, boolean success ) {
- super(streamId, streamOffset, buffer);
+ long bytesWritten, boolean success, Type type) {
+ super(streamId, streamOffset, buffer, type);
this.success = success;
this.bytesWritten = bytesWritten;
}
public DataStreamReplyByteBuffer(DataStreamReplyHeader header, ByteBuffer
buffer) {
- this(header.getStreamId(), header.getStreamOffset(), buffer,
header.getBytesWritten(), header.isSuccess());
+ this(header.getStreamId(), header.getStreamOffset(), buffer,
header.getBytesWritten(), header.isSuccess(),
+ header.getType());
}
@Override
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index fb0f3ec..7e63340 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream.impl;
import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import java.nio.ByteBuffer;
@@ -27,7 +28,7 @@ import java.nio.ByteBuffer;
* This class is immutable.
*/
public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer
implements DataStreamRequest {
- public DataStreamRequestByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
- super(streamId, streamOffset, buffer);
+ public DataStreamRequestByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer, Type type) {
+ super(streamId, streamOffset, buffer, type);
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index fa3f706..995ed16 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -18,7 +18,7 @@
package org.apache.ratis.protocol;
-import java.util.function.LongConsumer;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
public interface DataStreamPacket {
long getStreamId();
@@ -27,11 +27,5 @@ public interface DataStreamPacket {
long getDataLength();
- int getHeaderSize();
-
- default void writeHeaderTo(LongConsumer putLong) {
- putLong.accept(getStreamId());
- putLong.accept(getStreamOffset());
- putLong.accept(getDataLength());
- }
+ Type getType();
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 56274b6..f3d705a 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -19,29 +19,17 @@
package org.apache.ratis.protocol;
import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.util.SizeInBytes;
-import java.util.function.LongSupplier;
-
/** The header format is streamId, streamOffset, dataLength. */
public class DataStreamPacketHeader extends DataStreamPacketImpl {
- private static final SizeInBytes SIZE = SizeInBytes.valueOf(24);
-
- public static int getSize() {
- return SIZE.getSizeInt();
- }
-
- public static DataStreamPacketHeader read(LongSupplier readLong, int
readableBytes) {
- if (readableBytes < getSize()) {
- return null;
- }
- return new DataStreamPacketHeader(readLong.getAsLong(),
readLong.getAsLong(), readLong.getAsLong());
- }
+ private static final SizeInBytes SIZE_OF_HEADER_LEN = SizeInBytes.valueOf(4);
private final long dataLength;
- public DataStreamPacketHeader(long streamId, long streamOffset, long
dataLength) {
- super(streamId, streamOffset);
+ public DataStreamPacketHeader(long streamId, long streamOffset, long
dataLength, Type type) {
+ super(streamId, streamOffset, type);
this.dataLength = dataLength;
}
@@ -50,8 +38,7 @@ public class DataStreamPacketHeader extends
DataStreamPacketImpl {
return dataLength;
}
- @Override
- public int getHeaderSize() {
- return getSize();
+ public static int getSizeOfHeaderLen() {
+ return SIZE_OF_HEADER_LEN.getSizeInt();
}
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 9fb8fa7..9f50cc8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,30 +18,9 @@
package org.apache.ratis.protocol;
-import java.util.function.LongConsumer;
-
public interface DataStreamReply extends DataStreamPacket {
- static boolean getSuccess(long flags) {
- return flags == 0;
- }
-
- static long toFlags(boolean success) {
- return success? 0: 1;
- }
boolean isSuccess();
long getBytesWritten();
-
- @Override
- default int getHeaderSize() {
- return DataStreamReplyHeader.getSize();
- }
-
- @Override
- default void writeHeaderTo(LongConsumer putLong) {
- DataStreamPacket.super.writeHeaderTo(putLong);
- putLong.accept(getBytesWritten());
- putLong.accept(toFlags(isSuccess()));
- }
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index 1db731c..e829b9f 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,43 +18,62 @@
package org.apache.ratis.protocol;
-import org.apache.ratis.util.SizeInBytes;
-
-import java.util.function.LongSupplier;
+import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** The header format is {@link DataStreamPacketHeader}, bytesWritten and
flags. */
public class DataStreamReplyHeader extends DataStreamPacketHeader implements
DataStreamReply {
- private static final SizeInBytes SIZE =
SizeInBytes.valueOf(DataStreamPacketHeader.getSize() + 16);
+ private static final Logger LOG =
LoggerFactory.getLogger(DataStreamReplyHeader.class);
- public static int getSize() {
- return SIZE.getSizeInt();
- }
+ public static DataStreamReplyHeader read(ByteBuf buf) {
+ if (getSizeOfHeaderLen() > buf.readableBytes()) {
+ return null;
+ }
- public static DataStreamReplyHeader read(LongSupplier readLong, int
readableBytes) {
- if (readableBytes < getSize()) {
+ int headerBufLen = buf.readInt();
+ if (headerBufLen > buf.readableBytes()) {
+ buf.resetReaderIndex();
return null;
}
- final DataStreamPacketHeader packerHeader =
DataStreamPacketHeader.read(readLong, readableBytes);
- if (packerHeader == null) {
+
+ try {
+ ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+ DataStreamReplyHeaderProto header =
DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+ if (header.getPacketHeader().getDataLength() + headerBufLen <=
buf.readableBytes()) {
+ buf.readerIndex(buf.readerIndex() + headerBufLen);
+ return new DataStreamReplyHeader(
+ header.getPacketHeader().getStreamId(),
+ header.getPacketHeader().getStreamOffset(),
+ header.getPacketHeader().getDataLength(),
+ header.getPacketHeader().getType(),
+ header.getBytesWritten(),
+ header.getSuccess());
+ } else {
+ buf.resetReaderIndex();
+ return null;
+ }
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Fail to decode reply header:", e);
+ buf.resetReaderIndex();
return null;
}
- return new DataStreamReplyHeader(packerHeader, readLong.getAsLong(),
- DataStreamReply.getSuccess(readLong.getAsLong()));
}
private final long bytesWritten;
private final boolean success;
- public DataStreamReplyHeader(long streamId, long streamOffset, long
dataLength, long bytesWritten, boolean success) {
- super(streamId, streamOffset, dataLength);
+ public DataStreamReplyHeader(
+ long streamId, long streamOffset, long dataLength, Type type, long
bytesWritten, boolean success) {
+ super(streamId, streamOffset, dataLength, type);
this.bytesWritten = bytesWritten;
this.success = success;
}
- public DataStreamReplyHeader(DataStreamPacketHeader header, long
bytesWritten, boolean success) {
- this(header.getStreamId(), header.getStreamOffset(),
header.getDataLength(), bytesWritten, success);
- }
-
@Override
public long getBytesWritten() {
return bytesWritten;
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index 797ddf4..8db10fe 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -19,8 +19,4 @@
package org.apache.ratis.protocol;
public interface DataStreamRequest extends DataStreamPacket {
- @Override
- default int getHeaderSize() {
- return DataStreamRequestHeader.getSize();
- }
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index d6646df..b87d68f 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,37 +18,54 @@
package org.apache.ratis.protocol;
-import org.apache.ratis.util.SizeInBytes;
-
-import java.util.function.LongSupplier;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The header format is the same {@link DataStreamPacketHeader}
* since there are no additional fields.
*/
public class DataStreamRequestHeader extends DataStreamPacketHeader implements
DataStreamRequest {
- private static final SizeInBytes SIZE =
SizeInBytes.valueOf(DataStreamPacketHeader.getSize());
-
- public static int getSize() {
- return SIZE.getSizeInt();
- }
+ private static final Logger LOG =
LoggerFactory.getLogger(DataStreamRequestHeader.class);
- public static DataStreamRequestHeader read(LongSupplier readLong, int
readableBytes) {
- if (readableBytes < getSize()) {
+ public static DataStreamRequestHeader read(ByteBuf buf) {
+ if (getSizeOfHeaderLen() > buf.readableBytes()) {
return null;
}
- final DataStreamPacketHeader packerHeader =
DataStreamPacketHeader.read(readLong, readableBytes);
- if (packerHeader == null) {
+
+ int headerBufLen = buf.readInt();
+ if (headerBufLen > buf.readableBytes()) {
+ buf.resetReaderIndex();
return null;
}
- return new DataStreamRequestHeader(packerHeader);
- }
- public DataStreamRequestHeader(long streamId, long streamOffset, long
dataLength) {
- super(streamId, streamOffset, dataLength);
+ try {
+ ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
+ DataStreamRequestHeaderProto header =
DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
+
+ if (header.getPacketHeader().getDataLength() + headerBufLen <=
buf.readableBytes()) {
+ buf.readerIndex(buf.readerIndex() + headerBufLen);
+ return new DataStreamRequestHeader(
+ header.getPacketHeader().getStreamId(),
+ header.getPacketHeader().getStreamOffset(),
+ header.getPacketHeader().getDataLength(),
+ header.getPacketHeader().getType());
+ } else {
+ buf.resetReaderIndex();
+ return null;
+ }
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Fail to decode request header:", e);
+ buf.resetReaderIndex();
+ return null;
+ }
}
- public DataStreamRequestHeader(DataStreamPacketHeader header) {
- this(header.getStreamId(), header.getStreamOffset(),
header.getDataLength());
+ public DataStreamRequestHeader(long streamId, long streamOffset, long
dataLength, Type type) {
+ super(streamId, streamOffset, dataLength, type);
}
}
\ No newline at end of file
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 757c179..0aadfea 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -17,9 +17,12 @@
*/
package org.apache.ratis.netty;
-import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
+import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
import org.apache.ratis.protocol.DataStreamPacketHeader;
import org.apache.ratis.protocol.DataStreamReplyHeader;
import org.apache.ratis.protocol.DataStreamRequestHeader;
@@ -27,27 +30,74 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
public interface NettyDataStreamUtils {
- static void encodeDataStreamPacketByteBuffer(DataStreamPacketByteBuffer
packet, Consumer<ByteBuf> out) {
- final ByteBuf buf =
PooledByteBufAllocator.DEFAULT.directBuffer(packet.getHeaderSize());
- packet.writeHeaderTo(buf::writeLong);
- out.accept(buf);
- out.accept(Unpooled.wrappedBuffer(packet.slice()));
+
+ static ByteBuffer
getDataStreamRequestHeaderProtoByteBuf(DataStreamRequestByteBuffer request) {
+ DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
+ .newBuilder()
+ .setStreamId(request.getStreamId())
+ .setStreamOffset(request.getStreamOffset())
+ .setType(request.getType())
+ .setDataLength(request.getDataLength());
+ return DataStreamRequestHeaderProto
+ .newBuilder()
+ .setPacketHeader(b)
+ .build()
+ .toByteString()
+ .asReadOnlyByteBuffer();
+ }
+
+ static ByteBuffer
getDataStreamReplyHeaderProtoByteBuf(DataStreamReplyByteBuffer reply) {
+ DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
+ .newBuilder()
+ .setStreamId(reply.getStreamId())
+ .setStreamOffset(reply.getStreamOffset())
+ .setType(reply.getType())
+ .setDataLength(reply.getDataLength());
+ return DataStreamReplyHeaderProto
+ .newBuilder()
+ .setPacketHeader(b)
+ .setBytesWritten(reply.getBytesWritten())
+ .setSuccess(reply.isSuccess())
+ .build()
+ .toByteString()
+ .asReadOnlyByteBuffer();
+ }
+
+ static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer
request, Consumer<ByteBuf> out) {
+ ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuf(request);
+ final ByteBuf headerLenBuf =
+
PooledByteBufAllocator.DEFAULT.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
+ headerLenBuf.writeInt(headerBuf.remaining());
+ out.accept(headerLenBuf);
+ out.accept(Unpooled.wrappedBuffer(headerBuf));
+ out.accept(Unpooled.wrappedBuffer(request.slice()));
+ }
+
+ static void encodeDataStreamReplyByteBuffer(DataStreamReplyByteBuffer reply,
Consumer<ByteBuf> out) {
+ ByteBuffer headerBuf = getDataStreamReplyHeaderProtoByteBuf(reply);
+ final ByteBuf headerLenBuf =
+
PooledByteBufAllocator.DEFAULT.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
+ headerLenBuf.writeInt(headerBuf.remaining());
+ out.accept(headerLenBuf);
+ out.accept(Unpooled.wrappedBuffer(headerBuf));
+ out.accept(Unpooled.wrappedBuffer(reply.slice()));
}
static DataStreamRequestByteBuf decodeDataStreamRequestByteBuf(ByteBuf buf) {
- return Optional.ofNullable(DataStreamRequestHeader.read(buf::readLong,
buf.readableBytes()))
+ return Optional.ofNullable(DataStreamRequestHeader.read(buf))
.map(header -> checkHeader(header, buf))
.map(header -> new DataStreamRequestByteBuf(header, decodeData(buf,
header, ByteBuf::retain)))
.orElse(null);
}
static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf
buf) {
- return Optional.ofNullable(DataStreamReplyHeader.read(buf::readLong,
buf.readableBytes()))
+ return Optional.ofNullable(DataStreamReplyHeader.read(buf))
.map(header -> checkHeader(header, buf))
.map(header -> new DataStreamReplyByteBuffer(header, decodeData(buf,
header, ByteBuf::nioBuffer)))
.orElse(null);
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index c6b6266..5478880 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -88,7 +88,7 @@ public class NettyClientStreamRpc implements
DataStreamClientRpc {
return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
@Override
protected void encode(ChannelHandlerContext context,
DataStreamRequestByteBuffer request, List<Object> out) {
- NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(request,
out::add);
+ NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(request,
out::add);
}
};
}
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 9f67c75..499e6a5 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -21,6 +21,7 @@ package org.apache.ratis.netty.server;
import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
@@ -32,13 +33,13 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements
DataStreamRequest {
private final ByteBuf buf;
- public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf
buf) {
- super(streamId, streamOffset);
+ public DataStreamRequestByteBuf(long streamId, long streamOffset, ByteBuf
buf, Type type) {
+ super(streamId, streamOffset, type);
this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
}
public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf)
{
- this(header.getStreamId(), header.getStreamOffset(), buf);
+ this(header.getStreamId(), header.getStreamOffset(), buf,
header.getType());
}
@Override
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index b248572..bc98720 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -26,6 +26,7 @@ import
org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -185,13 +186,13 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private void sendReplyNotSuccess(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), null, -1, false);
+ request.getStreamId(), request.getStreamOffset(), null, -1, false,
request.getType());
ctx.writeAndFlush(reply);
}
private void sendReplySuccess(DataStreamRequestByteBuf request, long
bytesWritten, ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), null, bytesWritten,
true);
+ request.getStreamId(), request.getStreamOffset(), null, bytesWritten,
true, request.getType());
ctx.writeAndFlush(reply);
}
@@ -217,7 +218,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
IOException {
final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)
msg;
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getStreamOffset() == -1;
+ final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final CompletableFuture<Long> localWrite = isHeader ?
streams.computeIfAbsent(request.getStreamId(), id ->
getDataStreamFuture(buf)).thenApply(stream -> 0L)
@@ -291,7 +292,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return new MessageToMessageEncoder<DataStreamReplyByteBuffer>() {
@Override
protected void encode(ChannelHandlerContext context,
DataStreamReplyByteBuffer reply, List<Object> out) {
- NettyDataStreamUtils.encodeDataStreamPacketByteBuffer(reply, out::add);
+ NettyDataStreamUtils.encodeDataStreamReplyByteBuffer(reply, out::add);
}
};
}
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 4b82228..3a19f70 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -279,6 +279,30 @@ message RaftClientRequestProto {
}
}
+message DataStreamPacketHeaderProto {
+ enum Type {
+ STREAM_HEADER = 0;
+ STREAM_DATA = 1;
+ STREAM_CLOSE = 2;
+ START_TRANSACTION = 3;
+ }
+
+ uint64 streamId = 1;
+ uint64 streamOffset = 2;
+ Type type = 3;
+ uint64 dataLength = 4;
+}
+
+message DataStreamRequestHeaderProto {
+ DataStreamPacketHeaderProto packetHeader = 1;
+}
+
+message DataStreamReplyHeaderProto {
+ DataStreamPacketHeaderProto packetHeader = 1;
+ uint64 bytesWritten = 2;
+ bool success = 3;
+}
+
message NotLeaderExceptionProto {
RaftPeerProto suggestedLeader = 1;
repeated RaftPeerProto peersInConf = 2;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 6e115eb..f398174 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -29,6 +29,7 @@ import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -205,6 +206,7 @@ public class TestDataStream extends BaseTest {
final DataStreamReply reply = impl.getHeaderFuture().join();
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(0, reply.getBytesWritten());
+ Assert.assertEquals(reply.getType(), Type.STREAM_HEADER);
}
// check writeAsync requests
@@ -212,6 +214,7 @@ public class TestDataStream extends BaseTest {
final DataStreamReply reply = futures.get(i).join();
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
+ Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
}
for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {