This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9141f74  RATIS-1098. DataStreamReply should include byteWritten and isSuccess. (#228)
9141f74 is described below

commit 9141f74fe894bf06cc095675d856d85feb2baafc
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Thu Oct 22 07:17:35 2020 +0800

    RATIS-1098. DataStreamReply should include byteWritten and isSuccess. (#228)
---
 .../impl/DataStreamPacketByteBuffer.java           |  4 +-
 .../datastream/impl/DataStreamReplyByteBuffer.java | 24 +++++++-
 .../apache/ratis/protocol/DataStreamPacket.java    |  8 +++
 .../DataStreamPacketHeader.java}                   | 43 ++++++++------
 .../org/apache/ratis/protocol/DataStreamReply.java | 20 +++++++
 .../ratis/protocol/DataStreamReplyHeader.java      | 67 ++++++++++++++++++++++
 .../apache/ratis/netty/NettyDataStreamUtils.java   | 50 ++++++++++++++++
 .../ratis/netty/client/DataStreamReplyDecoder.java | 24 ++------
 .../ratis/netty/server/DataStreamReplyEncoder.java | 12 ++--
 .../ratis/netty/server/NettyServerStreamRpc.java   |  7 +--
 .../apache/ratis/datastream/TestDataStream.java    | 20 +++++--
 11 files changed, 223 insertions(+), 56 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index 02fc04a..15870a5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -25,11 +25,13 @@ import java.nio.ByteBuffer;
  * This class is immutable.
  */
 public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
+  private static final ByteBuffer EMPTY = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
+
   private final ByteBuffer buffer;
 
   public DataStreamPacketByteBuffer(long streamId, long streamOffset, ByteBuffer buffer) {
     super(streamId, streamOffset);
-    this.buffer = buffer.asReadOnlyBuffer();
+    this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY;
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 8370b25..230c59b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream.impl;
 
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamReplyHeader;
 
 import java.nio.ByteBuffer;
 
@@ -27,7 +28,28 @@ import java.nio.ByteBuffer;
  * This class is immutable.
  */
 public class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implements DataStreamReply {
-  public DataStreamReplyByteBuffer(long streamId, long streamOffset, ByteBuffer buffer) {
+  private final long bytesWritten;
+  private final boolean success;
+
+  public DataStreamReplyByteBuffer(long streamId, long streamOffset, ByteBuffer buffer,
+      long bytesWritten, boolean success ) {
     super(streamId, streamOffset, buffer);
+
+    this.success = success;
+    this.bytesWritten = bytesWritten;
+  }
+
+  public DataStreamReplyByteBuffer(DataStreamReplyHeader header, ByteBuffer buffer) {
+    this(header.getStreamId(), header.getStreamOffset(), buffer, header.getBytesWritten(), header.isSuccess());
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean isSuccess() {
+    return success;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index 10ca737..faf4648 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -18,10 +18,18 @@
 
 package org.apache.ratis.protocol;
 
+import java.util.function.LongConsumer;
+
 public interface DataStreamPacket {
   long getStreamId();
 
   long getStreamOffset();
 
   long getDataLength();
+
+  default void putTo(LongConsumer putLong) {
+    putLong.accept(getStreamId());
+    putLong.accept(getStreamOffset());
+    putLong.accept(getDataLength());
+  }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
similarity index 50%
copy from ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
copy to ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 02fc04a..b34216b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -15,29 +15,38 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.datastream.impl;
 
-import java.nio.ByteBuffer;
+package org.apache.ratis.protocol;
 
-/**
- * Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link ByteBuffer}.
- *
- * This class is immutable.
- */
-public class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
-  private final ByteBuffer buffer;
+import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
+import org.apache.ratis.util.SizeInBytes;
+
+import java.util.function.LongSupplier;
+
+/** The header format is streamId, streamOffset, dataLength. */
+public class DataStreamPacketHeader extends DataStreamPacketImpl {
+  private static final SizeInBytes SIZE = SizeInBytes.valueOf(24);
+
+  public static int getSize() {
+    return SIZE.getSizeInt();
+  }
 
-  public DataStreamPacketByteBuffer(long streamId, long streamOffset, ByteBuffer buffer) {
+  public static DataStreamPacketHeader read(LongSupplier readLong, int readableBytes) {
+    if (readableBytes < getSize()) {
+      return null;
+    }
+    return new DataStreamPacketHeader(readLong.getAsLong(), readLong.getAsLong(), readLong.getAsLong());
+  }
+
+  private final long dataLength;
+
+  public DataStreamPacketHeader(long streamId, long streamOffset, long dataLength) {
     super(streamId, streamOffset);
-    this.buffer = buffer.asReadOnlyBuffer();
+    this.dataLength = dataLength;
   }
 
   @Override
   public long getDataLength() {
-    return buffer.remaining();
-  }
-
-  public ByteBuffer slice() {
-    return buffer.slice();
+    return dataLength;
   }
-}
+}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 6ac5253..44a6ea0 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,5 +18,25 @@
 
 package org.apache.ratis.protocol;
 
+import java.util.function.LongConsumer;
+
 public interface DataStreamReply extends DataStreamPacket {
+  static boolean getSuccess(long flags) {
+    return flags == 0;
+  }
+
+  static long toFlags(boolean success) {
+    return success? 0: 1;
+  }
+
+  boolean isSuccess();
+
+  long getBytesWritten();
+
+  @Override
+  default void putTo(LongConsumer putLong) {
+    DataStreamPacket.super.putTo(putLong);
+    putLong.accept(getBytesWritten());
+    putLong.accept(toFlags(isSuccess()));
+  }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
new file mode 100644
index 0000000..1db731c
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.util.SizeInBytes;
+
+import java.util.function.LongSupplier;
+
+/** The header format is {@link DataStreamPacketHeader}, bytesWritten and flags. */
+public class DataStreamReplyHeader extends DataStreamPacketHeader implements DataStreamReply {
+  private static final SizeInBytes SIZE = SizeInBytes.valueOf(DataStreamPacketHeader.getSize() + 16);
+
+  public static int getSize() {
+    return SIZE.getSizeInt();
+  }
+
+  public static DataStreamReplyHeader read(LongSupplier readLong, int readableBytes) {
+    if (readableBytes < getSize()) {
+      return null;
+    }
+    final DataStreamPacketHeader packerHeader = DataStreamPacketHeader.read(readLong, readableBytes);
+    if (packerHeader == null) {
+      return null;
+    }
+    return new DataStreamReplyHeader(packerHeader, readLong.getAsLong(),
+        DataStreamReply.getSuccess(readLong.getAsLong()));
+  }
+
+  private final long bytesWritten;
+  private final boolean success;
+
+  public DataStreamReplyHeader(long streamId, long streamOffset, long dataLength, long bytesWritten, boolean success) {
+    super(streamId, streamOffset, dataLength);
+    this.bytesWritten = bytesWritten;
+    this.success = success;
+  }
+
+  public DataStreamReplyHeader(DataStreamPacketHeader header, long bytesWritten, boolean success) {
+    this(header.getStreamId(), header.getStreamOffset(), header.getDataLength(), bytesWritten, success);
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  @Override
+  public boolean isSuccess() {
+    return success;
+  }
+}
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
new file mode 100644
index 0000000..21f8d04
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+public interface NettyDataStreamUtils {
+  static DataStreamReplyByteBuffer decode(ByteBuf buf) {
+    final DataStreamReplyHeader header = DataStreamReplyHeader.read(buf::readLong, buf.readableBytes());
+    if (header == null) {
+      return null;
+    }
+    final int dataLength = Math.toIntExact(header.getDataLength());
+
+    if (buf.readableBytes() < dataLength) {
+      buf.resetReaderIndex();
+      return null;
+    }
+
+    final ByteBuffer buffer;
+    if (dataLength > 0) {
+      buffer = buf.slice(buf.readerIndex(), dataLength).nioBuffer();
+      buf.readerIndex(buf.readerIndex() + dataLength);
+    } else {
+      buffer = null;
+    }
+    buf.markReaderIndex();
+
+    return new DataStreamReplyByteBuffer(header, buffer);
+  }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
index e91f606..0e92b9f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/DataStreamReplyDecoder.java
@@ -18,33 +18,17 @@
 
 package org.apache.ratis.netty.client;
 
-import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
 
 import java.util.List;
+import java.util.Optional;
 
 public class DataStreamReplyDecoder extends ByteToMessageDecoder {
-
   @Override
-  protected void decode(ChannelHandlerContext channelHandlerContext,
-                        ByteBuf byteBuf, List<Object> list) {
-
-    if(byteBuf.readableBytes() >= 24){
-      long streamId = byteBuf.readLong();
-      long dataOffset = byteBuf.readLong();
-      long dataLength = byteBuf.readLong();
-      if(byteBuf.readableBytes() >= dataLength){
-        DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(streamId,
-            dataOffset,
-            byteBuf.slice(byteBuf.readerIndex(), (int) dataLength).nioBuffer());
-        byteBuf.readerIndex(byteBuf.readerIndex() + (int)dataLength);
-        byteBuf.markReaderIndex();
-        list.add(reply);
-      } else {
-        byteBuf.resetReaderIndex();
-      }
-    }
+  protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> list) {
+    Optional.ofNullable(NettyDataStreamUtils.decode(buf)).ifPresent(list::add);
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
index 52ab54d..58ac232 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamReplyEncoder.java
@@ -19,6 +19,7 @@
 package org.apache.ratis.netty.server;
 
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.protocol.DataStreamReplyHeader;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
@@ -29,12 +30,9 @@ import java.util.List;
 public class DataStreamReplyEncoder extends MessageToMessageEncoder<DataStreamReplyByteBuffer> {
   @Override
   protected void encode(ChannelHandlerContext context, DataStreamReplyByteBuffer reply, List<Object> list) {
-
-    ByteBuffer bb = ByteBuffer.allocateDirect(24);
-    bb.putLong(reply.getStreamId());
-    bb.putLong(reply.getStreamOffset());
-    bb.putLong(reply.getDataLength());
-    bb.flip();
-    list.add(Unpooled.wrappedBuffer(bb, reply.slice()));
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(DataStreamReplyHeader.getSize());
+    reply.putTo(buffer::putLong);
+    buffer.flip();
+    list.add(Unpooled.wrappedBuffer(buffer, reply.slice()));
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 5f41c0f..59ec77b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -176,10 +176,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
-    // TODO RATIS-1098: include byteWritten and isSuccess in the reply
+  private void sendReply(DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+        request.getStreamId(), request.getStreamOffset(), null, bytesWritten, true);
     ctx.writeAndFlush(reply);
   }
 
@@ -212,7 +211,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
         JavaUtils.allOf(remoteWrites).thenCombine(localWrite, (v, bytesWritten) -> {
               buf.release();
-              sendReply(request, ctx);
+              sendReply(request, bytesWritten, ctx);
               return null;
         });
       }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 1077697..6e115eb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -187,23 +187,31 @@ public class TestDataStream extends BaseTest {
     DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out;
 
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-
-    // add header
-    futures.add(impl.getHeaderFuture());
+    final List<Integer> sizes = new ArrayList<>();
 
     //send data
     final int halfBufferSize = bufferSize/2;
     int dataSize = 0;
     for(int i = 0; i < bufferNum; i++) {
       final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+      sizes.add(size);
+
       final ByteBuffer bf = initBuffer(dataSize, size);
       futures.add(out.writeAsync(bf));
       dataSize += size;
     }
 
-    //join all requests
-    for(CompletableFuture<DataStreamReply> f : futures) {
-      f.join();
+    { // check header
+      final DataStreamReply reply = impl.getHeaderFuture().join();
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(0, reply.getBytesWritten());
+    }
+
+    // check writeAsync requests
+    for(int i = 0; i < futures.size(); i++) {
+      final DataStreamReply reply = futures.get(i).join();
+      Assert.assertTrue(reply.isSuccess());
+      Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
     }
 
     for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {

Reply via email to