szetszwo commented on a change in pull request #338:
URL: https://github.com/apache/incubator-ratis/pull/338#discussion_r538975854
##########
File path:
ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
##########
@@ -134,46 +166,124 @@ private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply
return totalBytes;
}
- private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
- FileChannel fileChannel) throws IOException {
- final int fileSize = getFileSizeInBytes();
- final int bufferSize = getBufferSizeInBytes();
- if (fileSize <= 0) {
- return Collections.emptyList();
+ abstract static class TransferType {
+ private final String path;
+ private final File file;
+ private final long fileSize;
+ private final int bufferSize;
+ private final long syncSize;
+ private long syncPosition = 0;
+
+ TransferType(String path, DataStream cli) {
+ this.path = path;
+ this.file = new File(path);
+ this.fileSize = cli.getFileSizeInBytes();
+ this.bufferSize = cli.getBufferSizeInBytes();
+ this.syncSize = cli.getSyncSize();
+
+ final long actualSize = file.length();
+ Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
+ + fileSize + " but actual size is " + actualSize + ", path=" + path);
+ }
+
+ File getFile() {
+ return file;
+ }
+
+ int getBufferSize() {
+ return bufferSize;
+ }
+
+ long getPacketSize(long offset) {
+ return Math.min(bufferSize, fileSize - offset);
}
- List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
- for(long offset = 0L; offset < fileSize;) {
- final ByteBuf buf = alloc.directBuffer(bufferSize);
- final int bytesRead = buf.writeBytes(fileChannel, bufferSize);
+ boolean isSync(long position) {
+ if (syncSize > 0) {
+ if (position >= fileSize || syncPosition - position >= syncSize) {
+ syncPosition = position;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ List<CompletableFuture<DataStreamReply>> transfer(FileStoreClient client) throws IOException {
+ if (fileSize <= 0) {
+ return Collections.emptyList();
+ }
+
+ final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ try (FileInputStream fis = new FileInputStream(file);
+ DataStreamOutput out = client.getStreamOutput(path, fileSize)) {
Review comment:
(1) could also make the read run in parallel. We can do both (1) and (2).
Thanks a lot!
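
For illustration only, here is a rough sketch of what reading a file in parallel could look like. It is not necessarily what (1) or (2) propose, and it is not code from this PR. The class ParallelChunkReader and its methods are hypothetical names; it uses only java.nio positional reads, which do not move the channel position and so can be issued from several threads on one FileChannel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper, not part of Ratis: reads fixed-size chunks of a file in parallel. */
public class ParallelChunkReader {

  /** Reads exactly {@code length} bytes starting at {@code position}. */
  static ByteBuffer readChunk(FileChannel channel, long position, int length) {
    final ByteBuffer buf = ByteBuffer.allocate(length);
    try {
      long pos = position;
      while (buf.hasRemaining()) {
        // Positional read: leaves the channel's own position untouched.
        final int n = channel.read(buf, pos);
        if (n < 0) {
          throw new IOException("Unexpected EOF at position " + pos);
        }
        pos += n;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    buf.flip();
    return buf;
  }

  /** Submits one read task per chunk and returns the chunks in file order. */
  static List<ByteBuffer> readAll(Path path, int chunkSize, int threads) throws IOException {
    final ExecutorService executor = Executors.newFixedThreadPool(threads);
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
      final long fileSize = channel.size();
      final List<CompletableFuture<ByteBuffer>> futures = new ArrayList<>();
      for (long offset = 0; offset < fileSize; offset += chunkSize) {
        final long position = offset;
        // Same idea as getPacketSize in the diff: the last chunk may be shorter.
        final int length = (int) Math.min(chunkSize, fileSize - offset);
        futures.add(CompletableFuture.supplyAsync(
            () -> readChunk(channel, position, length), executor));
      }
      final List<ByteBuffer> chunks = new ArrayList<>();
      for (CompletableFuture<ByteBuffer> f : futures) {
        chunks.add(f.join());  // wait inside the try block so the channel stays open
      }
      return chunks;
    } finally {
      executor.shutdown();
    }
  }
}
```

The futures are joined in submission order, so a caller could still write the chunks to the stream sequentially while the disk reads overlap. This sketch holds all chunks in memory, so a real version would need to bound the number of outstanding reads.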