amaliujia commented on a change in pull request #298:
URL: https://github.com/apache/incubator-ratis/pull/298#discussion_r534668692



##########
File path: 
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
##########
@@ -153,6 +158,55 @@ public TransactionContext 
startTransaction(RaftClientRequest request) throws IOE
     return reply.thenApply(ExamplesProtos.ReadReplyProto::getData);
   }
 
+  class LocalStream implements DataStream {
+    private final DataChannel dataChannel;
+
+    LocalStream(DataChannel dataChannel) {
+      this.dataChannel = dataChannel;
+    }
+
+    @Override
+    public DataChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @Override
+    public CompletableFuture<?> cleanUp() {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          dataChannel.close();
+          return true;
+        } catch (IOException e) {
+          return FileStoreCommon.completeExceptionally("Failed to close data 
channel", e);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataStream> stream(RaftClientRequest request) {
+    final ByteString reqByteString = request.getMessage().getContent();
+    final FileStoreRequestProto proto;
+    try {
+      proto = FileStoreRequestProto.parseFrom(reqByteString);
+    } catch (InvalidProtocolBufferException e) {
+      return FileStoreCommon.completeExceptionally(
+          "Failed to parse stream header", e);
+    }
+    return files.createDataChannel(proto.getStream().getPath().toStringUtf8())
+        .thenApply(LocalStream::new);
+  }
+
+  @Override
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    LOG.info("link {}", stream);
+    if (stream == null) {
+      return JavaUtils.completeExceptionally(new IllegalStateException("Null 
stream: entry=" + entry));
+    }
+    dataStreamToLogEntryMap.put(entry, stream);
+    return CompletableFuture.completedFuture(null);

Review comment:
       I have did similar thing in `applyTransaction` (check whether the file 
exists, whether the file has the right length).
   
   Do you think that is not enough? 
   
   if so, in order to check whether this stream closes, and check the length, I 
think we need to update the interface 
   
   ```
     /**
      * For streaming state machine data.
      */
     interface DataStream {
       /** @return a channel for streaming state machine data. */
       DataChannel getDataChannel();
   
       /**
        * Clean up asynchronously this stream.
        *
        * When there is an error, this method is invoked to clean up the 
associated resources.
        * If this stream is not yet linked (see {@link DataApi#link(DataStream, 
LogEntryProto)}),
        * the state machine may choose to remove the data or to keep the data 
internally for future recovery.
        * If this stream is already linked, the data must not be removed.
        *
        * @return a future for the cleanup task.
        */
       CompletableFuture<?> cleanUp();
     }
   ```
   to   expose those information. 
   
   The least I can do with current interface is to check whether the channel in 
the data stream has closed.
   
   @szetszwo  what do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to