This is an automated email from the ASF dual-hosted git repository. dimuthuupe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
commit 33be009066c43d32db9c89da96dd6444643fd231 Author: Praneeth Chityala <[email protected]> AuthorDate: Sat Mar 18 01:36:13 2023 -0400 Local incoming chunked update --- .../local/LocalIncomingChunkedConnector.java | 81 +++++++++++----------- 1 file changed, 39 insertions(+), 42 deletions(-) diff --git a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java index 91f5de6..dd388ab 100644 --- a/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java +++ b/transport/local-transport/src/main/java/org/apache/airavata/mft/transport/local/LocalIncomingChunkedConnector.java @@ -21,22 +21,24 @@ package org.apache.airavata.mft.transport.local; import org.apache.airavata.mft.core.api.ConnectorConfig; import org.apache.airavata.mft.core.api.IncomingChunkedConnector; -import java.io.InputStream; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.File; +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class LocalIncomingChunkedConnector implements IncomingChunkedConnector { private String resourcePath; + private long resourceSize; private static final Logger logger = LoggerFactory.getLogger(LocalIncomingChunkedConnector.class); @Override public void init(ConnectorConfig connectorConfig) throws Exception { this.resourcePath = connectorConfig.getResourcePath(); + this.resourceSize = connectorConfig.getMetadata().getFile().getResourceSize(); } @Override @@ -53,45 +55,40 @@ public class LocalIncomingChunkedConnector implements IncomingChunkedConnector { @Override public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception { - FileInputStream from = new FileInputStream(new File(this.resourcePath)); - FileOutputStream to = new FileOutputStream(new File(downloadFile)); - - final int buffLen = 1024; - - byte[] buf = new byte[buffLen]; - - from.skip(startByte); - - long fileSize = endByte - startByte + 1; - - while (true) { - int bufSize = 0; - - if (buffLen < fileSize) { - bufSize = buffLen; - } else { - bufSize = (int) fileSize; - } - - bufSize = (int) from.read(buf, 0, bufSize); - - if (bufSize < 0) { - break; - } - - to.write(buf, 0, bufSize); - to.flush(); - - fileSize -= bufSize; - - if (fileSize == 0L) { - break; + logger.info("Downloading chunk {} with start byte {} and end byte {} to file {} from resource path {}", + chunkId, startByte, endByte, downloadFile, this.resourcePath); + +// #use this code on a DMA enabled device +// if (resourceSize <= endByte - startByte) { +// Files.copy(Path.of(this.resourcePath), Path.of(downloadFile)); +// } else { +// try (FileInputStream from = new FileInputStream(this.resourcePath); +// FileOutputStream to = new FileOutputStream(downloadFile)) { +// from.getChannel().transferTo(startByte, endByte - startByte, to.getChannel()); +// } catch (Exception e) { +// logger.error("Unexpected error occurred while downloading chunk {} to file {} from resource path {}", +// chunkId, downloadFile, this.resourcePath, e); +// throw e; +// } +// } + + int buffLen = 1024 * 1024 * 16; + try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(this.resourcePath),buffLen); + BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(downloadFile))) { + byte[] buffer = new byte[buffLen]; + int read = 0; + long totalRead = bis.skip(startByte); + while ((read = bis.read(buffer,0,Math.min(buffLen, (int) (endByte - totalRead )))) > 0) { + bos.write(buffer, 0, read); + totalRead += read; } + bis.close(); + bos.close(); + } catch (Exception e) { + logger.error("Unexpected error occurred while downloading chunk {} to file {} from resource path {}", + chunkId, downloadFile, this.resourcePath, e); + throw e; } - - from.close(); - to.close(); - } @Override @@ -101,6 +98,6 @@ public class LocalIncomingChunkedConnector implements IncomingChunkedConnector { from.skip(startByte); - return from; + return new BufferedInputStream(from, Math.min(16 * 1024 * 1024,(int) (endByte - startByte))); } }
