szetszwo commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r533437943



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +163,55 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final StreamWriteRequestHeaderProto proto;
+    try {
+      proto = StreamWriteRequestHeaderProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(stream, entry);

Review comment:
   In applyTransaction(..), the state machine is given a log entry and has to apply it.

   For streaming in FileStore, we may simply check that the stream is closed and that the file exists, since the data should already have been written to the file.
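
   A minimal sketch of the check suggested above, under stated assumptions: the data channel is assumed to behave like a `java.nio.channels.WritableByteChannel`, and `StreamApplyCheck` and `isStreamedDataReady` are hypothetical names used only for illustration, not part of the Ratis API or of this pull request.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: not part of Ratis or FileStore.
class StreamApplyCheck {
  /**
   * Returns true when the stream's channel has been closed and the target
   * file exists, i.e. the streamed data should already be on disk.
   */
  static boolean isStreamedDataReady(WritableByteChannel channel, Path file) {
    return !channel.isOpen() && Files.isRegularFile(file);
  }

  public static void main(String[] args) throws IOException {
    // Simulate a streamed write with a plain FileChannel on a temp file.
    Path file = Files.createTempFile("filestore-stream", ".dat");
    FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE);
    channel.write(ByteBuffer.wrap("streamed data".getBytes(StandardCharsets.UTF_8)));

    // The channel is still open, so the entry is not ready to be applied.
    System.out.println("before close: " + isStreamedDataReady(channel, file));

    channel.close();

    // The channel is closed and the file exists, so the check passes.
    System.out.println("after close: " + isStreamedDataReady(channel, file));

    Files.delete(file);
  }
}
```

   In applyTransaction(..), a check of this kind could replace any further write for stream entries. How the state machine finds the stream for a given log entry is left out here.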





