This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new b318223189 NIFI-13776 Updated CopyS3Object to Handle Files over 5 GB
(#9286)
b318223189 is described below
commit b318223189e2343d04b65cedc2a5d5c4321ddc32
Author: Mike <[email protected]>
AuthorDate: Tue Oct 8 14:26:19 2024 -0400
NIFI-13776 Updated CopyS3Object to Handle Files over 5 GB (#9286)
- Added multipart copying for files over 5 GB
Signed-off-by: David Handermann <[email protected]>
---
.../src/main/resources/META-INF/NOTICE | 6 +
.../nifi/processors/aws/s3/CopyS3Object.java | 139 +++++++++++++++++++--
.../nifi/processors/aws/s3/TestCopyS3Object.java | 8 ++
3 files changed, 142 insertions(+), 11 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
index 29939af0f0..133327c742 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
@@ -174,6 +174,12 @@ The following binary components are provided under the
Apache Software License v
Error Prone Annotations
Copyright 2015 The Error Prone Authors
+The multi-part copy code in CopyS3Object was derived from code found in the
Amazon S3 documentation. This is the specific file for reference:
+
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java#L18
+
+ This code was provided under the terms of the Apache Software License V2
per this:
+
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/LICENSE
+
************************
Eclipse Distribution License 1.0
************************
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
index 1a66e5194f..f929609549 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
@@ -16,11 +16,21 @@
*/
package org.apache.nifi.processors.aws.s3;
-import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -30,15 +40,21 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Copies a file from one bucket and key to another in
AWS S3")
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class,
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
public class CopyS3Object extends AbstractS3Processor {
+ // Size limit in bytes (5 * 1024^3). A source object whose content length is
+ // greater than this value is copied with multipart requests, and the same
+ // value is used as the byte span of each copied part.
+ public static final long MULTIPART_THRESHOLD = 5L * 1024L * 1024L * 1024L;
static final PropertyDescriptor SOURCE_BUCKET = new
PropertyDescriptor.Builder()
.fromPropertyDescriptor(BUCKET)
@@ -119,28 +135,129 @@ public class CopyS3Object extends AbstractS3Processor {
final String destinationBucket =
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String destinationKey =
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+
+ final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+ boolean multipartUploadRequired = false;
+
try {
- final CopyObjectRequest request = new
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+ final GetObjectMetadataRequest sourceMetadataRequest = new
GetObjectMetadataRequest(sourceBucket, sourceKey);
+ final ObjectMetadata metadataResult =
s3.getObjectMetadata(sourceMetadataRequest);
+ final long contentLength = metadataResult.getContentLength();
+ multipartUploadRequired = contentLength > MULTIPART_THRESHOLD;
final AccessControlList acl = createACL(context, flowFile);
- if (acl != null) {
- request.setAccessControlList(acl);
- }
-
final CannedAccessControlList cannedAccessControlList =
createCannedACL(context, flowFile);
- if (cannedAccessControlList != null) {
- request.setCannedAccessControlList(cannedAccessControlList);
- }
- s3.copyObject(request);
+ if (multipartUploadRequired) {
+ copyMultipart(s3, acl, cannedAccessControlList, sourceBucket,
sourceKey, destinationBucket,
+ destinationKey, multipartIdRef, contentLength);
+ } else {
+ copyObject(s3, acl, cannedAccessControlList, sourceBucket,
sourceKey, destinationBucket, destinationKey);
+ }
session.getProvenanceReporter().send(flowFile,
getTransitUrl(destinationBucket, destinationKey));
session.transfer(flowFile, REL_SUCCESS);
- } catch (final IllegalArgumentException | AmazonClientException e) {
+ } catch (final Exception e) {
+ if (multipartUploadRequired &&
!StringUtils.isEmpty(multipartIdRef.get())) {
+ try {
+ final AbortMultipartUploadRequest abortRequest = new
AbortMultipartUploadRequest(destinationBucket, destinationKey,
multipartIdRef.get());
+ s3.abortMultipartUpload(abortRequest);
+ } catch (final AmazonS3Exception s3e) {
+ getLogger().warn("Abort Multipart Upload failed for Bucket
[{}] Key [{}]", destinationBucket, destinationKey, s3e);
+ }
+ }
+
flowFile = extractExceptionDetails(e, session, flowFile);
getLogger().error("Failed to copy S3 object from Bucket [{}] Key
[{}]", sourceBucket, sourceKey, e);
session.transfer(flowFile, REL_FAILURE);
}
}
+ /*
+ * Sections of this code were derived from example code from the official
AWS S3 documentation. Specifically this example:
+ *
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+ *
+ * Copies the source object to the destination as a multipart upload:
+ * initiates the upload (applying the ACL and canned ACL when they are not
+ * null), copies the source in consecutive byte ranges of at most
+ * MULTIPART_THRESHOLD bytes, then completes the upload with the part number
+ * and ETag of every copied part.
+ *
+ * The upload ID is stored in multipartIdRef as soon as the upload has been
+ * initiated; the caller reads it to abort the upload when the copy fails.
+ * contentLength is the source object's length in bytes and bounds the loop.
+ * Exceptions thrown by the S3 client are not caught here.
+ */
+ private void copyMultipart(final AmazonS3Client s3, final
AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+ final String sourceBucket, final String
sourceKey,
+ final String destinationBucket, final String
destinationKey, final AtomicReference<String> multipartIdRef,
+ final long contentLength) {
+ InitiateMultipartUploadRequest initRequest = new
InitiateMultipartUploadRequest(destinationBucket,
+ destinationKey);
+ if (acl != null) {
+ initRequest.setAccessControlList(acl);
+ }
+ if (cannedAccessControlList != null) {
+ initRequest.setCannedACL(cannedAccessControlList);
+ }
+
+ final InitiateMultipartUploadResult initResult =
s3.initiateMultipartUpload(initRequest);
+
+ // Publish the upload ID before any part is copied so the caller can abort
+ // the upload if a later step throws.
+ multipartIdRef.set(initResult.getUploadId());
+
+ long bytePosition = 0;
+ int partNumber = 1;
+ final List<CopyPartResult> copyPartResults = new ArrayList<>();
+ while (bytePosition < contentLength) {
+ // Inclusive index of the last byte in this part, clamped to the final
+ // byte of the source object.
+ long lastByte = Math.min(bytePosition + MULTIPART_THRESHOLD - 1,
contentLength - 1);
+
+ // Part numbers start at 1 and increase by one per range.
+ final CopyPartRequest copyPartRequest = new CopyPartRequest()
+ .withSourceBucketName(sourceBucket)
+ .withSourceKey(sourceKey)
+ .withDestinationBucketName(destinationBucket)
+ .withDestinationKey(destinationKey)
+ .withUploadId(initResult.getUploadId())
+ .withFirstByte(bytePosition)
+ .withLastByte(lastByte)
+ .withPartNumber(partNumber++);
+
+ // Parts are copied one at a time, in order; each result is kept for the
+ // completion request. doRetryLoop repeats the call on an S3 503 response.
+ doRetryLoop(partRequest ->
copyPartResults.add(s3.copyPart((CopyPartRequest) partRequest)),
copyPartRequest);
+
+ bytePosition += MULTIPART_THRESHOLD;
+ }
+
+ // Build the completion request from the (part number, ETag) pair of every
+ // copied part, in the order the parts were copied.
+ final CompleteMultipartUploadRequest completeRequest = new
CompleteMultipartUploadRequest(
+ destinationBucket,
+ destinationKey,
+ initResult.getUploadId(),
+ copyPartResults.stream().map(response -> new
PartETag(response.getPartNumber(), response.getETag()))
+ .collect(Collectors.toList()));
+ // The lambda parameter is unused; the captured completeRequest is sent.
+ doRetryLoop(complete -> s3.completeMultipartUpload(completeRequest),
completeRequest);
+ }
+
+ /*
+ * Passes the request to the consumer, repeating the call while it fails with
+ * an AmazonS3Exception whose status code is 503. At most 3 retries are made
+ * (4 attempts in total) and no delay is inserted between attempts. An
+ * AmazonS3Exception with any other status code, or a 503 once the retries
+ * are used up, is rethrown; other exception types are not caught.
+ */
+ private void doRetryLoop(Consumer<AmazonWebServiceRequest> consumer,
AmazonWebServiceRequest request) {
+ boolean requestComplete = false;
+ int retryIndex = 0;
+
+ while (!requestComplete) {
+ try {
+ consumer.accept(request);
+ requestComplete = true;
+ } catch (AmazonS3Exception e) {
+ // Only HTTP 503 is retried, and only while retries remain.
+ if (e.getStatusCode() == 503 && retryIndex < 3) {
+ retryIndex++;
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
+ /*
+ * Copies the source object to the destination with a single CopyObjectRequest.
+ * The ACL and the canned ACL are each set on the request only when they are
+ * not null. Exceptions thrown by the S3 client are not caught here.
+ */
+ private void copyObject(final AmazonS3Client s3, final AccessControlList
acl,
+ final CannedAccessControlList cannedAcl,
+ final String sourceBucket, final String
sourceKey,
+ final String destinationBucket, final String
destinationKey) {
+ final CopyObjectRequest request = new CopyObjectRequest(sourceBucket,
sourceKey, destinationBucket, destinationKey);
+
+ if (acl != null) {
+ request.setAccessControlList(acl);
+ }
+
+ if (cannedAcl != null) {
+ request.setCannedAccessControlList(cannedAcl);
+ }
+
+ s3.copyObject(request);
+ }
+
private String getTransitUrl(final String destinationBucket, final String
destinationKey) {
final String spacer = destinationKey.startsWith("/") ? "" : "/";
return String.format("s3://%s%s%s", destinationBucket, spacer,
destinationKey);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
index 0183b4f9ef..ebd38f4ec6 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
@@ -22,6 +22,8 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.util.TestRunner;
@@ -53,6 +55,12 @@ public class TestCopyS3Object {
final CopyS3Object mockCopyS3Object = new CopyS3Object() {
@Override
protected AmazonS3Client createClient(final ProcessContext
context, final AWSCredentialsProvider credentialsProvider, final
ClientConfiguration config) {
+ ObjectMetadata metadata = mock(ObjectMetadata.class);
+
+ when(metadata.getContentLength()).thenReturn(1000L);
+
when(mockS3Client.getObjectMetadata(any(GetObjectMetadataRequest.class)))
+ .thenReturn(metadata);
+
return mockS3Client;
}
};