szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534576753
##########
File path:
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
##########
@@ -219,4 +224,56 @@ public void close() {
reader.shutdownNow();
deleter.shutdownNow();
}
+
+ CompletableFuture<StreamWriteReplyProto> streamCommit(String p, long bytesWritten) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ final Path full = resolve(normalize(p));
+ RandomAccessFile file = new RandomAccessFile(full.toFile(), "r");
+ long len = file.length();
+ return StreamWriteReplyProto.newBuilder().setIsSuccess(len == bytesWritten).setByteWritten(len).build();
+ } catch (IOException e) {
+ throw new CompletionException("Failed to commit stream write on file " + p, e);
Review comment:
Include bytesWritten in the exception message.
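A minimal sketch of what this could look like, assuming the message is built in a small helper. The class `StreamCommitMessage` and both method names are hypothetical and not part of the PR; only the `CompletionException(String, Throwable)` constructor comes from the JDK.

```java
import java.io.IOException;
import java.util.concurrent.CompletionException;

// Hypothetical helper class, for illustration only (not part of the PR).
final class StreamCommitMessage {
  private StreamCommitMessage() {}

  // Builds the failure message with both the path and the expected byte count,
  // so a length mismatch can be diagnosed from the log alone.
  static String commitFailureMessage(String path, long bytesWritten) {
    return "Failed to commit stream write on file " + path
        + ", bytesWritten=" + bytesWritten;
  }

  // Wraps the cause the same way the catch block in streamCommit does.
  static CompletionException commitFailure(String path, long bytesWritten, IOException cause) {
    return new CompletionException(commitFailureMessage(path, bytesWritten), cause);
  }
}
```

In `streamCommit`, the catch block would then become something like `throw StreamCommitMessage.commitFailure(p, bytesWritten, e);`, or the same string could simply be concatenated inline.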
##########
File path:
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +158,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
}
+ class LocalStream implements DataStream {
+ private final DataChannel dataChannel;
+
+ LocalStream(DataChannel dataChannel) {
+ this.dataChannel = dataChannel;
+ }
+
+ @Override
+ public DataChannel getDataChannel() {
+ return dataChannel;
+ }
+
+ @Override
+ public CompletableFuture<?> cleanUp() {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ dataChannel.close();
+ return true;
+ } catch (IOException e) {
+ return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+ }
+ });
+ }
+ }
+
+ @Override
+ public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+ final ByteString reqByteString = request.getMessage().getContent();
+ final FileStoreRequestProto proto;
+ try {
+ proto = FileStoreRequestProto.parseFrom(reqByteString);
+ } catch (InvalidProtocolBufferException e) {
+ return FileStoreCommon.completeExceptionally(
+ "Failed to parse stream header", e);
+ }
+ return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+ .thenApply(LocalStream::new);
+ }
+
+ @Override
+ public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+ LOG.info("link {}", stream);
+ if (stream == null) {
+ return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+ }
+ dataStreamToLogEntryMap.put(entry, stream);
+ return CompletableFuture.completedFuture(null);
Review comment:
Check that the stream is already closed and that the lengths match.
If not, complete exceptionally.
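A rough sketch of such a check, under stated assumptions: `SizedChannel` is a hypothetical stand-in for the PR's data channel that exposes how many bytes were written, and `expectedLength` is assumed to be available from the log entry or the stream header. Neither name comes from the PR or from Ratis; only `java.nio.channels.Channel` and `CompletableFuture` are real JDK types.

```java
import java.nio.channels.Channel;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of the validation that link(..) could perform.
final class LinkCheck {
  private LinkCheck() {}

  /** Hypothetical stand-in for the data channel; only what the check needs. */
  interface SizedChannel extends Channel {
    long getBytesWritten();
  }

  // Returns a future that is already completed exceptionally with the given message.
  static CompletableFuture<Void> failed(String message) {
    final CompletableFuture<Void> f = new CompletableFuture<>();
    f.completeExceptionally(new IllegalStateException(message));
    return f;
  }

  // Completes normally only if the channel is non-null, closed,
  // and has written exactly expectedLength bytes.
  static CompletableFuture<Void> checkLink(SizedChannel channel, long expectedLength) {
    if (channel == null) {
      return failed("Null stream");
    }
    if (channel.isOpen()) {
      return failed("Stream is not closed yet");
    }
    final long written = channel.getBytesWritten();
    if (written != expectedLength) {
      return failed("Length mismatch: written=" + written + ", expected=" + expectedLength);
    }
    return CompletableFuture.completedFuture(null);
  }
}
```

How the expected length is obtained is left open here; the point is only that `link` fails fast instead of silently accepting an open or short stream.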
##########
File path:
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -40,14 +41,18 @@
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
public class FileStoreStateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
+ private final ConcurrentMap<LogEntryProto, DataStream> dataStreamToLogEntryMap = new ConcurrentHashMap<>();
Review comment:
Let's remove this map since it is currently not useful.