runzhiwang commented on a change in pull request #338:
URL: https://github.com/apache/incubator-ratis/pull/338#discussion_r538931979
##########
File path:
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
##########
@@ -134,46 +166,124 @@ private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply
return totalBytes;
}
- private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
- FileChannel fileChannel) throws IOException {
- final int fileSize = getFileSizeInBytes();
- final int bufferSize = getBufferSizeInBytes();
- if (fileSize <= 0) {
- return Collections.emptyList();
+ abstract static class TransferType {
+ private final String path;
+ private final File file;
+ private final long fileSize;
+ private final int bufferSize;
+ private final long syncSize;
+ private long syncPosition = 0;
+
+ TransferType(String path, DataStream cli) {
+ this.path = path;
+ this.file = new File(path);
+ this.fileSize = cli.getFileSizeInBytes();
+ this.bufferSize = cli.getBufferSizeInBytes();
+ this.syncSize = cli.getSyncSize();
+
+ final long actualSize = file.length();
+ Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
+ + fileSize + " but actual size is " + actualSize + ", path=" + path);
+ }
+
+ File getFile() {
+ return file;
+ }
+
+ int getBufferSize() {
+ return bufferSize;
+ }
+
+ long getPacketSize(long offset) {
+ return Math.min(bufferSize, fileSize - offset);
}
- List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
- for(long offset = 0L; offset < fileSize;) {
- final ByteBuf buf = alloc.directBuffer(bufferSize);
- final int bytesRead = buf.writeBytes(fileChannel, bufferSize);
+ boolean isSync(long position) {
+ if (syncSize > 0) {
+ if (position >= fileSize || syncPosition - position >= syncSize) {
+ syncPosition = position;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client) throws IOException {
+ if (fileSize <= 0) {
+ return Collections.emptyList();
+ }
+
+ final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ try (FileInputStream fis = new FileInputStream(file);
+ DataStreamOutput out = client.getStreamOutput(path, fileSize)) {
Review comment:
@szetszwo If we put `DataStreamOutput out =
client.getStreamOutput(path, fileSize)` in the try-with-resources statement,
it will call close(), not closeAsync(). Then we must finish closing one stream
(including submitRaftClientRequest) before starting another stream.
Alternatively, we can start multiple threads for multiple streams instead of
having one thread in charge of multiple streams, or we can call
DataStreamOutput#closeAsync(). Besides, I did not count the time cost of
closing the stream, my bad.
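
To illustrate the ordering concern, here is a minimal, self-contained sketch. `FakeStreamOutput` and `CloseOrderSketch` are hypothetical stand-ins, not the Ratis classes; the only assumption carried over is that a stream offers both a blocking close() and a closeAsync() returning a future. It only records the order of events seen by the single driving thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseOrderSketch {
  /** Hypothetical stand-in for a stream output; NOT the Ratis DataStreamOutput. */
  static class FakeStreamOutput implements AutoCloseable {
    private final int id;
    private final List<String> events;

    FakeStreamOutput(int id, List<String> events) {
      this.id = id;
      this.events = events;
      events.add("open-" + id);
    }

    /** Starts the close (e.g. a final request to the server) without waiting for it. */
    CompletableFuture<Void> closeAsync() {
      return CompletableFuture.runAsync(() -> { /* simulated close work */ });
    }

    /** Blocking close: this is what try-with-resources invokes. */
    @Override
    public void close() {
      closeAsync().join();
      events.add("closed-" + id);
    }
  }

  /** One thread, try-with-resources: each stream is fully closed before the next opens. */
  static List<String> withTryWithResources(int n) {
    final List<String> events = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      try (FakeStreamOutput out = new FakeStreamOutput(i, events)) {
        // data would be written to 'out' here
      }
    }
    return events;
  }

  /** One thread, closeAsync(): all streams are started first, closes are awaited at the end. */
  static List<String> withCloseAsync(int n) {
    final List<String> events = new ArrayList<>();
    final List<CompletableFuture<Void>> closes = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      final FakeStreamOutput out = new FakeStreamOutput(i, events);
      // data would be written to 'out' here
      closes.add(out.closeAsync());
    }
    for (int i = 0; i < n; i++) {
      closes.get(i).join();
      events.add("closed-" + i);
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(withTryWithResources(2)); // [open-0, closed-0, open-1, closed-1]
    System.out.println(withCloseAsync(2));       // [open-0, open-1, closed-0, closed-1]
  }
}
```

In the first variant the driving thread cannot open stream 1 until stream 0 has finished closing, so close latency is paid once per stream in sequence. In the second variant the close futures can be collected alongside the write futures and awaited together.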