This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 715bc0f RATIS-1083. Create transaction once stream data replicated to
all servers (#237)
715bc0f is described below
commit 715bc0f9eaf885febf0f2fc0a817f800eb314151
Author: runzhiwang <[email protected]>
AuthorDate: Wed Nov 4 22:54:25 2020 +0800
RATIS-1083. Create transaction once stream data replicated to all servers
(#237)
* RATIS-1083. Create a transaction once the stream data is replicated to
all servers
* fix code review
* mark primary
* Revert "mark primary"
This reverts commit 2d698e585573912f915176d1dce1600ad6a8d2e7.
* fix code review
* add STREAM_CLOSE_FORWARD
* fix code review
* Move closeForwardAsync to DataStreamOutput
* rebase master
* use ExecutorService in async
---
.../apache/ratis/client/api/DataStreamOutput.java | 6 ++
.../ratis/client/impl/DataStreamClientImpl.java | 17 +++-
.../ratis/netty/server/NettyServerStreamRpc.java | 106 +++++++++++++++++++--
ratis-proto/src/main/proto/Raft.proto | 3 +-
.../ratis/datastream/DataStreamBaseTest.java | 55 ++++++++++-
.../ratis/datastream/TestDataStreamNetty.java | 70 ++++++++++++++
6 files changed, 242 insertions(+), 15 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 39f62ca..429a422 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -28,6 +28,12 @@ public interface DataStreamOutput extends
CloseAsync<DataStreamReply> {
/** Send out the data in the buffer asynchronously */
CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
+ /** Create a transaction asynchronously once the stream data is replicated
to all servers */
+ CompletableFuture<DataStreamReply> startTransactionAsync();
+
/** Get the future of the header request. */
CompletableFuture<DataStreamReply> getHeaderFuture();
+
+ /** Ask a peer asynchronously to close its stream locally, without forwarding the close further. */
+ CompletableFuture<DataStreamReply> closeForwardAsync();
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 6b27f93..7f3546e 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -87,10 +88,22 @@ public class DataStreamClientImpl implements
DataStreamClient {
return f;
}
- // should wait for attached sliding window to terminate
@Override
public CompletableFuture<DataStreamReply> closeAsync() {
- return null;
+ return orderedStreamAsync.sendRequest(getStreamId(), streamOffset,
Unpooled.EMPTY_BUFFER.nioBuffer(),
+ Type.STREAM_CLOSE);
+ }
+
+ @Override
+ public CompletableFuture<DataStreamReply> closeForwardAsync() {
+ return orderedStreamAsync.sendRequest(getStreamId(), streamOffset,
Unpooled.EMPTY_BUFFER.nioBuffer(),
+ Type.STREAM_CLOSE_FORWARD);
+ }
+
+ @Override
+ public CompletableFuture<DataStreamReply> startTransactionAsync() {
+ return orderedStreamAsync.sendRequest(getStreamId(), streamOffset,
Unpooled.EMPTY_BUFFER.nioBuffer(),
+ Type.START_TRANSACTION);
}
public RaftClientRequest getHeader() {
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 0f24b0b..3e7cce8 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -146,6 +146,10 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
return previous;
}
+ RaftClientRequest getRequest() {
+ return request;
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + ":" + request;
@@ -276,9 +280,10 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
ctx.writeAndFlush(reply);
}
- private void sendReplySuccess(DataStreamRequestByteBuf request, long
bytesWritten, ChannelHandlerContext ctx) {
+ private void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer
buffer, long bytesWritten,
+ ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
- request.getStreamId(), request.getStreamOffset(), null, bytesWritten,
true, request.getType());
+ request.getStreamId(), request.getStreamOffset(), buffer,
bytesWritten, true, request.getType());
LOG.debug("{}: write {}", this, reply);
ctx.writeAndFlush(reply);
}
@@ -288,7 +293,7 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
sendReplyNotSuccess(request, ctx);
} else {
- sendReplySuccess(request, bytesWritten, ctx);
+ sendReplySuccess(request, null, bytesWritten, ctx);
}
}
@@ -301,34 +306,115 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
};
}
+ private int startTransaction(StreamInfo info, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
+ try {
+ server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply
-> {
+ if (reply.isSuccess()) {
+ ByteBuffer buffer =
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+ sendReplySuccess(request, buffer, -1, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ // if this server is not the leader, forward the start transaction to the
other peers
+ // there may be other unexpected reasons for the failure besides not being
the leader; call forwardStartTransaction anyway
+ forwardStartTransaction(info, request, ctx);
+ } else if (request.getType() == Type.START_TRANSACTION){
+ sendReplyNotSuccess(request, ctx);
+ } else {
+ LOG.error("{}: Unexpected type:{}", this, request.getType());
+ }
+ }, executorService);
+ } catch (IOException e) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ return 0;
+ }
+
+ private void forwardStartTransaction(
+ final StreamInfo info, final DataStreamRequestByteBuf request, final
ChannelHandlerContext ctx) {
+ final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ final CompletableFuture<Boolean> f =
out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ }, executorService);
+
+ results.add(f);
+ }
+
+ JavaUtils.allOf(results).thenAccept(v -> {
+ if (!results.stream().map(CompletableFuture::join).reduce(false,
Boolean::logicalOr)) {
+ sendReplyNotSuccess(request, ctx);
+ }
+ });
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf
request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
- final boolean isHeader = request.getType() == Type.STREAM_HEADER;
final StreamInfo info;
final CompletableFuture<Long> localWrite;
final List<CompletableFuture<DataStreamReply>> remoteWrites = new
ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(),
request.getStreamId());
- if (isHeader) {
+ if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
}
- } else {
+ } else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
- localWrite = info.getPrevious().get()
- .thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf,
stream), executorService);
+ final CompletableFuture<?> previous = info.getPrevious().get();
+
+ localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) ->
writeTo(buf, stream), executorService);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
- remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
+ remoteWrites.add(previous.thenComposeAsync(v ->
out.writeAsync(request.slice().nioBuffer()), executorService));
+ }
+ } else if (request.getType() == Type.STREAM_CLOSE || request.getType() ==
Type.STREAM_CLOSE_FORWARD) {
+ info = streams.get(key);
+ final CompletableFuture<?> previous = info.getPrevious().get();
+
+ localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ }, executorService);
+
+ if (request.getType() == Type.STREAM_CLOSE) {
+ for (DataStreamOutput out : info.getDataStreamOutputs()) {
+ remoteWrites.add(previous.thenComposeAsync(v ->
out.closeForwardAsync(), executorService));
+ }
}
+ } else {
+ // peer server start transaction
+ info = streams.get(key);
+ final CompletableFuture<?> previous = info.getPrevious().get();
+ previous.thenApplyAsync(v -> startTransaction(streams.get(key), request,
ctx), executorService);
+ return;
}
final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
- sendReply(remoteWrites, request, bytesWritten, ctx);
+ if (request.getType() == Type.STREAM_HEADER
+ || request.getType() == Type.STREAM_DATA
+ || request.getType() == Type.STREAM_CLOSE_FORWARD) {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ } else if (request.getType() == Type.STREAM_CLOSE) {
+ // after all servers close the stream, the primary server starts the transaction
+ // TODO(runzhiwang): send start transaction to leader directly
+ startTransaction(info, request, ctx);
+ } else {
+ LOG.error("{}: Unexpected type:{}", this, request.getType());
+ }
return null;
}, executorService);
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index e99019b..e167f1e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -286,7 +286,8 @@ message DataStreamPacketHeaderProto {
STREAM_HEADER = 0;
STREAM_DATA = 1;
STREAM_CLOSE = 2;
- START_TRANSACTION = 3;
+ STREAM_CLOSE_FORWARD = 3;
+ START_TRANSACTION = 4;
}
uint64 streamId = 1;
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 67d2ecc..d034350 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -19,9 +19,12 @@ package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.GroupInfoReply;
@@ -158,6 +161,12 @@ abstract class DataStreamBaseTest extends BaseTest {
this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
}
+ Server(RaftPeer peer, RaftServer raftServer) {
+ this.peer = peer;
+ this.raftServer = raftServer;
+ this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
+ }
+
RaftPeer getPeer() {
return peer;
}
@@ -319,12 +328,28 @@ abstract class DataStreamBaseTest extends BaseTest {
.map(RaftPeerId::valueOf)
.map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
.collect(Collectors.toList());
+
+ List<RaftServer> raftServers = new ArrayList<>();
+ peers.forEach(peer -> raftServers.add(newRaftServer(peer, properties)));
+ setup(peers, raftServers);
+ }
+
+ protected void setup(List<RaftServer> raftServers) {
+ final List<RaftPeer> peers = new ArrayList<>();
+ raftServers.forEach(raftServer ->
+ peers.add(new RaftPeer(raftServer.getId(),
+ NetUtils.createSocketAddrForHost("http://localhost",
+
NettyConfigKeys.DataStream.port(raftServer.getProperties())))));
+
+ setup(peers, raftServers);
+ }
+
+ private void setup(List<RaftPeer> peers, List<RaftServer> raftServers){
raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
servers = new ArrayList<>(peers.size());
// start stream servers on raft peers.
for (int i = 0; i < peers.size(); i++) {
- final RaftPeer peer = peers.get(i);
- final Server server = new Server(peer);
+ final Server server = new Server(peers.get(i), raftServers.get(i));
if (i == 0) {
// only the first server routes requests to peers.
List<RaftPeer> otherPeers = new ArrayList<>(peers);
@@ -356,6 +381,16 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
+ protected void runTestCloseStream(List<RaftServer> raftServers, int
bufferSize, int bufferNum,
+ RaftClientReply expectedClientReply) throws Exception {
+ try {
+ setup(raftServers);
+ runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
+ } finally {
+ shutdown();
+ }
+ }
+
private void runTestDataStream(int numClients, int numStreams, int
bufferSize, int bufferNum) throws Exception {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
final List<DataStreamClientImpl> clients = new ArrayList<>();
@@ -377,6 +412,22 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
+ private void runTestCloseStream(int bufferSize, int bufferNum,
RaftClientReply expectedClientReply)
+ throws IOException {
+ try (final DataStreamClientImpl client = newDataStreamClientImpl()) {
+ DataStreamOutputImpl out = (DataStreamOutputImpl)
client.stream(raftGroup.getGroupId());
+ runTestDataStream(out, bufferSize, bufferNum);
+ DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer)
out.closeAsync().join();
+
+ final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
+ RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
+ Assert.assertTrue(replyByteBuffer.isSuccess());
+ Assert.assertEquals(clientReply.getCallId(),
expectedClientReply.getCallId());
+
Assert.assertTrue(clientReply.getClientId().equals(expectedClientReply.getClientId()));
+ Assert.assertEquals(clientReply.getLogIndex(),
expectedClientReply.getLogIndex());
+ }
+ }
+
private void runTestDataStream(DataStreamOutputImpl out, int bufferSize, int
bufferNum) {
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
final List<Integer> sizes = new ArrayList<>();
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 156bc9d..7ae96b1 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -21,12 +21,26 @@ package org.apache.ratis.datastream;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.util.NetUtils;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestDataStreamNetty extends DataStreamBaseTest {
@Before
@@ -53,4 +67,60 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
runTestDataStream(3, 2, 20, 1_000_000, 100);
runTestDataStream(3, 2, 20, 1_000, 10_000);
}
+
+ private void testCloseStream(int leaderIndex, int numServers) throws
Exception {
+ List<RaftServer> raftServers = new ArrayList<>();
+ ClientId clientId = ClientId.randomId();
+ RaftGroupId groupId = RaftGroupId.randomId();
+ long callId = 100;
+ long longIndex = 200;
+ RaftPeer suggestedLeader = new RaftPeer(RaftPeerId.valueOf("s" +
leaderIndex));
+ RaftClientReply expectedClientReply = new RaftClientReply(clientId,
suggestedLeader.getId(),
+ groupId, callId, true, null, null, longIndex, null);
+
+ for (int i = 0; i < 3; i ++) {
+ RaftServer raftServer = mock(RaftServer.class);
+ RaftClientReply raftClientReply;
+ RaftPeerId peerId = RaftPeerId.valueOf("s" + i);
+ RaftProperties properties = new RaftProperties();
+ NettyConfigKeys.DataStream.setPort(properties,
NetUtils.createLocalServerAddress().getPort());
+
+ if (i == leaderIndex) {
+ raftClientReply = expectedClientReply;
+ } else {
+ RaftGroupMemberId raftGroupMemberId =
RaftGroupMemberId.valueOf(peerId, groupId);
+ NotLeaderException notLeaderException = new
NotLeaderException(raftGroupMemberId, suggestedLeader, null);
+ raftClientReply = new RaftClientReply(clientId, peerId,
+ groupId, callId, false, null, notLeaderException, longIndex, null);
+ }
+
+
when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
+ .thenReturn(CompletableFuture.completedFuture(raftClientReply));
+ when(raftServer.getProperties()).thenReturn(properties);
+ when(raftServer.getId()).thenReturn(peerId);
+
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new
MultiDataStreamStateMachine());
+
+ raftServers.add(raftServer);
+ }
+
+ runTestCloseStream(raftServers, 1_000_000, 10, expectedClientReply);
+ }
+
+ @Test
+ public void testCloseStreamPrimaryIsLeader() throws Exception {
+ // primary is 0, leader is 0
+ testCloseStream(0, 3);
+ }
+
+ @Test
+ public void testCloseStreamPrimaryIsNotLeader() throws Exception {
+ // primary is 0, leader is 1
+ testCloseStream(1, 3);
+ }
+
+ @Test
+ public void testCloseStreamOneServer() throws Exception {
+ // primary is 0, leader is 0
+ testCloseStream(0, 1);
+ }
}