This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new e917df5  Streaming chunked data transfer support
e917df5 is described below

commit e917df5eefa87618d0131605824ed8f0a614d326
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Mon Apr 18 22:40:23 2022 -0400

    Streaming chunked data transfer support
---
 .../org/apache/airavata/mft/agent/MFTAgent.java    |  8 ++++-
 .../airavata/mft/agent/TransportMediator.java      | 36 ++++++++++++++++------
 .../mft/core/api/IncomingChunkedConnector.java     |  1 +
 .../mft/core/api/OutgoingChunkedConnector.java     |  1 +
 .../mft/transport/s3/S3IncomingConnector.java      | 10 ++++++
 .../mft/transport/s3/S3OutgoingConnector.java      | 15 +++++++++
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
index 5a61d31..18e7ba6 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
@@ -107,6 +107,9 @@ public class MFTAgent implements CommandLineRunner {
     @org.springframework.beans.factory.annotation.Value("${agent.chunk.size}")
     private int chunkedSize;
 
+    @org.springframework.beans.factory.annotation.Value("${agent.chunk.streaming.enabled}")
+    private boolean doChunkStream;
+
     private final Semaphore mainHold = new Semaphore(0);
 
     private KVCache transferMessageCache;
@@ -139,7 +142,10 @@ public class MFTAgent implements CommandLineRunner {
     public void init() {
         transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
         rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
-        mediator = new TransportMediator(tempDataDir, concurrentTransfers, concurrentChunkedThreads, chunkedSize);
+        mediator = new TransportMediator(tempDataDir,
+                concurrentTransfers,
+                concurrentChunkedThreads,
+                chunkedSize, doChunkStream);
         transferRequestExecutor = Executors.newFixedThreadPool(concurrentTransfers);
     }
 
diff --git a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
index 49de2f8..f084853 100644
--- a/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
+++ b/agent/src/main/java/org/apache/airavata/mft/agent/TransportMediator.java
@@ -45,14 +45,20 @@ public class TransportMediator {
 
     private String tempDataDir = "/tmp";
     private final int chunkedSize;
+    private final boolean doChunkStreaming;
 
     private final ExecutorService chunkedExecutorService;
 
-    public TransportMediator(String tempDataDir, int concurrentTransfers, int concurrentChunkedThreads, int chunkedSize) {
+    public TransportMediator(String tempDataDir,
+                             int concurrentTransfers,
+                             int concurrentChunkedThreads,
+                             int chunkedSize,
+                             boolean doChunkStreaming) {
         this.tempDataDir = tempDataDir;
         monitorPool = Executors.newFixedThreadPool(concurrentTransfers);
         this.chunkedSize = chunkedSize;
         chunkedExecutorService = Executors.newFixedThreadPool(concurrentChunkedThreads);
+        this.doChunkStreaming = doChunkStreaming;
     }
 
     public void transferSingleThread(String transferId,
@@ -115,8 +121,10 @@ public class TransportMediator {
                         endPos = fileLength;
                     }
 
-                    String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
-                    completionService.submit(new ChunkMover(inConnector, outConnector, uploadLength, endPos, chunkIdx, tempFile));
+
+                    completionService.submit(new ChunkMover(inConnector,
+                            outConnector, uploadLength, endPos, chunkIdx,
+                            transferId, doChunkStreaming));
 
                     uploadLength = endPos;
                     chunkIdx++;
@@ -221,30 +229,38 @@ public class TransportMediator {
         monitorPool.shutdown();
     }
 
-    private static class ChunkMover implements Callable<Integer> {
+    private class ChunkMover implements Callable<Integer> {
 
         IncomingChunkedConnector downloader;
         OutgoingChunkedConnector uploader;
         long startPos;
         long endPos;
         int chunkIdx;
-        String tempFile;
+        String transferId;
+        boolean useStreaming;
 
         public ChunkMover(IncomingChunkedConnector downloader, OutgoingChunkedConnector uploader, long startPos,
-                          long endPos, int chunkIdx, String tempFile) {
+                          long endPos, int chunkIdx, String transferId, boolean useStreaming) {
             this.downloader = downloader;
             this.uploader = uploader;
             this.startPos = startPos;
             this.endPos = endPos;
             this.chunkIdx = chunkIdx;
-            this.tempFile = tempFile;
+            this.transferId = transferId;
+            this.useStreaming = useStreaming;
         }
 
         @Override
         public Integer call() throws Exception {
-            downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
-            uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
-            new File(tempFile).delete();
+            if (useStreaming) {
+                InputStream inputStream = downloader.downloadChunk(chunkIdx, startPos, endPos);
+                uploader.uploadChunk(chunkIdx, startPos, endPos,inputStream);
+            } else {
+                String tempFile = tempDataDir + File.separator + transferId + "-" + chunkIdx;
+                downloader.downloadChunk(chunkIdx, startPos, endPos, tempFile);
+                uploader.uploadChunk(chunkIdx, startPos, endPos, tempFile);
+                new File(tempFile).delete();
+            }
             return chunkIdx;
         }
     }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
index 24908a2..0848004 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/IncomingChunkedConnector.java
@@ -4,4 +4,5 @@ import java.io.InputStream;
 
 public interface IncomingChunkedConnector extends BasicConnector {
     public void downloadChunk(int chunkId, long startByte, long endByte, String downloadFile) throws Exception;
+    public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception;
 }
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
index d9cc5b5..77ea2ab 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/OutgoingChunkedConnector.java
@@ -4,4 +4,5 @@ import java.io.InputStream;
 
 public interface OutgoingChunkedConnector extends BasicConnector {
     public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception;
+    public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception;
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index 29e8263..19ecfaf 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -93,6 +93,16 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt
         logger.debug("Downloaded S3 chunk to path {} for resource id {}", downloadFile, resource.getResourceId());
     }
 
+    @Override
+    public InputStream downloadChunk(int chunkId, long startByte, long endByte) throws Exception {
+        GetObjectRequest rangeObjectRequest = new GetObjectRequest(resource.getS3Storage().getBucketName(),
+                resource.getFile().getResourcePath());
+        rangeObjectRequest.setRange(startByte, endByte - 1);
+        logger.debug("Fetching input stream for chunk {} in resource {}", chunkId, resource.getResourceId());
+        S3Object object = s3Client.getObject(rangeObjectRequest);
+        return object.getObjectContent();
+    }
+
     @Override
     public void complete() throws Exception {
 
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index aa59378..e608e9e 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -96,6 +96,21 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
         logger.debug("Uploaded S3 chunk to path {} for resource id {}", uploadFile, resource.getResourceId());
     }
 
+    @Override
+    public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {
+        UploadPartRequest uploadRequest = new UploadPartRequest()
+                .withBucketName(resource.getS3Storage().getBucketName())
+                .withKey(resource.getFile().getResourcePath())
+                .withUploadId(initResponse.getUploadId())
+                .withPartNumber(chunkId + 1)
+                .withFileOffset(0)
+                .withInputStream(inputStream)
+                .withPartSize(endByte - startByte);
+
+        UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+        this.partETags.add(uploadResult.getPartETag());
+        logger.debug("Uploaded S3 chunk {} for resource id {} using stream", chunkId, resource.getResourceId());
+    }
 
     @Override
     public void complete() throws Exception {

Reply via email to