This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 913f5a4 RATIS-1154. Add sync flag when write stream data (#284)
913f5a4 is described below
commit 913f5a4ad54ddc948f49eca8907bb13ea67a6e35
Author: runzhiwang <[email protected]>
AuthorDate: Wed Nov 18 13:41:55 2020 +0800
RATIS-1154. Add sync flag when write stream data (#284)
* RATIS-1154. Add sync flag when write stream data
* Update ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
* fix code review
Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
---
.../apache/ratis/client/api/DataStreamOutput.java | 6 ++++-
.../ratis/client/impl/DataStreamClientImpl.java | 4 ++--
.../ratis/netty/server/DataStreamManagement.java | 26 ++++++++++++++--------
ratis-proto/src/main/proto/Raft.proto | 5 +++--
.../apache/ratis/statemachine/StateMachine.java | 9 +++++++-
.../ratis/datastream/DataStreamBaseTest.java | 20 +++++++++++++----
6 files changed, 51 insertions(+), 19 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index dd86569..8219d69 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -26,5 +26,9 @@ import java.util.concurrent.CompletableFuture;
/** An asynchronous output stream supporting zero buffer copying. */
public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
/** Send out the data in the buffer asynchronously */
- CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf);
+ default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer) {
+ return writeAsync(buffer, false);
+ }
+
+ CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean sync);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2acf657..c7acb6e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -85,12 +85,12 @@ public class DataStreamClientImpl implements DataStreamClient {
// send to the attached dataStreamClientRpc
@Override
- public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
+ public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf, boolean sync) {
if (isClosed()) {
return JavaUtils.completeExceptionally(new AlreadyClosedException(
clientId + ": stream already closed, request=" + header));
}
- final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
+ final CompletableFuture<DataStreamReply> f = send(sync ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA, buf);
streamOffset += buf.remaining();
return combineHeader(f);
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 81e4f68..f88a52a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -35,6 +35,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -73,9 +73,9 @@ public class DataStreamManagement {
this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 0L));
}
- CompletableFuture<Long> write(ByteBuf buf, Executor executor) {
+ CompletableFuture<Long> write(ByteBuf buf, boolean sync, Executor executor) {
return composeAsync(writeFuture, executor,
- n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, stream), executor));
+ n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, sync, stream), executor));
}
CompletableFuture<Long> close(Executor executor) {
@@ -92,7 +92,7 @@ public class DataStreamManagement {
}
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
- return out.writeAsync(request.slice().nioBuffer());
+ return out.writeAsync(request.slice().nioBuffer(), request.getType() == Type.STREAM_DATA_SYNC);
}
CompletableFuture<DataStreamReply> startTransaction(DataStreamRequestByteBuf request,
@@ -235,8 +235,8 @@ public class DataStreamManagement {
return composed;
}
- static long writeTo(ByteBuf buf, DataStream stream) {
- final WritableByteChannel channel = stream.getWritableByteChannel();
+ static long writeTo(ByteBuf buf, boolean sync, DataStream stream) {
+ final StateMachineDataChannel channel = stream.getWritableByteChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
try {
@@ -245,6 +245,14 @@ public class DataStreamManagement {
throw new CompletionException(t);
}
}
+
+ if (sync) {
+ try {
+ channel.force(false);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ }
return byteWritten;
}
@@ -419,8 +427,8 @@ public class DataStreamManagement {
if (request.getType() == Type.STREAM_HEADER) {
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
- } else if (request.getType() == Type.STREAM_DATA) {
- localWrite = info.getLocal().write(buf, executor);
+ } else if (request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
+ localWrite = info.getLocal().write(buf, request.getType() == Type.STREAM_DATA_SYNC, executor);
remoteWrites = info.applyToRemotes(out -> out.write(request));
} else if (request.getType() == Type.STREAM_CLOSE) {
localWrite = info.getLocal().close(executor);
@@ -432,7 +440,7 @@ public class DataStreamManagement {
composeAsync(info.getPrevious(), executor, n -> JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
if (request.getType() == Type.STREAM_HEADER
- || request.getType() == Type.STREAM_DATA) {
+ || request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
sendReply(remoteWrites, request, bytesWritten, ctx);
} else if (request.getType() == Type.STREAM_CLOSE) {
if (info.isPrimary()) {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index da82151..b6ae878 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -286,8 +286,9 @@ message DataStreamPacketHeaderProto {
enum Type {
STREAM_HEADER = 0;
STREAM_DATA = 1;
- STREAM_CLOSE = 2;
- START_TRANSACTION = 3;
+ STREAM_DATA_SYNC = 2;
+ STREAM_CLOSE = 3;
+ START_TRANSACTION = 4;
}
uint64 streamId = 1;
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 351137d..74d4df5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -231,11 +231,18 @@ public interface StateMachine extends Closeable {
}
/**
+ * For write state machine data.
+ */
+ interface StateMachineDataChannel extends WritableByteChannel {
+ void force(boolean metadata) throws IOException;
+ }
+
+ /**
* For streaming state machine data.
*/
interface DataStream {
/** @return a channel for streaming state machine data. */
- WritableByteChannel getWritableByteChannel();
+ StateMachineDataChannel getWritableByteChannel();
/**
* Clean up asynchronously this stream.
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b206b0c..5b7baf9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -47,6 +47,7 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -132,8 +133,14 @@ abstract class DataStreamBaseTest extends BaseTest {
static class SingleDataStream implements DataStream {
private int byteWritten = 0;
private final RaftClientRequest writeRequest;
+ private int forcedPosition = 0;
+
+ final StateMachineDataChannel channel = new StateMachineDataChannel() {
+ @Override
+ public void force(boolean metadata) throws IOException {
+ forcedPosition = byteWritten;
+ }
- final WritableByteChannel channel = new WritableByteChannel() {
private volatile boolean open = true;
@Override
@@ -161,7 +168,7 @@ abstract class DataStreamBaseTest extends BaseTest {
};
@Override
- public WritableByteChannel getWritableByteChannel() {
+ public StateMachineDataChannel getWritableByteChannel() {
return channel;
}
@@ -186,6 +193,10 @@ abstract class DataStreamBaseTest extends BaseTest {
public RaftClientRequest getWriteRequest() {
return writeRequest;
}
+
+ public int getForcedPosition() {
+ return forcedPosition;
+ }
}
static class Server {
@@ -488,7 +499,7 @@ abstract class DataStreamBaseTest extends BaseTest {
sizes.add(size);
final ByteBuffer bf = initBuffer(dataSize, size);
- futures.add(out.writeAsync(bf));
+ futures.add(out.writeAsync(bf, i == bufferNum - 1));
dataSize += size;
}
@@ -504,7 +515,7 @@ abstract class DataStreamBaseTest extends BaseTest {
final DataStreamReply reply = futures.get(i).join();
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
- Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
+ Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA);
}
return dataSize;
}
@@ -553,6 +564,7 @@ abstract class DataStreamBaseTest extends BaseTest {
final SingleDataStream stream = s.getSingleDataStream(header);
Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
Assert.assertEquals(dataSize, stream.getByteWritten());
+ Assert.assertEquals(dataSize, stream.getForcedPosition());
final RaftClientRequest writeRequest = stream.getWriteRequest();
assertRaftClientMessage(header, writeRequest);