This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 802f00e RATIS-1448. Include commit infos in DataStreamReply. (#546)
802f00e is described below
commit 802f00e192fad5416bfd548ce29a796afbee73ee
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Dec 1 13:19:14 2021 +0800
RATIS-1448. Include commit infos in DataStreamReply. (#546)
---
.../datastream/impl/DataStreamReplyByteBuffer.java | 25 ++++++++++++--
.../org/apache/ratis/protocol/DataStreamReply.java | 7 ++++
.../ratis/protocol/DataStreamReplyHeader.java | 14 +++++++-
.../apache/ratis/netty/NettyDataStreamUtils.java | 4 ++-
.../ratis/netty/server/DataStreamManagement.java | 40 +++++++++++++++-------
ratis-proto/src/main/proto/Raft.proto | 2 ++
.../java/org/apache/ratis/server/RaftServer.java | 5 +++
.../apache/ratis/server/impl/RaftServerImpl.java | 3 +-
8 files changed, 82 insertions(+), 18 deletions(-)
diff --git
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index 3ef6251..5cb5569 100644
---
a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++
b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.datastream.impl;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.DataStreamReply;
@@ -24,6 +25,8 @@ import org.apache.ratis.protocol.DataStreamReplyHeader;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
/**
* Implements {@link DataStreamReply} with {@link ByteBuffer}.
@@ -40,6 +43,7 @@ public final class DataStreamReplyByteBuffer extends
DataStreamPacketByteBuffer
private boolean success;
private long bytesWritten;
+ private Collection<CommitInfoProto> commitInfos;
private Builder() {}
@@ -78,10 +82,16 @@ public final class DataStreamReplyByteBuffer extends
DataStreamPacketByteBuffer
return this;
}
+ public Builder setCommitInfos(Collection<CommitInfoProto> commitInfos) {
+ this.commitInfos = commitInfos;
+ return this;
+ }
+
public Builder setDataStreamReplyHeader(DataStreamReplyHeader header) {
return setDataStreamPacket(header)
.setSuccess(header.isSuccess())
- .setBytesWritten(header.getBytesWritten());
+ .setBytesWritten(header.getBytesWritten())
+ .setCommitInfos(header.getCommitInfos());
}
public Builder setDataStreamPacket(DataStreamPacket packet) {
@@ -92,7 +102,8 @@ public final class DataStreamReplyByteBuffer extends
DataStreamPacketByteBuffer
}
public DataStreamReplyByteBuffer build() {
- return new DataStreamReplyByteBuffer(clientId, type, streamId,
streamOffset, buffer, success, bytesWritten);
+ return new DataStreamReplyByteBuffer(
+ clientId, type, streamId, streamOffset, buffer, success,
bytesWritten, commitInfos);
}
}
@@ -102,13 +113,16 @@ public final class DataStreamReplyByteBuffer extends
DataStreamPacketByteBuffer
private final boolean success;
private final long bytesWritten;
+ private final Collection<CommitInfoProto> commitInfos;
+ @SuppressWarnings("parameternumber")
private DataStreamReplyByteBuffer(ClientId clientId, Type type, long
streamId, long streamOffset, ByteBuffer buffer,
- boolean success, long bytesWritten) {
+ boolean success, long bytesWritten, Collection<CommitInfoProto>
commitInfos) {
super(clientId, type, streamId, streamOffset, buffer);
this.success = success;
this.bytesWritten = bytesWritten;
+ this.commitInfos = commitInfos != null? commitInfos:
Collections.emptyList();
}
@Override
@@ -122,6 +136,11 @@ public final class DataStreamReplyByteBuffer extends
DataStreamPacketByteBuffer
}
@Override
+ public Collection<CommitInfoProto> getCommitInfos() {
+ return commitInfos;
+ }
+
+ @Override
public String toString() {
return super.toString()
+ "," + (success? "SUCCESS": "FAILED")
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
index 9f50cc8..459aee3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReply.java
@@ -18,9 +18,16 @@
package org.apache.ratis.protocol;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+
+import java.util.Collection;
+
public interface DataStreamReply extends DataStreamPacket {
boolean isSuccess();
long getBytesWritten();
+
+ /** @return the commit information when the reply is created. */
+ Collection<CommitInfoProto> getCommitInfos();
}
\ No newline at end of file
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index f0dee6a..502b426 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,18 +18,25 @@
package org.apache.ratis.protocol;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import java.util.Collection;
+import java.util.Collections;
+
/** The header format is {@link DataStreamPacketHeader}, bytesWritten and
flags. */
public class DataStreamReplyHeader extends DataStreamPacketHeader implements
DataStreamReply {
private final long bytesWritten;
private final boolean success;
+ private final Collection<CommitInfoProto> commitInfos;
+ @SuppressWarnings("parameternumber")
public DataStreamReplyHeader(ClientId clientId, Type type, long streamId,
long streamOffset, long dataLength,
- long bytesWritten, boolean success) {
+ long bytesWritten, boolean success, Collection<CommitInfoProto>
commitInfos) {
super(clientId, type, streamId, streamOffset, dataLength);
this.bytesWritten = bytesWritten;
this.success = success;
+ this.commitInfos = commitInfos != null? commitInfos:
Collections.emptyList();
}
@Override
@@ -41,4 +48,9 @@ public class DataStreamReplyHeader extends
DataStreamPacketHeader implements Dat
public boolean isSuccess() {
return success;
}
+
+ @Override
+ public Collection<CommitInfoProto> getCommitInfos() {
+ return commitInfos;
+ }
}
\ No newline at end of file
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 5c93f2a..0f595c7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -81,6 +81,7 @@ public interface NettyDataStreamUtils {
.setPacketHeader(b)
.setBytesWritten(reply.getBytesWritten())
.setSuccess(reply.isSuccess())
+ .addAllCommitInfos(reply.getCommitInfos())
.build()
.toByteString()
.asReadOnlyByteBuffer();
@@ -208,7 +209,8 @@ public interface NettyDataStreamUtils {
if (header.getPacketHeader().getDataLength() + headerBufLen <=
buf.readableBytes()) {
buf.readerIndex(buf.readerIndex() + headerBufLen);
return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()),
h.getType(), h.getStreamId(),
- h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(),
header.getSuccess());
+ h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(),
header.getSuccess(),
+ header.getCommitInfosList());
} else {
buf.resetReaderIndex();
return null;
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 84679f6..7539245 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -26,6 +26,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.ClientId;
@@ -57,6 +58,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -133,6 +135,18 @@ public class DataStreamManagement {
return request;
}
+ Division getDivision() throws IOException {
+ return server.getDivision(request.getRaftGroupId());
+ }
+
+ Collection<CommitInfoProto> getCommitInfos() {
+ try {
+ return getDivision().getCommitInfos();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
boolean isPrimary() {
return primary;
}
@@ -151,8 +165,7 @@ public class DataStreamManagement {
}
private Set<RaftPeer> getSuccessors(RaftPeerId peerId) throws IOException {
- final RaftGroupId groupId = request.getRaftGroupId();
- final RaftConfiguration conf = server.getDivision(groupId).getRaftConf();
+ final RaftConfiguration conf = getDivision().getRaftConf();
final RoutingTable routingTable = request.getRoutingTable();
if (routingTable != null) {
@@ -276,32 +289,34 @@ public class DataStreamManagement {
return byteWritten;
}
- static long close(DataStream stream) {
+ static void close(DataStream stream) {
try {
stream.getDataChannel().close();
- return 0L;
} catch (IOException e) {
throw new CompletionException("Failed to close " + stream, e);
}
}
- static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
- DataStreamRequestByteBuf request, RaftClientReply reply, long
bytesWritten) {
+ static DataStreamReplyByteBuffer
newDataStreamReplyByteBuffer(DataStreamRequestByteBuf request,
+ RaftClientReply reply, long bytesWritten, Collection<CommitInfoProto>
commitInfos) {
final ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
return DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
.setBuffer(buffer)
.setSuccess(reply.isSuccess())
.setBytesWritten(bytesWritten)
+ .setCommitInfos(commitInfos)
.build();
}
static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
- DataStreamRequestByteBuf request, long bytesWritten,
ChannelHandlerContext ctx) {
+ DataStreamRequestByteBuf request, long bytesWritten,
Collection<CommitInfoProto> commitInfos,
+ ChannelHandlerContext ctx) {
final boolean success = checkSuccessRemoteWrite(remoteWrites,
bytesWritten, request);
final DataStreamReplyByteBuffer.Builder builder =
DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
- .setSuccess(success);
+ .setSuccess(success)
+ .setCommitInfos(commitInfos);
if (success) {
builder.setBytesWritten(bytesWritten);
}
@@ -316,7 +331,8 @@ public class DataStreamManagement {
.getRaftClient()
.async());
return asyncRpcApi.sendForward(info.request).thenAcceptAsync(
- reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request,
reply, bytesWritten)), requestExecutor);
+ reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request,
reply, bytesWritten, info.getCommitInfos())),
+ requestExecutor);
} catch (IOException e) {
throw new CompletionException(e);
}
@@ -345,7 +361,7 @@ public class DataStreamManagement {
ChannelHandlerContext ctx) {
LOG.warn("Failed to process {}", request, throwable);
try {
- ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0));
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0, null));
} catch (Throwable t) {
LOG.warn("Failed to sendDataStreamException {} for {}", throwable,
request, t);
}
@@ -389,13 +405,13 @@ public class DataStreamManagement {
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
if (request.getType() == Type.STREAM_HEADER
|| (request.getType() == Type.STREAM_DATA && !close)) {
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ sendReply(remoteWrites, request, bytesWritten,
info.getCommitInfos(), ctx);
} else if (close) {
if (info.isPrimary()) {
// after all server close stream, primary server start
transaction
startTransaction(info, request, bytesWritten, ctx);
} else {
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ sendReply(remoteWrites, request, bytesWritten,
info.getCommitInfos(), ctx);
}
} else {
throw new IllegalStateException(this + ": Unexpected type " +
request.getType() + ", request=" + request);
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 092e18c..109841f 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -344,6 +344,8 @@ message DataStreamReplyHeaderProto {
DataStreamPacketHeaderProto packetHeader = 1;
uint64 bytesWritten = 2;
bool success = 3;
+
+ repeated CommitInfoProto commitInfos = 15;
}
message NotLeaderExceptionProto {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index b8ee749..70e4dff 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.metrics.RaftServerMetrics;
@@ -38,6 +39,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
@@ -96,6 +98,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the storage of this division. */
RaftStorage getRaftStorage();
+ /** @return the commit information of this division. */
+ Collection<CommitInfoProto> getCommitInfos();
+
/** @return the retry cache of this division. */
RetryCache getRetryCache();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 31df3da..eebcd15 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -493,7 +493,8 @@ class RaftServerImpl implements RaftServer.Division,
getState().setRaftConf(e);
}
- Collection<CommitInfoProto> getCommitInfos() {
+ @Override
+ public Collection<CommitInfoProto> getCommitInfos() {
final List<CommitInfoProto> infos = new ArrayList<>();
// add the commit info of this server
infos.add(updateCommitInfoCache());