This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9141f74 RATIS-1098. DataStreamReply should include byteWritten and isSuccess. (#228)
9141f74 is described below
commit 9141f74fe894bf06cc095675d856d85feb2baafc
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 22 07:17:35 2020 +0800
RATIS-1098. DataStreamReply should include byteWritten and isSuccess. (#228)
---
.../impl/DataStreamPacketByteBuffer.java | 4 +-
.../datastream/impl/DataStreamReplyByteBuffer.java | 24 +++++++-
.../apache/ratis/protocol/DataStreamPacket.java | 8 +++
.../DataStreamPacketHeader.java} | 43 ++++++++------
.../org/apache/ratis/protocol/DataStreamReply.java | 20 +++++++
.../ratis/protocol/DataStreamReplyHeader.java | 67 ++++++++++++++++++++++
.../apache/ratis/netty/NettyDataStreamUtils.java | 50 ++++++++++++++++
.../ratis/netty/client/DataStreamReplyDecoder.java | 24 ++------
.../ratis/netty/server/DataStreamReplyEncoder.java | 12 ++--
.../ratis/netty/server/NettyServerStreamRpc.java | 7 +--
.../apache/ratis/datastream/TestDataStream.java | 20 +++++--
11 files changed, 223 insertions(+), 56 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 02fc04a..15870a5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -25,11 +25,13 @@ import java.nio.ByteBuffer;
* This class is immutable.
*/
public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
+ private static final ByteBuffer EMPTY =
ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
+
private final ByteBuffer buffer;
public DataStreamPacketByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
super(streamId, streamOffset);
- this.buffer = buffer.asReadOnlyBuffer();
+ this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY;
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 8370b25..230c59b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream.impl;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamReplyHeader;
import java.nio.ByteBuffer;
@@ -27,7 +28,28 @@ import java.nio.ByteBuffer;
* This class is immutable.
*/
public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
implements DataStreamReply {
- public DataStreamReplyByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
+ private final long bytesWritten;
+ private final boolean success;
+
+ public DataStreamReplyByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer,
+ long bytesWritten, boolean success ) {
super(streamId, streamOffset, buffer);
+
+ this.success = success;
+ this.bytesWritten = bytesWritten;
+ }
+
+ public DataStreamReplyByteBuffer(DataStreamReplyHeader header, ByteBuffer
buffer) {
+ this(header.getStreamId(), header.getStreamOffset(), buffer,
header.getBytesWritten(), header.isSuccess());
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return success;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index 10ca737..faf4648 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -18,10 +18,18 @@
package org.apache.ratis.protocol;
+import java.util.function.LongConsumer;
+
public interface DataStreamPacket {
long getStreamId();
long getStreamOffset();
long getDataLength();
+
+ default void putTo(LongConsumer putLong) {
+ putLong.accept(getStreamId());
+ putLong.accept(getStreamOffset());
+ putLong.accept(getDataLength());
+ }
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
similarity index 50%
copy from ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
copy to ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 02fc04a..b34216b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -15,29 +15,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.datastream.impl;
-import java.nio.ByteBuffer;
+package org.apache.ratis.protocol;
-/**
- * Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link
ByteBuffer}.
- *
- * This class is immutable.
- */
-public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
- private final ByteBuffer buffer;
+import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
+import org.apache.ratis.util.SizeInBytes;
+
+import java.util.function.LongSupplier;
+
+/** The header format is streamId, streamOffset, dataLength. */
+public class DataStreamPacketHeader extends DataStreamPacketImpl {
+ private static final SizeInBytes SIZE = SizeInBytes.valueOf(24);
+
+ public static int getSize() {
+ return SIZE.getSizeInt();
+ }
- public DataStreamPacketByteBuffer(long streamId, long streamOffset,
ByteBuffer buffer) {
+ public static DataStreamPacketHeader read(LongSupplier readLong, int
readableBytes) {
+ if (readableBytes < getSize()) {
+ return null;
+ }
+ return new DataStreamPacketHeader(readLong.getAsLong(),
readLong.getAsLong(), readLong.getAsLong());
+ }
+
+ private final long dataLength;
+
+ public DataStreamPacketHeader(long streamId, long streamOffset, long
dataLength) {
super(streamId, streamOffset);
- this.buffer = buffer.asReadOnlyBuffer();
+ this.dataLength = dataLength;
}
@Override
public long getDataLength() {
- return buffer.remaining();
- }
-
- public ByteBuffer slice() {
- return buffer.slice();
+ return dataLength;
}
-}
+}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 6ac5253..44a6ea0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,5 +18,25 @@
package org.apache.ratis.protocol;
+import java.util.function.LongConsumer;
+
public interface DataStreamReply extends DataStreamPacket {
+ static boolean getSuccess(long flags) {
+ return flags == 0;
+ }
+
+ static long toFlags(boolean success) {
+ return success? 0: 1;
+ }
+
+ boolean isSuccess();
+
+ long getBytesWritten();
+
+ @Override
+ default void putTo(LongConsumer putLong) {
+ DataStreamPacket.super.putTo(putLong);
+ putLong.accept(getBytesWritten());
+ putLong.accept(toFlags(isSuccess()));
+ }
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
new file mode 100644
index 0000000..1db731c
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.util.SizeInBytes;
+
+import java.util.function.LongSupplier;
+
+/** The header format is {@link DataStreamPacketHeader}, bytesWritten and
flags. */
+public class DataStreamReplyHeader extends DataStreamPacketHeader implements
DataStreamReply {
+ private static final SizeInBytes SIZE =
SizeInBytes.valueOf(DataStreamPacketHeader.getSize() + 16);
+
+ public static int getSize() {
+ return SIZE.getSizeInt();
+ }
+
+ public static DataStreamReplyHeader read(LongSupplier readLong, int
readableBytes) {
+ if (readableBytes < getSize()) {
+ return null;
+ }
+ final DataStreamPacketHeader packerHeader =
DataStreamPacketHeader.read(readLong, readableBytes);
+ if (packerHeader == null) {
+ return null;
+ }
+ return new DataStreamReplyHeader(packerHeader, readLong.getAsLong(),
+ DataStreamReply.getSuccess(readLong.getAsLong()));
+ }
+
+ private final long bytesWritten;
+ private final boolean success;
+
+ public DataStreamReplyHeader(long streamId, long streamOffset, long
dataLength, long bytesWritten, boolean success) {
+ super(streamId, streamOffset, dataLength);
+ this.bytesWritten = bytesWritten;
+ this.success = success;
+ }
+
+ public DataStreamReplyHeader(DataStreamPacketHeader header, long
bytesWritten, boolean success) {
+ this(header.getStreamId(), header.getStreamOffset(),
header.getDataLength(), bytesWritten, success);
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ @Override
+ public boolean isSuccess() {
+ return success;
+ }
+}
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
new file mode 100644
index 0000000..21f8d04
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+public interface NettyDataStreamUtils {
+ static DataStreamReplyByteBuffer decode(ByteBuf buf) {
+ final DataStreamReplyHeader header =
DataStreamReplyHeader.read(buf::readLong, buf.readableBytes());
+ if (header == null) {
+ return null;
+ }
+ final int dataLength = Math.toIntExact(header.getDataLength());
+
+ if (buf.readableBytes() < dataLength) {
+ buf.resetReaderIndex();
+ return null;
+ }
+
+ final ByteBuffer buffer;
+ if (dataLength > 0) {
+ buffer = buf.slice(buf.readerIndex(), dataLength).nioBuffer();
+ buf.readerIndex(buf.readerIndex() + dataLength);
+ } else {
+ buffer = null;
+ }
+ buf.markReaderIndex();
+
+ return new DataStreamReplyByteBuffer(header, buffer);
+ }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
index e91f606..0e92b9f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
@@ -18,33 +18,17 @@
package org.apache.ratis.netty.client;
-import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
+import java.util.Optional;
public class DataStreamReplyDecoder extends ByteToMessageDecoder {
-
@Override
- protected void decode(ChannelHandlerContext channelHandlerContext,
- ByteBuf byteBuf, List<Object> list) {
-
- if(byteBuf.readableBytes() >= 24){
- long streamId = byteBuf.readLong();
- long dataOffset = byteBuf.readLong();
- long dataLength = byteBuf.readLong();
- if(byteBuf.readableBytes() >= dataLength){
- DataStreamReplyByteBuffer reply = new
DataStreamReplyByteBuffer(streamId,
- dataOffset,
- byteBuf.slice(byteBuf.readerIndex(), (int)
dataLength).nioBuffer());
- byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
- byteBuf.markReaderIndex();
- list.add(reply);
- } else {
- byteBuf.resetReaderIndex();
- }
- }
+ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf
buf, List<Object> list) {
+ Optional.ofNullable(NettyDataStreamUtils.decode(buf)).ifPresent(list::add);
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
index 52ab54d..58ac232 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
@@ -19,6 +19,7 @@
package org.apache.ratis.netty.server;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.protocol.DataStreamReplyHeader;
import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import
org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -29,12 +30,9 @@ import java.util.List;
public class DataStreamReplyEncoder extends
MessageToMessageEncoder<DataStreamReplyByteBuffer> {
@Override
protected void encode(ChannelHandlerContext context,
DataStreamReplyByteBuffer reply, List<Object> list) {
-
- ByteBuffer bb = ByteBuffer.allocateDirect(24);
- bb.putLong(reply.getStreamId());
- bb.putLong(reply.getStreamOffset());
- bb.putLong(reply.getDataLength());
- bb.flip();
- list.add(Unpooled.wrappedBuffer(bb, reply.slice()));
+ final ByteBuffer buffer =
ByteBuffer.allocateDirect(DataStreamReplyHeader.getSize());
+ reply.putTo(buffer::putLong);
+ buffer.flip();
+ list.add(Unpooled.wrappedBuffer(buffer, reply.slice()));
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 5f41c0f..59ec77b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -176,10 +176,9 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return byteWritten;
}
- private void sendReply(DataStreamRequestByteBuf request,
ChannelHandlerContext ctx) {
- // TODO RATIS-1098: include byteWritten and isSuccess in the reply
+ private void sendReply(DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(),
ByteBuffer.wrap("OK".getBytes()));
+ request.getStreamId(), request.getStreamOffset(), null, bytesWritten,
true);
ctx.writeAndFlush(reply);
}
@@ -212,7 +211,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
JavaUtils.allOf(remoteWrites).thenCombine(localWrite, (v,
bytesWritten) -> {
buf.release();
- sendReply(request, ctx);
+ sendReply(request, bytesWritten, ctx);
return null;
});
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 1077697..6e115eb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -187,23 +187,31 @@ public class TestDataStream extends BaseTest {
DataStreamClientImpl.DataStreamOutputImpl impl =
(DataStreamClientImpl.DataStreamOutputImpl) out;
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-
- // add header
- futures.add(impl.getHeaderFuture());
+ final List<Integer> sizes = new ArrayList<>();
//send data
final int halfBufferSize = bufferSize/2;
int dataSize = 0;
for(int i = 0; i < bufferNum; i++) {
final int size = halfBufferSize +
ThreadLocalRandom.current().nextInt(halfBufferSize);
+ sizes.add(size);
+
final ByteBuffer bf = initBuffer(dataSize, size);
futures.add(out.writeAsync(bf));
dataSize += size;
}
- //join all requests
- for(CompletableFuture<DataStreamReply> f : futures) {
- f.join();
+ { // check header
+ final DataStreamReply reply = impl.getHeaderFuture().join();
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertEquals(0, reply.getBytesWritten());
+ }
+
+ // check writeAsync requests
+ for(int i = 0; i < futures.size(); i++) {
+ final DataStreamReply reply = futures.get(i).join();
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
}
for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {