This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7878ddf RATIS-1085. Encode a RaftClientRequest as the head of a
stream request (#221)
7878ddf is described below
commit 7878ddf40b8c730f31ec7205b2a1b4fc64c6f0c7
Author: runzhiwang <[email protected]>
AuthorDate: Wed Oct 14 09:43:39 2020 +0800
RATIS-1085. Encode a RaftClientRequest as the head of a stream request
(#221)
---
.../apache/ratis/client/api/DataStreamOutput.java | 2 +-
.../ratis/client/impl/DataStreamClientImpl.java | 24 ++++++++++++++++++++--
.../ratis/netty/server/NettyServerStreamRpc.java | 9 ++++++--
.../apache/ratis/datastream/TestDataStream.java | 22 ++++++++++++--------
4 files changed, 43 insertions(+), 14 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 936b9b6..dd86569 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -26,5 +26,5 @@ import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
/** Send out the data in the buffer asynchronously */
- CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf);
+ CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2afc42a..2c9d175 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -25,8 +25,11 @@ import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +47,9 @@ public class DataStreamClientImpl implements DataStreamClient
{
private DataStreamClientRpc dataStreamClientRpc;
private OrderedStreamAsync orderedStreamAsync;
+ // TODO Similar to RaftClientImpl, pass ClientId and RaftGroupId/RaftGroup in constructor.
+ private final ClientId clientId = ClientId.randomId();
+ private final RaftGroupId groupId = RaftGroupId.randomId();
private RaftPeer raftServer;
private RaftProperties properties;
private Parameters parameters;
@@ -64,17 +70,23 @@ public class DataStreamClientImpl implements
DataStreamClient {
this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc,
properties);
}
- class DataStreamOutputImpl implements DataStreamOutput {
+ public class DataStreamOutputImpl implements DataStreamOutput {
private long streamId = 0;
private long messageId = 0;
+ private final RaftClientRequest header;
+ private final CompletableFuture<DataStreamReply> headerFuture;
public DataStreamOutputImpl(long id){
this.streamId = id;
+ this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(),
+ RaftClientRequest.writeRequestType());
+ this.headerFuture = orderedStreamAsync.sendRequest(streamId, messageId,
+ ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer());
}
// send to the attached dataStreamClientRpc
@Override
- public CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf) {
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
messageId++;
return orderedStreamAsync.sendRequest(streamId, messageId, buf);
}
@@ -84,6 +96,14 @@ public class DataStreamClientImpl implements
DataStreamClient {
public CompletableFuture<DataStreamReply> closeAsync() {
return null;
}
+
+ public RaftClientRequest getHeader() {
+ return header;
+ }
+
+ public CompletableFuture<DataStreamReply> getHeaderFuture() {
+ return headerFuture;
+ }
}
@Override
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 3caaafd..6ccabbc 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -18,6 +18,8 @@
package org.apache.ratis.netty.server;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamReplyByteBuffer;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -25,6 +27,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.*;
@@ -65,9 +68,11 @@ public class NettyServerStreamRpc implements
DataStreamServerRpc {
private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf,
AtomicBoolean released) {
try {
- // TODO RATIS-1085: read the request from buf
- final RaftClientRequest request = null;
+ final RaftClientRequest request =
+ ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
return stateMachine.data().stream(request);
+ } catch (InvalidProtocolBufferException e) {
+ throw new CompletionException(e);
} finally {
buf.release();
released.set(true);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
index 3b5426c..cca6c94 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
@@ -96,6 +96,7 @@ public class TestDataStream extends BaseTest {
@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+ writeRequest = request;
return CompletableFuture.completedFuture(stream);
}
}
@@ -105,6 +106,7 @@ public class TestDataStream extends BaseTest {
private DataStreamServerImpl server;
private DataStreamClientImpl client;
private int byteWritten = 0;
+ private RaftClientRequest writeRequest;
public void setupServer(){
server = new DataStreamServerImpl(peers[0], new
SingleDataStreamStateMachine(), properties, null);
@@ -138,10 +140,12 @@ public class TestDataStream extends BaseTest {
final int bufferSize = 1024*1024;
final int bufferNum = 10;
final DataStreamOutput out = client.stream();
+ DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out;
- //send request
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- futures.add(sendRequest(out, 1024));
+
+ // add header
+ futures.add(impl.getHeaderFuture());
//send data
final int halfBufferSize = bufferSize/2;
@@ -149,7 +153,7 @@ public class TestDataStream extends BaseTest {
for(int i = 0; i < bufferNum; i++) {
final int size = halfBufferSize +
ThreadLocalRandom.current().nextInt(halfBufferSize);
final ByteBuffer bf = initBuffer(dataSize, size);
- futures.add(out.streamAsync(bf));
+ futures.add(out.writeAsync(bf));
dataSize += size;
}
@@ -157,16 +161,16 @@ public class TestDataStream extends BaseTest {
for(CompletableFuture<DataStreamReply> f : futures) {
f.join();
}
+
+ Assert.assertEquals(writeRequest.getClientId(), impl.getHeader().getClientId());
+ Assert.assertEquals(writeRequest.getCallId(), impl.getHeader().getCallId());
+ Assert.assertEquals(writeRequest.getRaftGroupId(), impl.getHeader().getRaftGroupId());
+ Assert.assertEquals(writeRequest.getServerId(), impl.getHeader().getServerId());
+
Assert.assertEquals(dataSize, byteWritten);
shutDownSetup();
}
- CompletableFuture<DataStreamReply> sendRequest(DataStreamOutput out, int size) {
- // TODO RATIS-1085: create a RaftClientRequest and put it in the buffer
- final ByteBuffer buffer = initBuffer(0, size);
- return out.streamAsync(buffer);
- }
-
static ByteBuffer initBuffer(int offset, int size) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
final int length = buffer.capacity();