This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 802f00e  RATIS-1448. Include commit infos in DataStreamReply. (#546)
802f00e is described below

commit 802f00e192fad5416bfd548ce29a796afbee73ee
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Dec 1 13:19:14 2021 +0800

    RATIS-1448. Include commit infos in DataStreamReply. (#546)
---
 .../datastream/impl/DataStreamReplyByteBuffer.java | 25 ++++++++++++--
 .../org/apache/ratis/protocol/DataStreamReply.java |  7 ++++
 .../ratis/protocol/DataStreamReplyHeader.java      | 14 +++++++-
 .../apache/ratis/netty/NettyDataStreamUtils.java   |  4 ++-
 .../ratis/netty/server/DataStreamManagement.java   | 40 +++++++++++++++-------
 ratis-proto/src/main/proto/Raft.proto              |  2 ++
 .../java/org/apache/ratis/server/RaftServer.java   |  5 +++
 .../apache/ratis/server/impl/RaftServerImpl.java   |  3 +-
 8 files changed, 82 insertions(+), 18 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 3ef6251..5cb5569 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.datastream.impl;
 
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamPacket;
 import org.apache.ratis.protocol.DataStreamReply;
@@ -24,6 +25,8 @@ import org.apache.ratis.protocol.DataStreamReplyHeader;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Implements {@link DataStreamReply} with {@link ByteBuffer}.
@@ -40,6 +43,7 @@ public final class DataStreamReplyByteBuffer extends 
DataStreamPacketByteBuffer
 
     private boolean success;
     private long bytesWritten;
+    private Collection<CommitInfoProto> commitInfos;
 
     private Builder() {}
 
@@ -78,10 +82,16 @@ public final class DataStreamReplyByteBuffer extends 
DataStreamPacketByteBuffer
       return this;
     }
 
+    public Builder setCommitInfos(Collection<CommitInfoProto> commitInfos) {
+      this.commitInfos = commitInfos;
+      return this;
+    }
+
     public Builder setDataStreamReplyHeader(DataStreamReplyHeader header) {
       return setDataStreamPacket(header)
           .setSuccess(header.isSuccess())
-          .setBytesWritten(header.getBytesWritten());
+          .setBytesWritten(header.getBytesWritten())
+          .setCommitInfos(header.getCommitInfos());
     }
 
     public Builder setDataStreamPacket(DataStreamPacket packet) {
@@ -92,7 +102,8 @@ public final class DataStreamReplyByteBuffer extends 
DataStreamPacketByteBuffer
     }
 
     public DataStreamReplyByteBuffer build() {
-      return new DataStreamReplyByteBuffer(clientId, type, streamId, 
streamOffset, buffer, success, bytesWritten);
+      return new DataStreamReplyByteBuffer(
+          clientId, type, streamId, streamOffset, buffer, success, 
bytesWritten, commitInfos);
     }
   }
 
@@ -102,13 +113,16 @@ public final class DataStreamReplyByteBuffer extends 
DataStreamPacketByteBuffer
 
   private final boolean success;
   private final long bytesWritten;
+  private final Collection<CommitInfoProto> commitInfos;
 
+  @SuppressWarnings("parameternumber")
   private DataStreamReplyByteBuffer(ClientId clientId, Type type, long 
streamId, long streamOffset, ByteBuffer buffer,
-      boolean success, long bytesWritten) {
+      boolean success, long bytesWritten, Collection<CommitInfoProto> 
commitInfos) {
     super(clientId, type, streamId, streamOffset, buffer);
 
     this.success = success;
     this.bytesWritten = bytesWritten;
+    this.commitInfos = commitInfos != null? commitInfos: 
Collections.emptyList();
   }
 
   @Override
@@ -122,6 +136,11 @@ public final class DataStreamReplyByteBuffer extends 
DataStreamPacketByteBuffer
   }
 
   @Override
+  public Collection<CommitInfoProto> getCommitInfos() {
+    return commitInfos;
+  }
+
+  @Override
   public String toString() {
     return super.toString()
         + "," + (success? "SUCCESS": "FAILED")
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 9f50cc8..459aee3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,9 +18,16 @@
 
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+
+import java.util.Collection;
+
 public interface DataStreamReply extends DataStreamPacket {
 
   boolean isSuccess();
 
   long getBytesWritten();
+
+  /** @return the commit information when the reply is created. */
+  Collection<CommitInfoProto> getCommitInfos();
 }
\ No newline at end of file
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index f0dee6a..502b426 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,18 +18,25 @@
 
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
+import java.util.Collection;
+import java.util.Collections;
+
 /** The header format is {@link DataStreamPacketHeader}, bytesWritten and 
flags. */
 public class DataStreamReplyHeader extends DataStreamPacketHeader implements 
DataStreamReply {
   private final long bytesWritten;
   private final boolean success;
+  private final Collection<CommitInfoProto> commitInfos;
 
+  @SuppressWarnings("parameternumber")
   public DataStreamReplyHeader(ClientId clientId, Type type, long streamId, 
long streamOffset, long dataLength,
-      long bytesWritten, boolean success) {
+      long bytesWritten, boolean success, Collection<CommitInfoProto> 
commitInfos) {
     super(clientId, type, streamId, streamOffset, dataLength);
     this.bytesWritten = bytesWritten;
     this.success = success;
+    this.commitInfos = commitInfos != null? commitInfos: 
Collections.emptyList();
   }
 
   @Override
@@ -41,4 +48,9 @@ public class DataStreamReplyHeader extends 
DataStreamPacketHeader implements Dat
   public boolean isSuccess() {
     return success;
   }
+
+  @Override
+  public Collection<CommitInfoProto> getCommitInfos() {
+    return commitInfos;
+  }
 }
\ No newline at end of file
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 5c93f2a..0f595c7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -81,6 +81,7 @@ public interface NettyDataStreamUtils {
         .setPacketHeader(b)
         .setBytesWritten(reply.getBytesWritten())
         .setSuccess(reply.isSuccess())
+        .addAllCommitInfos(reply.getCommitInfos())
         .build()
         .toByteString()
         .asReadOnlyByteBuffer();
@@ -208,7 +209,8 @@ public interface NettyDataStreamUtils {
       if (header.getPacketHeader().getDataLength() + headerBufLen <= 
buf.readableBytes()) {
         buf.readerIndex(buf.readerIndex() + headerBufLen);
         return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()), 
h.getType(), h.getStreamId(),
-            h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), 
header.getSuccess());
+            h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), 
header.getSuccess(),
+            header.getCommitInfosList());
       } else {
         buf.resetReaderIndex();
         return null;
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 84679f6..7539245 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -26,6 +26,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.ClientId;
@@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -133,6 +135,18 @@ public class DataStreamManagement {
       return request;
     }
 
+    Division getDivision() throws IOException {
+      return server.getDivision(request.getRaftGroupId());
+    }
+
+    Collection<CommitInfoProto> getCommitInfos() {
+      try {
+        return getDivision().getCommitInfos();
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
     boolean isPrimary() {
       return primary;
     }
@@ -151,8 +165,7 @@ public class DataStreamManagement {
     }
 
     private Set<RaftPeer> getSuccessors(RaftPeerId peerId) throws IOException {
-      final RaftGroupId groupId = request.getRaftGroupId();
-      final RaftConfiguration conf = server.getDivision(groupId).getRaftConf();
+      final RaftConfiguration conf = getDivision().getRaftConf();
       final RoutingTable routingTable = request.getRoutingTable();
 
       if (routingTable != null) {
@@ -276,32 +289,34 @@ public class DataStreamManagement {
     return byteWritten;
   }
 
-  static long close(DataStream stream) {
+  static void close(DataStream stream) {
     try {
       stream.getDataChannel().close();
-      return 0L;
     } catch (IOException e) {
       throw new CompletionException("Failed to close " + stream, e);
     }
   }
 
-  static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
-      DataStreamRequestByteBuf request, RaftClientReply reply, long 
bytesWritten) {
+  static DataStreamReplyByteBuffer 
newDataStreamReplyByteBuffer(DataStreamRequestByteBuf request,
+      RaftClientReply reply, long bytesWritten, Collection<CommitInfoProto> 
commitInfos) {
     final ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
     return DataStreamReplyByteBuffer.newBuilder()
         .setDataStreamPacket(request)
         .setBuffer(buffer)
         .setSuccess(reply.isSuccess())
         .setBytesWritten(bytesWritten)
+        .setCommitInfos(commitInfos)
         .build();
   }
 
   static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
-      DataStreamRequestByteBuf request, long bytesWritten, 
ChannelHandlerContext ctx) {
+      DataStreamRequestByteBuf request, long bytesWritten, 
Collection<CommitInfoProto> commitInfos,
+      ChannelHandlerContext ctx) {
     final boolean success = checkSuccessRemoteWrite(remoteWrites, 
bytesWritten, request);
     final DataStreamReplyByteBuffer.Builder builder = 
DataStreamReplyByteBuffer.newBuilder()
         .setDataStreamPacket(request)
-        .setSuccess(success);
+        .setSuccess(success)
+        .setCommitInfos(commitInfos);
     if (success) {
       builder.setBytesWritten(bytesWritten);
     }
@@ -316,7 +331,8 @@ public class DataStreamManagement {
           .getRaftClient()
           .async());
       return asyncRpcApi.sendForward(info.request).thenAcceptAsync(
-          reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, 
reply, bytesWritten)), requestExecutor);
+          reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, 
reply, bytesWritten, info.getCommitInfos())),
+          requestExecutor);
     } catch (IOException e) {
       throw new CompletionException(e);
     }
@@ -345,7 +361,7 @@ public class DataStreamManagement {
       ChannelHandlerContext ctx) {
     LOG.warn("Failed to process {}",  request, throwable);
     try {
-      ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0));
+      ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0, null));
     } catch (Throwable t) {
       LOG.warn("Failed to sendDataStreamException {} for {}", throwable, 
request, t);
     }
@@ -389,13 +405,13 @@ public class DataStreamManagement {
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           if (request.getType() == Type.STREAM_HEADER
               || (request.getType() == Type.STREAM_DATA && !close)) {
-            sendReply(remoteWrites, request, bytesWritten, ctx);
+            sendReply(remoteWrites, request, bytesWritten, 
info.getCommitInfos(), ctx);
           } else if (close) {
             if (info.isPrimary()) {
               // after all server close stream, primary server start 
transaction
               startTransaction(info, request, bytesWritten, ctx);
             } else {
-              sendReply(remoteWrites, request, bytesWritten, ctx);
+              sendReply(remoteWrites, request, bytesWritten, 
info.getCommitInfos(), ctx);
             }
           } else {
             throw new IllegalStateException(this + ": Unexpected type " + 
request.getType() + ", request=" + request);
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index 092e18c..109841f 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -344,6 +344,8 @@ message DataStreamReplyHeaderProto {
   DataStreamPacketHeaderProto packetHeader = 1;
   uint64 bytesWritten = 2;
   bool success = 3;
+
+  repeated CommitInfoProto commitInfos = 15;
 }
 
 message NotLeaderExceptionProto {
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index b8ee749..70e4dff 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
@@ -38,6 +39,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -96,6 +98,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
     /** @return the storage of this division. */
     RaftStorage getRaftStorage();
 
+    /** @return the commit information of this division. */
+    Collection<CommitInfoProto> getCommitInfos();
+
     /** @return the retry cache of this division. */
     RetryCache getRetryCache();
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 31df3da..eebcd15 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -493,7 +493,8 @@ class RaftServerImpl implements RaftServer.Division,
     getState().setRaftConf(e);
   }
 
-  Collection<CommitInfoProto> getCommitInfos() {
+  @Override
+  public Collection<CommitInfoProto> getCommitInfos() {
     final List<CommitInfoProto> infos = new ArrayList<>();
     // add the commit info of this server
     infos.add(updateCommitInfoCache());

Reply via email to