This is an automated email from the ASF dual-hosted git repository.

dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git


The following commit(s) were added to refs/heads/master by this push:
     new 149d816  S3 Connection Caching
149d816 is described below

commit 149d816e8fabf66c679e0ec23fcfac554d5c76f6
Author: DImuthuUpe <[email protected]>
AuthorDate: Tue Mar 7 09:31:37 2023 -0500

    S3 Connection Caching
---
 .../airavata/mft/agent/TransferOrchestrator.java   |   2 +
 .../airavata/mft/core/api/ConnectorConfig.java     |  17 ++++
 .../mft/transport/s3/S3IncomingConnector.java      |  17 +---
 .../mft/transport/s3/S3MetadataCollector.java      |  31 +-----
 .../mft/transport/s3/S3OutgoingConnector.java      |  96 ++++++++++---------
 .../apache/airavata/mft/transport/s3/S3Util.java   | 106 +++++++++++++++++++++
 6 files changed, 182 insertions(+), 87 deletions(-)

diff --git a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
index 2eed2a0..dcdf01f 100644
--- a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
+++ b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
@@ -158,6 +158,7 @@ public class TransferOrchestrator {
                     .withSecret(sourceSecret)
                     .withStorage(sourceStorage)
                     .withResourcePath(endpointPath.getSourcePath())
+                    .withChunkSize(chunkedSize)
                     .withMetadata(srcMetadata).build();
 
             ConnectorConfig dstCC = ConnectorConfig.ConnectorConfigBuilder.newBuilder()
@@ -165,6 +166,7 @@ public class TransferOrchestrator {
                     .withStorage(destStorage)
                     .withSecret(destSecret)
                     .withResourcePath(endpointPath.getDestinationPath())
+                    .withChunkSize(chunkedSize)
                     .withMetadata(srcMetadata).build();
 
             updateStatus.accept(endpointPath, new TransferState()
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
index c87b6a6..8cd7538 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
@@ -28,6 +28,8 @@ public class ConnectorConfig {
     private String resourcePath;
     private ResourceMetadata metadata;
 
+    private long chunkSize;
+
     public String getTransferId() {
         return transferId;
     }
@@ -68,6 +70,13 @@ public class ConnectorConfig {
         this.metadata = metadata;
     }
 
+    public long getChunkSize() {
+        return chunkSize;
+    }
+
+    public void setChunkSize(long chunkSize) {
+        this.chunkSize = chunkSize;
+    }
 
     public static final class ConnectorConfigBuilder {
         private String transferId;
@@ -76,6 +85,8 @@ public class ConnectorConfig {
         private String resourcePath;
         private ResourceMetadata metadata;
 
+        private long chunkSize;
+
         private ConnectorConfigBuilder() {
         }
 
@@ -108,6 +119,11 @@ public class ConnectorConfig {
             return this;
         }
 
+        public ConnectorConfigBuilder withChunkSize(long chunkSize) {
+            this.chunkSize = chunkSize;
+            return this;
+        }
+
         public ConnectorConfig build() {
             ConnectorConfig connectorConfig = new ConnectorConfig();
             connectorConfig.setTransferId(transferId);
@@ -115,6 +131,7 @@ public class ConnectorConfig {
             connectorConfig.setSecret(secret);
             connectorConfig.setResourcePath(resourcePath);
             connectorConfig.setMetadata(metadata);
+            connectorConfig.setChunkSize(chunkSize);
             return connectorConfig;
         }
     }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index 8de3541..0f49480 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -54,21 +54,8 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt
 
         S3Secret s3Secret = cc.getSecret().getS3();
 
-        AWSCredentials awsCreds;
-
-        if (s3Secret.getSessionToken() == null || s3Secret.getSessionToken().equals("")) {
-            awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-        } else {
-            awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
-                    s3Secret.getSecretKey(),
-                    s3Secret.getSessionToken());
-        }
-
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
-                        s3Storage.getEndpoint(), s3Storage.getRegion()))
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .build();
+        s3Client = S3Util.getInstance().leaseS3Client(s3Secret, s3Storage);
+
     }
 
 
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 7c3d1ab..712fcce 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -63,20 +63,7 @@ public class S3MetadataCollector implements MetadataCollector {
 
         checkInitialized();
 
-        AWSCredentials awsCreds;
-        if (s3Secret.getSessionToken() == null || s3Secret.getSessionToken().equals("")) {
-            awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-        } else {
-            awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
-                    s3Secret.getSecretKey(),
-                    s3Secret.getSessionToken());
-        }
-
-        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
-                        s3Storage.getEndpoint(), s3Storage.getRegion()))
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .build();
+        AmazonS3 s3Client = S3Util.getInstance().leaseS3Client(s3Secret, s3Storage);
 
         ResourceMetadata.Builder resourceBuilder = ResourceMetadata.newBuilder();
 
@@ -189,21 +176,7 @@ public class S3MetadataCollector implements MetadataCollector {
 
         checkInitialized();
 
-        AWSCredentials awsCreds;
-        if (s3Secret.getSessionToken() == null || s3Secret.getSessionToken().equals("")) {
-            awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
-        } else {
-            awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
-                    s3Secret.getSecretKey(),
-                    s3Secret.getSessionToken());
-        }
-
-        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
-                        s3Storage.getEndpoint(), s3Storage.getRegion()))
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .build();
-
+        AmazonS3 s3Client = S3Util.getInstance().leaseS3Client(s3Secret, s3Storage);
         return s3Client.doesObjectExist(s3Storage.getBucketName(), resourcePath);
     }
 }
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index 4d14694..4803b32 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.airavata.mft.transport.s3;
 
+import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -25,6 +26,7 @@ import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.*;
+import com.amazonaws.util.Md5Utils;
 import org.apache.airavata.mft.core.api.ConnectorConfig;
 import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
 import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
@@ -36,6 +38,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -47,6 +52,7 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
     private S3Storage s3Storage;
     private AmazonS3 s3Client;
     private String resourcePath;
+    private long resourceLength;
 
     InitiateMultipartUploadResult initResponse;
     List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
@@ -55,65 +61,69 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
     public void init(ConnectorConfig cc) throws Exception {
 
         this.resourcePath = cc.getResourcePath();
+        this.resourceLength = cc.getMetadata().getFile().getResourceSize();
 
         s3Storage = cc.getStorage().getS3();
 
         S3Secret s3Secret = cc.getSecret().getS3();
 
-        AWSCredentials awsCreds;
-        if (s3Secret.getSessionToken() == null || s3Secret.getSessionToken().equals("")) {
-            awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+        s3Client = S3Util.getInstance().leaseS3Client(s3Secret, s3Storage);
+
+        if (cc.getChunkSize() < cc.getMetadata().getFile().getResourceSize()) {
+            InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(s3Storage.getBucketName(),
+                    resourcePath);
+            initResponse = s3Client.initiateMultipartUpload(initRequest);
+            logger.info("Initialized multipart upload for file {} in bucket {}",
+                    resourcePath, s3Storage.getBucketName());
         } else {
-            awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
-                    s3Secret.getSecretKey(),
-                    s3Secret.getSessionToken());
+            logger.info("Using non-multipart upload for file {} in bucket {}", resourcePath, s3Storage.getBucketName());
         }
-
-        s3Client = AmazonS3ClientBuilder.standard()
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
-                        s3Storage.getEndpoint(), s3Storage.getRegion()))
-                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
-                .build();
-
-        InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(s3Storage.getBucketName(),
-                resourcePath);
-        initResponse = s3Client.initiateMultipartUpload(initRequest);
-        logger.info("Initialized multipart upload for file {} in bucket {}",
-                resourcePath, s3Storage.getBucketName());
     }
 
     @Override
     public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {
         File file = new File(uploadFile);
-        UploadPartRequest uploadRequest = new UploadPartRequest()
-                .withBucketName(s3Storage.getBucketName())
-                .withKey(resourcePath)
-                .withUploadId(initResponse.getUploadId())
-                .withPartNumber(chunkId + 1)
-                .withFileOffset(0)
-                .withFile(file)
-                .withPartSize(file.length());
-
-        UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
-        this.partETags.add(uploadResult.getPartETag());
-        logger.debug("Uploaded S3 chunk to path {} for resource path {}", uploadFile, resourcePath);
+        if (initResponse != null) {
+
+            UploadPartRequest uploadRequest = new UploadPartRequest()
+                    .withBucketName(s3Storage.getBucketName())
+                    .withKey(resourcePath)
+                    .withUploadId(initResponse.getUploadId())
+                    .withPartNumber(chunkId + 1)
+                    .withFileOffset(0)
+                    //.withMD5Digest(Md5Utils.md5AsBase64(new File(uploadFile)))
+                    .withFile(file)
+                    .withPartSize(file.length());
+
+            UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+            this.partETags.add(uploadResult.getPartETag());
+            logger.debug("Uploaded S3 chunk to path {} for resource path {}", uploadFile, resourcePath);
+        } else {
+            s3Client.putObject(s3Storage.getBucketName(), resourcePath, uploadFile);
+        }
     }
 
     @Override
     public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {
-        UploadPartRequest uploadRequest = new UploadPartRequest()
-                .withBucketName(s3Storage.getBucketName())
-                .withKey(resourcePath)
-                .withUploadId(initResponse.getUploadId())
-                .withPartNumber(chunkId + 1)
-                .withFileOffset(0)
-                .withInputStream(inputStream)
-                .withPartSize(endByte - startByte);
-
-        UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
-        inputStream.close();
-        this.partETags.add(uploadResult.getPartETag());
-        logger.debug("Uploaded S3 chunk {} for resource path {} using stream", chunkId, resourcePath);
+        if (initResponse != null) {
+            UploadPartRequest uploadRequest = new UploadPartRequest()
+                    .withBucketName(s3Storage.getBucketName())
+                    .withKey(resourcePath)
+                    .withUploadId(initResponse.getUploadId())
+                    .withPartNumber(chunkId + 1)
+                    .withFileOffset(0)
+                    .withInputStream(inputStream)
+                    .withPartSize(endByte - startByte);
+
+            UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+            inputStream.close();
+            this.partETags.add(uploadResult.getPartETag());
+            logger.debug("Uploaded S3 chunk {} for resource path {} using stream", chunkId, resourcePath);
+        } else {
+            ObjectMetadata metadata = new ObjectMetadata();
+            metadata.setContentLength(resourceLength);
+            s3Client.putObject(s3Storage.getBucketName(), resourcePath, inputStream, metadata);
+        }
     }
 
     @Override
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Util.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Util.java
new file mode 100644
index 0000000..ecd178d
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Util.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.swift.SwiftSecret;
+import org.apache.airavata.mft.credential.stubs.swift.SwiftV2AuthSecret;
+import org.apache.airavata.mft.credential.stubs.swift.SwiftV3AuthSecret;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class S3Util {
+
+    private ThreadLocal<Map<String, AmazonS3>> s3ClientCache = ThreadLocal.withInitial(() -> {
+        Map<String, AmazonS3> map = new HashMap<>();
+        return map;
+    });
+
+    private static S3Util instance;
+
+    private S3Util(){}
+
+    public static synchronized S3Util getInstance() {
+        if (instance == null) {
+            synchronized (S3Util.class) {
+                if (instance == null) {
+                    instance = new S3Util();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public void releaseSwiftApi(SwiftSecret swiftSecret) {
+
+    }
+
+    private String getSecretKey(S3Secret s3Secret, S3Storage s3Storage) throws NoSuchAlgorithmException {
+        String longSt =  s3Secret.getAccessKey()
+                + s3Secret.getSecretKey()
+                + s3Secret.getSessionToken()
+                + s3Storage.getEndpoint() + s3Storage.getRegion();
+
+        /*MessageDigest md = MessageDigest.getInstance("MD5");
+        md.update(longSt.getBytes());
+        byte[] digest = md.digest();
+        return new String(digest);*/
+
+        return longSt;
+    }
+
+    public AmazonS3 leaseS3Client(S3Secret s3Secret, S3Storage s3Storage) throws Exception {
+
+        String secretKey = getSecretKey(s3Secret, s3Storage);
+
+        if (s3ClientCache.get().containsKey(secretKey)) {
+            return s3ClientCache.get().get(secretKey);
+        }
+
+        AWSCredentials awsCreds;
+        if (s3Secret.getSessionToken() == null || s3Secret.getSessionToken().equals("")) {
+            awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(), s3Secret.getSecretKey());
+        } else {
+            awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
+                    s3Secret.getSecretKey(),
+                    s3Secret.getSessionToken());
+        }
+
+        AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
+                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                        s3Storage.getEndpoint(), s3Storage.getRegion()))
+                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+                .disableChunkedEncoding()
+                .build();
+
+        s3ClientCache.get().put(secretKey, s3Client);
+        return s3Client;
+    }
+}

Reply via email to