This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-mft.git
The following commit(s) were added to refs/heads/master by this push:
new 149d816 S3 Connection Caching
149d816 is described below
commit 149d816e8fabf66c679e0ec23fcfac554d5c76f6
Author: DImuthuUpe <[email protected]>
AuthorDate: Tue Mar 7 09:31:37 2023 -0500
S3 Connection Caching
---
.../airavata/mft/agent/TransferOrchestrator.java | 2 +
.../airavata/mft/core/api/ConnectorConfig.java | 17 ++++
.../mft/transport/s3/S3IncomingConnector.java | 17 +---
.../mft/transport/s3/S3MetadataCollector.java | 31 +-----
.../mft/transport/s3/S3OutgoingConnector.java | 96 ++++++++++---------
.../apache/airavata/mft/transport/s3/S3Util.java | 106 +++++++++++++++++++++
6 files changed, 182 insertions(+), 87 deletions(-)
diff --git a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
index 2eed2a0..dcdf01f 100644
--- a/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
+++ b/agent/service/src/main/java/org/apache/airavata/mft/agent/TransferOrchestrator.java
@@ -158,6 +158,7 @@ public class TransferOrchestrator {
.withSecret(sourceSecret)
.withStorage(sourceStorage)
.withResourcePath(endpointPath.getSourcePath())
+ .withChunkSize(chunkedSize)
.withMetadata(srcMetadata).build();
ConnectorConfig dstCC =
ConnectorConfig.ConnectorConfigBuilder.newBuilder()
@@ -165,6 +166,7 @@ public class TransferOrchestrator {
.withStorage(destStorage)
.withSecret(destSecret)
.withResourcePath(endpointPath.getDestinationPath())
+ .withChunkSize(chunkedSize)
.withMetadata(srcMetadata).build();
updateStatus.accept(endpointPath, new TransferState()
diff --git a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
index c87b6a6..8cd7538 100644
--- a/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
+++ b/core/src/main/java/org/apache/airavata/mft/core/api/ConnectorConfig.java
@@ -28,6 +28,8 @@ public class ConnectorConfig {
private String resourcePath;
private ResourceMetadata metadata;
+ private long chunkSize;
+
public String getTransferId() {
return transferId;
}
@@ -68,6 +70,13 @@ public class ConnectorConfig {
this.metadata = metadata;
}
+ public long getChunkSize() {
+ return chunkSize;
+ }
+
+ public void setChunkSize(long chunkSize) {
+ this.chunkSize = chunkSize;
+ }
public static final class ConnectorConfigBuilder {
private String transferId;
@@ -76,6 +85,8 @@ public class ConnectorConfig {
private String resourcePath;
private ResourceMetadata metadata;
+ private long chunkSize;
+
private ConnectorConfigBuilder() {
}
@@ -108,6 +119,11 @@ public class ConnectorConfig {
return this;
}
+ public ConnectorConfigBuilder withChunkSize(long chunkSize) {
+ this.chunkSize = chunkSize;
+ return this;
+ }
+
public ConnectorConfig build() {
ConnectorConfig connectorConfig = new ConnectorConfig();
connectorConfig.setTransferId(transferId);
@@ -115,6 +131,7 @@ public class ConnectorConfig {
connectorConfig.setSecret(secret);
connectorConfig.setResourcePath(resourcePath);
connectorConfig.setMetadata(metadata);
+ connectorConfig.setChunkSize(chunkSize);
return connectorConfig;
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
index 8de3541..0f49480 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3IncomingConnector.java
@@ -54,21 +54,8 @@ public class S3IncomingConnector implements IncomingChunkedConnector, IncomingSt
S3Secret s3Secret = cc.getSecret().getS3();
- AWSCredentials awsCreds;
-
- if (s3Secret.getSessionToken() == null ||
s3Secret.getSessionToken().equals("")) {
- awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(),
s3Secret.getSecretKey());
- } else {
- awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
- s3Secret.getSecretKey(),
- s3Secret.getSessionToken());
- }
-
- s3Client = AmazonS3ClientBuilder.standard()
- .withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
- s3Storage.getEndpoint(), s3Storage.getRegion()))
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .build();
+ s3Client = S3Util.getInstance().leaseS3Client(s3Secret, s3Storage);
+
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
index 7c3d1ab..712fcce 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3MetadataCollector.java
@@ -63,20 +63,7 @@ public class S3MetadataCollector implements MetadataCollector {
checkInitialized();
- AWSCredentials awsCreds;
- if (s3Secret.getSessionToken() == null ||
s3Secret.getSessionToken().equals("")) {
- awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(),
s3Secret.getSecretKey());
- } else {
- awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
- s3Secret.getSecretKey(),
- s3Secret.getSessionToken());
- }
-
- AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
- .withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
- s3Storage.getEndpoint(), s3Storage.getRegion()))
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .build();
+ AmazonS3 s3Client = S3Util.getInstance().leaseS3Client(s3Secret,
s3Storage);
ResourceMetadata.Builder resourceBuilder =
ResourceMetadata.newBuilder();
@@ -189,21 +176,7 @@ public class S3MetadataCollector implements MetadataCollector {
checkInitialized();
- AWSCredentials awsCreds;
- if (s3Secret.getSessionToken() == null ||
s3Secret.getSessionToken().equals("")) {
- awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(),
s3Secret.getSecretKey());
- } else {
- awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
- s3Secret.getSecretKey(),
- s3Secret.getSessionToken());
- }
-
- AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
- .withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
- s3Storage.getEndpoint(), s3Storage.getRegion()))
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .build();
-
+ AmazonS3 s3Client = S3Util.getInstance().leaseS3Client(s3Secret,
s3Storage);
return s3Client.doesObjectExist(s3Storage.getBucketName(),
resourcePath);
}
}
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
index 4d14694..4803b32 100644
--- a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3OutgoingConnector.java
@@ -17,6 +17,7 @@
package org.apache.airavata.mft.transport.s3;
+import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@@ -25,6 +26,7 @@ import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;
+import com.amazonaws.util.Md5Utils;
import org.apache.airavata.mft.core.api.ConnectorConfig;
import org.apache.airavata.mft.core.api.OutgoingChunkedConnector;
import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
@@ -36,6 +38,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -47,6 +52,7 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
private S3Storage s3Storage;
private AmazonS3 s3Client;
private String resourcePath;
+ private long resourceLength;
InitiateMultipartUploadResult initResponse;
List<PartETag> partETags = Collections.synchronizedList(new ArrayList<>());
@@ -55,65 +61,69 @@ public class S3OutgoingConnector implements OutgoingChunkedConnector {
public void init(ConnectorConfig cc) throws Exception {
this.resourcePath = cc.getResourcePath();
+ this.resourceLength = cc.getMetadata().getFile().getResourceSize();
s3Storage = cc.getStorage().getS3();
S3Secret s3Secret = cc.getSecret().getS3();
- AWSCredentials awsCreds;
- if (s3Secret.getSessionToken() == null ||
s3Secret.getSessionToken().equals("")) {
- awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(),
s3Secret.getSecretKey());
+ s3Client = S3Util.getInstance().leaseS3Client(s3Secret, s3Storage);
+
+ if (cc.getChunkSize() < cc.getMetadata().getFile().getResourceSize()) {
+ InitiateMultipartUploadRequest initRequest = new
InitiateMultipartUploadRequest(s3Storage.getBucketName(),
+ resourcePath);
+ initResponse = s3Client.initiateMultipartUpload(initRequest);
+ logger.info("Initialized multipart upload for file {} in bucket
{}",
+ resourcePath, s3Storage.getBucketName());
} else {
- awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
- s3Secret.getSecretKey(),
- s3Secret.getSessionToken());
+ logger.info("Using non-multipart upload for file {} in bucket {}",
resourcePath, s3Storage.getBucketName());
}
-
- s3Client = AmazonS3ClientBuilder.standard()
- .withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
- s3Storage.getEndpoint(), s3Storage.getRegion()))
- .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
- .build();
-
- InitiateMultipartUploadRequest initRequest = new
InitiateMultipartUploadRequest(s3Storage.getBucketName(),
- resourcePath);
- initResponse = s3Client.initiateMultipartUpload(initRequest);
- logger.info("Initialized multipart upload for file {} in bucket {}",
- resourcePath, s3Storage.getBucketName());
}
@Override
public void uploadChunk(int chunkId, long startByte, long endByte, String
uploadFile) throws Exception {
File file = new File(uploadFile);
- UploadPartRequest uploadRequest = new UploadPartRequest()
- .withBucketName(s3Storage.getBucketName())
- .withKey(resourcePath)
- .withUploadId(initResponse.getUploadId())
- .withPartNumber(chunkId + 1)
- .withFileOffset(0)
- .withFile(file)
- .withPartSize(file.length());
-
- UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
- this.partETags.add(uploadResult.getPartETag());
- logger.debug("Uploaded S3 chunk to path {} for resource path {}",
uploadFile, resourcePath);
+ if (initResponse != null) {
+
+ UploadPartRequest uploadRequest = new UploadPartRequest()
+ .withBucketName(s3Storage.getBucketName())
+ .withKey(resourcePath)
+ .withUploadId(initResponse.getUploadId())
+ .withPartNumber(chunkId + 1)
+ .withFileOffset(0)
+ //.withMD5Digest(Md5Utils.md5AsBase64(new
File(uploadFile)))
+ .withFile(file)
+ .withPartSize(file.length());
+
+ UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+ this.partETags.add(uploadResult.getPartETag());
+ logger.debug("Uploaded S3 chunk to path {} for resource path {}",
uploadFile, resourcePath);
+ } else {
+ s3Client.putObject(s3Storage.getBucketName(), resourcePath,
uploadFile);
+ }
}
@Override
public void uploadChunk(int chunkId, long startByte, long endByte,
InputStream inputStream) throws Exception {
- UploadPartRequest uploadRequest = new UploadPartRequest()
- .withBucketName(s3Storage.getBucketName())
- .withKey(resourcePath)
- .withUploadId(initResponse.getUploadId())
- .withPartNumber(chunkId + 1)
- .withFileOffset(0)
- .withInputStream(inputStream)
- .withPartSize(endByte - startByte);
-
- UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
- inputStream.close();
- this.partETags.add(uploadResult.getPartETag());
- logger.debug("Uploaded S3 chunk {} for resource path {} using stream",
chunkId, resourcePath);
+ if (initResponse != null) {
+ UploadPartRequest uploadRequest = new UploadPartRequest()
+ .withBucketName(s3Storage.getBucketName())
+ .withKey(resourcePath)
+ .withUploadId(initResponse.getUploadId())
+ .withPartNumber(chunkId + 1)
+ .withFileOffset(0)
+ .withInputStream(inputStream)
+ .withPartSize(endByte - startByte);
+
+ UploadPartResult uploadResult = s3Client.uploadPart(uploadRequest);
+ inputStream.close();
+ this.partETags.add(uploadResult.getPartETag());
+ logger.debug("Uploaded S3 chunk {} for resource path {} using
stream", chunkId, resourcePath);
+ } else {
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setContentLength(resourceLength);
+ s3Client.putObject(s3Storage.getBucketName(), resourcePath,
inputStream, metadata);
+ }
}
@Override
diff --git a/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Util.java b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Util.java
new file mode 100644
index 0000000..ecd178d
--- /dev/null
+++ b/transport/s3-transport/src/main/java/org/apache/airavata/mft/transport/s3/S3Util.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.airavata.mft.transport.s3;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.airavata.mft.credential.stubs.s3.S3Secret;
+import org.apache.airavata.mft.credential.stubs.swift.SwiftSecret;
+import org.apache.airavata.mft.credential.stubs.swift.SwiftV2AuthSecret;
+import org.apache.airavata.mft.credential.stubs.swift.SwiftV3AuthSecret;
+import org.apache.airavata.mft.resource.stubs.s3.storage.S3Storage;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class S3Util {
+
+ private ThreadLocal<Map<String, AmazonS3>> s3ClientCache =
ThreadLocal.withInitial(() -> {
+ Map<String, AmazonS3> map = new HashMap<>();
+ return map;
+ });
+
+ private static S3Util instance;
+
+ private S3Util(){}
+
+ public static synchronized S3Util getInstance() {
+ if (instance == null) {
+ synchronized (S3Util.class) {
+ if (instance == null) {
+ instance = new S3Util();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public void releaseSwiftApi(SwiftSecret swiftSecret) {
+
+ }
+
+ private String getSecretKey(S3Secret s3Secret, S3Storage s3Storage) throws
NoSuchAlgorithmException {
+ String longSt = s3Secret.getAccessKey()
+ + s3Secret.getSecretKey()
+ + s3Secret.getSessionToken()
+ + s3Storage.getEndpoint() + s3Storage.getRegion();
+
+ /*MessageDigest md = MessageDigest.getInstance("MD5");
+ md.update(longSt.getBytes());
+ byte[] digest = md.digest();
+ return new String(digest);*/
+
+ return longSt;
+ }
+
+ public AmazonS3 leaseS3Client(S3Secret s3Secret, S3Storage s3Storage)
throws Exception {
+
+ String secretKey = getSecretKey(s3Secret, s3Storage);
+
+ if (s3ClientCache.get().containsKey(secretKey)) {
+ return s3ClientCache.get().get(secretKey);
+ }
+
+ AWSCredentials awsCreds;
+ if (s3Secret.getSessionToken() == null ||
s3Secret.getSessionToken().equals("")) {
+ awsCreds = new BasicAWSCredentials(s3Secret.getAccessKey(),
s3Secret.getSecretKey());
+ } else {
+ awsCreds = new BasicSessionCredentials(s3Secret.getAccessKey(),
+ s3Secret.getSecretKey(),
+ s3Secret.getSessionToken());
+ }
+
+ AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
+ .withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
+ s3Storage.getEndpoint(), s3Storage.getRegion()))
+ .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
+ .disableChunkedEncoding()
+ .build();
+
+ s3ClientCache.get().put(secretKey, s3Client);
+ return s3Client;
+ }
+}