This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch RATIS-1209 in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
commit c140982b3a402d41df7d35754e3877511b3d9b3a Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Sun Dec 6 19:55:41 2020 +0800 RATIS-1209. Compare the performance between DataStreamApi and AsyncApi. --- .../ratis/examples/filestore/cli/DataStream.java | 47 +++++++++++++++------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java index 5ca07d2..7857cd4 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java @@ -23,6 +23,10 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.api.DataStreamOutput; import org.apache.ratis.examples.filestore.FileStoreClient; import org.apache.ratis.protocol.DataStreamReply; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator; +import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator; +import org.apache.ratis.util.Preconditions; import java.io.File; import java.io.FileInputStream; @@ -31,6 +35,7 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,6 +76,10 @@ public class DataStream extends Client { Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>(); for(String path : paths) { File file = new File(path); + final long fileLength = file.length(); + Preconditions.assertTrue(fileLength == getFileSizeInBytes(), + "Unexpected file size: expected size is " + getFileSizeInBytes() + + " but actual size is " + fileLength); FileInputStream fis = new FileInputStream(file); final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length()); @@ -106,23 +115,31 @@ public class DataStream extends Client { private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput, FileChannel fileChannel) throws IOException { - List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>(); - - int bytesToRead = getBufferSizeInBytes(); - if (getFileSizeInBytes() > 0L && getFileSizeInBytes() < getBufferSizeInBytes()) { - bytesToRead = getFileSizeInBytes(); + final int fileSize = getFileSizeInBytes(); + final int bufferSize = getBufferSizeInBytes(); + if (fileSize <= 0) { + return Collections.emptyList(); } - ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytesToRead); - long offset = 0L; - - while (fileChannel.read(byteBuffer) > 0) { - byteBuffer.flip(); - futures.add(dataStreamOutput.writeAsync(byteBuffer, offset + bytesToRead == getFileSizeInBytes())); - offset += bytesToRead; - bytesToRead = (int) Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes()); - if (bytesToRead > 0) { - byteBuffer = ByteBuffer.allocateDirect(bytesToRead); + List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>(); + final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT; + + for(long offset = 0L; offset < fileSize;) { + final ByteBuf buf = alloc.directBuffer(bufferSize); + final ByteBuffer byteBuffer = buf.nioBuffers()[0]; + Preconditions.assertTrue(byteBuffer.remaining() > 0); + + final int bytesRead = fileChannel.read(byteBuffer); + if (bytesRead < 0) { + throw new IllegalStateException("Failed to read " + fileSize + + " byte(s). The channel has reached end-of-stream at " + offset); + } else if (bytesRead > 0) { + offset += bytesRead; + + byteBuffer.flip(); + final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(byteBuffer, offset == fileSize); + f.thenRun(buf::release); + futures.add(f); } }
