This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 715bc0f  RATIS-1083. Create transaction once stream data replicated to all servers (#237)
715bc0f is described below

commit 715bc0f9eaf885febf0f2fc0a817f800eb314151
Author: runzhiwang <[email protected]>
AuthorDate: Wed Nov 4 22:54:25 2020 +0800

    RATIS-1083. Create transaction once stream data replicated to all servers (#237)
    
    * RATIS-1083. Create a transaction once the stream data is replicated to all servers
    
    * fix code review
    
    * mark primary
    
    * Revert "mark primary"
    
    This reverts commit 2d698e585573912f915176d1dce1600ad6a8d2e7.
    
    * fix code review
    
    * add STREAM_CLOSE_FORWARD
    
    * fix code review
    
    * Move closeForwardAsync to DataStreamOutput
    
    * rebase master
    
    * use ExecutorService in async
---
 .../apache/ratis/client/api/DataStreamOutput.java  |   6 ++
 .../ratis/client/impl/DataStreamClientImpl.java    |  17 +++-
 .../ratis/netty/server/NettyServerStreamRpc.java   | 106 +++++++++++++++++++--
 ratis-proto/src/main/proto/Raft.proto              |   3 +-
 .../ratis/datastream/DataStreamBaseTest.java       |  55 ++++++++++-
 .../ratis/datastream/TestDataStreamNetty.java      |  70 ++++++++++++++
 6 files changed, 242 insertions(+), 15 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 39f62ca..429a422 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -28,6 +28,12 @@ public interface DataStreamOutput extends 
CloseAsync<DataStreamReply> {
   /** Send out the data in the buffer asynchronously */
   CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
 
+  /** Create a transaction asynchronously once the stream data is replicated 
to all servers */
+  CompletableFuture<DataStreamReply> startTransactionAsync();
+
   /** Get the future of the header request. */
   CompletableFuture<DataStreamReply> getHeaderFuture();
+
+  /** Peer close asynchronously. */
+  CompletableFuture<DataStreamReply> closeForwardAsync();
 }
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 6b27f93..7f3546e 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -87,10 +88,22 @@ public class DataStreamClientImpl implements 
DataStreamClient {
       return f;
     }
 
-    // should wait for attached sliding window to terminate
     @Override
     public CompletableFuture<DataStreamReply> closeAsync() {
-      return null;
+      return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, 
Unpooled.EMPTY_BUFFER.nioBuffer(),
+          Type.STREAM_CLOSE);
+    }
+
+    @Override
+    public CompletableFuture<DataStreamReply> closeForwardAsync() {
+      return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, 
Unpooled.EMPTY_BUFFER.nioBuffer(),
+          Type.STREAM_CLOSE_FORWARD);
+    }
+
+    @Override
+    public CompletableFuture<DataStreamReply> startTransactionAsync() {
+      return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, 
Unpooled.EMPTY_BUFFER.nioBuffer(),
+          Type.START_TRANSACTION);
     }
 
     public RaftClientRequest getHeader() {
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 0f24b0b..3e7cce8 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -146,6 +146,10 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
       return previous;
     }
 
+    RaftClientRequest getRequest() {
+      return request;
+    }
+
     @Override
     public String toString() {
       return getClass().getSimpleName() + ":" + request;
@@ -276,9 +280,10 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     ctx.writeAndFlush(reply);
   }
 
-  private void sendReplySuccess(DataStreamRequestByteBuf request, long 
bytesWritten, ChannelHandlerContext ctx) {
+  private void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer 
buffer, long bytesWritten,
+      ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), null, bytesWritten, 
true, request.getType());
+        request.getStreamId(), request.getStreamOffset(), buffer, 
bytesWritten, true, request.getType());
     LOG.debug("{}: write {}", this, reply);
     ctx.writeAndFlush(reply);
   }
@@ -288,7 +293,7 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
       if (!checkSuccessRemoteWrite(remoteWrites, bytesWritten)) {
         sendReplyNotSuccess(request, ctx);
       } else {
-        sendReplySuccess(request, bytesWritten, ctx);
+        sendReplySuccess(request, null, bytesWritten, ctx);
       }
   }
 
@@ -301,34 +306,115 @@ public class NettyServerStreamRpc implements 
DataStreamServerRpc {
     };
   }
 
+  private int startTransaction(StreamInfo info, DataStreamRequestByteBuf 
request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply 
-> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = 
ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else if (request.getType() == Type.STREAM_CLOSE) {
+          // if this server is not the leader, forward start transition to the 
other peers
+          // there maybe other unexpected reason cause failure except not 
leader, forwardStartTransaction anyway
+          forwardStartTransaction(info, request, ctx);
+        } else if (request.getType() == Type.START_TRANSACTION){
+          sendReplyNotSuccess(request, ctx);
+        } else {
+          LOG.error("{}: Unexpected type:{}", this, request.getType());
+        }
+      }, executorService);
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+    return 0;
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final 
ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = 
out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      }, executorService);
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, 
Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf 
request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new 
ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), 
request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
-      localWrite = info.getPrevious().get()
-          .thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, 
stream), executorService);
+      final CompletableFuture<?> previous = info.getPrevious().get();
+
+      localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> 
writeTo(buf, stream), executorService);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
-        remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
+        remoteWrites.add(previous.thenComposeAsync(v -> 
out.writeAsync(request.slice().nioBuffer()), executorService));
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE || request.getType() == 
Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      final CompletableFuture<?> previous = info.getPrevious().get();
+
+      localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      }, executorService);
+
+      if (request.getType() == Type.STREAM_CLOSE) {
+        for (DataStreamOutput out : info.getDataStreamOutputs()) {
+          remoteWrites.add(previous.thenComposeAsync(v -> 
out.closeForwardAsync(), executorService));
+        }
       }
+    } else {
+      // peer server start transaction
+      info = streams.get(key);
+      final CompletableFuture<?> previous = info.getPrevious().get();
+      previous.thenApplyAsync(v -> startTransaction(streams.get(key), request, 
ctx), executorService);
+      return;
     }
 
     final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER
+              || request.getType() == Type.STREAM_DATA
+              || request.getType() == Type.STREAM_CLOSE_FORWARD) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            // after all server close stream, primary server start transaction
+            // TODO(runzhiwang): send start transaction to leader directly
+            startTransaction(info, request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
           return null;
         }, executorService);
 
diff --git a/ratis-proto/src/main/proto/Raft.proto 
b/ratis-proto/src/main/proto/Raft.proto
index e99019b..e167f1e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -286,7 +286,8 @@ message DataStreamPacketHeaderProto {
     STREAM_HEADER = 0;
     STREAM_DATA = 1;
     STREAM_CLOSE = 2;
-    START_TRANSACTION = 3;
+    STREAM_CLOSE_FORWARD = 3;
+    START_TRANSACTION = 4;
   }
 
   uint64 streamId = 1;
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 67d2ecc..d034350 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -19,9 +19,12 @@ package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.GroupInfoReply;
@@ -158,6 +161,12 @@ abstract class DataStreamBaseTest extends BaseTest {
       this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
     }
 
+    Server(RaftPeer peer, RaftServer raftServer) {
+      this.peer = peer;
+      this.raftServer = raftServer;
+      this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
+    }
+
     RaftPeer getPeer() {
       return peer;
     }
@@ -319,12 +328,28 @@ abstract class DataStreamBaseTest extends BaseTest {
         .map(RaftPeerId::valueOf)
         .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
         .collect(Collectors.toList());
+
+    List<RaftServer> raftServers = new ArrayList<>();
+    peers.forEach(peer -> raftServers.add(newRaftServer(peer, properties)));
+    setup(peers, raftServers);
+  }
+
+  protected void setup(List<RaftServer> raftServers) {
+    final List<RaftPeer> peers = new ArrayList<>();
+    raftServers.forEach(raftServer ->
+        peers.add(new RaftPeer(raftServer.getId(),
+            NetUtils.createSocketAddrForHost("http://localhost";,
+                
NettyConfigKeys.DataStream.port(raftServer.getProperties())))));
+
+    setup(peers, raftServers);
+  }
+
+  private void setup(List<RaftPeer> peers, List<RaftServer> raftServers){
     raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
     servers = new ArrayList<>(peers.size());
     // start stream servers on raft peers.
     for (int i = 0; i < peers.size(); i++) {
-      final RaftPeer peer = peers.get(i);
-      final Server server = new Server(peer);
+      final Server server = new Server(peers.get(i), raftServers.get(i));
       if (i == 0) {
         // only the first server routes requests to peers.
         List<RaftPeer> otherPeers = new ArrayList<>(peers);
@@ -356,6 +381,16 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
+  protected void runTestCloseStream(List<RaftServer> raftServers, int 
bufferSize, int bufferNum,
+      RaftClientReply expectedClientReply) throws Exception {
+    try {
+      setup(raftServers);
+      runTestCloseStream(bufferSize, bufferNum, expectedClientReply);
+    } finally {
+      shutdown();
+    }
+  }
+
   private void runTestDataStream(int numClients, int numStreams, int 
bufferSize, int bufferNum) throws Exception {
     final List<CompletableFuture<Void>> futures = new ArrayList<>();
     final List<DataStreamClientImpl> clients = new ArrayList<>();
@@ -377,6 +412,22 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
+  private void runTestCloseStream(int bufferSize, int bufferNum, 
RaftClientReply expectedClientReply)
+      throws IOException {
+    try (final DataStreamClientImpl client = newDataStreamClientImpl()) {
+      DataStreamOutputImpl out = (DataStreamOutputImpl) 
client.stream(raftGroup.getGroupId());
+      runTestDataStream(out, bufferSize, bufferNum);
+      DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer) 
out.closeAsync().join();
+
+      final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
+          RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
+      Assert.assertTrue(replyByteBuffer.isSuccess());
+      Assert.assertEquals(clientReply.getCallId(), 
expectedClientReply.getCallId());
+      
Assert.assertTrue(clientReply.getClientId().equals(expectedClientReply.getClientId()));
+      Assert.assertEquals(clientReply.getLogIndex(), 
expectedClientReply.getLogIndex());
+    }
+  }
+
   private void runTestDataStream(DataStreamOutputImpl out, int bufferSize, int 
bufferNum) {
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
     final List<Integer> sizes = new ArrayList<>();
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 156bc9d..7ae96b1 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -21,12 +21,26 @@ package org.apache.ratis.datastream;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestDataStreamNetty extends DataStreamBaseTest {
   @Before
@@ -53,4 +67,60 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
     runTestDataStream(3, 2, 20, 1_000_000, 100);
     runTestDataStream(3, 2, 20, 1_000, 10_000);
   }
+
+  private void testCloseStream(int leaderIndex, int numServers) throws 
Exception {
+    List<RaftServer> raftServers = new ArrayList<>();
+    ClientId clientId = ClientId.randomId();
+    RaftGroupId groupId = RaftGroupId.randomId();
+    long callId = 100;
+    long longIndex = 200;
+    RaftPeer suggestedLeader = new RaftPeer(RaftPeerId.valueOf("s" + 
leaderIndex));
+    RaftClientReply expectedClientReply = new RaftClientReply(clientId, 
suggestedLeader.getId(),
+        groupId, callId, true, null, null, longIndex, null);
+
+    for (int i = 0; i < 3; i ++) {
+      RaftServer raftServer = mock(RaftServer.class);
+      RaftClientReply raftClientReply;
+      RaftPeerId peerId = RaftPeerId.valueOf("s" + i);
+      RaftProperties properties = new RaftProperties();
+      NettyConfigKeys.DataStream.setPort(properties, 
NetUtils.createLocalServerAddress().getPort());
+
+      if (i == leaderIndex) {
+        raftClientReply = expectedClientReply;
+      } else {
+        RaftGroupMemberId raftGroupMemberId = 
RaftGroupMemberId.valueOf(peerId, groupId);
+        NotLeaderException notLeaderException = new 
NotLeaderException(raftGroupMemberId, suggestedLeader, null);
+        raftClientReply = new RaftClientReply(clientId, peerId,
+            groupId, callId, false, null, notLeaderException, longIndex, null);
+      }
+
+      
when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
+          .thenReturn(CompletableFuture.completedFuture(raftClientReply));
+      when(raftServer.getProperties()).thenReturn(properties);
+      when(raftServer.getId()).thenReturn(peerId);
+      
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new 
MultiDataStreamStateMachine());
+
+      raftServers.add(raftServer);
+    }
+
+    runTestCloseStream(raftServers, 1_000_000, 10, expectedClientReply);
+  }
+
+  @Test
+  public void testCloseStreamPrimaryIsLeader() throws Exception {
+    // primary is 0, leader is 0
+    testCloseStream(0, 3);
+  }
+
+  @Test
+  public void testCloseStreamPrimaryIsNotLeader() throws Exception {
+    // primary is 0, leader is 1
+    testCloseStream(1, 3);
+  }
+
+  @Test
+  public void testCloseStreamOneServer() throws Exception {
+    // primary is 0, leader is 0
+    testCloseStream(0, 1);
+  }
 }

Reply via email to