This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new b318223189 NIFI-13776 Updated CopyS3Object to Handle Files over 5 GB 
(#9286)
b318223189 is described below

commit b318223189e2343d04b65cedc2a5d5c4321ddc32
Author: Mike <[email protected]>
AuthorDate: Tue Oct 8 14:26:19 2024 -0400

    NIFI-13776 Updated CopyS3Object to Handle Files over 5 GB (#9286)
    
    - Added multipart copying for files over 5 GB
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../src/main/resources/META-INF/NOTICE             |   6 +
 .../nifi/processors/aws/s3/CopyS3Object.java       | 139 +++++++++++++++++++--
 .../nifi/processors/aws/s3/TestCopyS3Object.java   |   8 ++
 3 files changed, 142 insertions(+), 11 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
index 29939af0f0..133327c742 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/src/main/resources/META-INF/NOTICE
@@ -174,6 +174,12 @@ The following binary components are provided under the 
Apache Software License v
       Error Prone Annotations
       Copyright 2015 The Error Prone Authors
 
+The multi-part copy code in CopyS3Object was derived from code found in the 
Amazon S3 documentation. This is the specific file for reference:
+    
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java#L18
+
+    This code was provided under the terms of the Apache Software License V2 
per this:
+    
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/LICENSE
+
 ************************
 Eclipse Distribution License 1.0
 ************************
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
index 1a66e5194f..f929609549 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
@@ -16,11 +16,21 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
-import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonWebServiceRequest;
 import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.CopyPartRequest;
+import com.amazonaws.services.s3.model.CopyPartResult;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -30,15 +40,21 @@ import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.StringUtils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 @Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
 @SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
 public class CopyS3Object extends AbstractS3Processor {
+    public static final long MULTIPART_THRESHOLD = 5L * 1024L * 1024L * 1024L;
 
     static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
             .fromPropertyDescriptor(BUCKET)
@@ -119,28 +135,129 @@ public class CopyS3Object extends AbstractS3Processor {
         final String destinationBucket = 
context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String destinationKey = 
context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
+
+
+        final AtomicReference<String> multipartIdRef = new AtomicReference<>();
+        boolean multipartUploadRequired = false;
+
         try {
-            final CopyObjectRequest request = new 
CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+            final GetObjectMetadataRequest sourceMetadataRequest = new 
GetObjectMetadataRequest(sourceBucket, sourceKey);
+            final ObjectMetadata metadataResult = 
s3.getObjectMetadata(sourceMetadataRequest);
+            final long contentLength = metadataResult.getContentLength();
+            multipartUploadRequired = contentLength > MULTIPART_THRESHOLD;
             final AccessControlList acl = createACL(context, flowFile);
-            if (acl != null) {
-                request.setAccessControlList(acl);
-            }
-
             final CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
-            if (cannedAccessControlList != null) {
-                request.setCannedAccessControlList(cannedAccessControlList);
-            }
 
-            s3.copyObject(request);
+            if (multipartUploadRequired) {
+                copyMultipart(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket,
+                        destinationKey, multipartIdRef, contentLength);
+            } else {
+                copyObject(s3, acl, cannedAccessControlList, sourceBucket, 
sourceKey, destinationBucket, destinationKey);
+            }
             session.getProvenanceReporter().send(flowFile, 
getTransitUrl(destinationBucket, destinationKey));
             session.transfer(flowFile, REL_SUCCESS);
-        } catch (final IllegalArgumentException | AmazonClientException e) {
+        } catch (final Exception e) {
+            if (multipartUploadRequired && 
!StringUtils.isEmpty(multipartIdRef.get())) {
+                try {
+                    final AbortMultipartUploadRequest abortRequest = new 
AbortMultipartUploadRequest(destinationBucket, destinationKey, 
multipartIdRef.get());
+                    s3.abortMultipartUpload(abortRequest);
+                } catch (final AmazonS3Exception s3e) {
+                    getLogger().warn("Abort Multipart Upload failed for Bucket 
[{}] Key [{}]", destinationBucket, destinationKey, s3e);
+                }
+            }
+
             flowFile = extractExceptionDetails(e, session, flowFile);
             getLogger().error("Failed to copy S3 object from Bucket [{}] Key 
[{}]", sourceBucket, sourceKey, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }
 
+    /**
+     * Copies the source object to the destination as a multipart upload whose parts are copies of
+     * consecutive byte ranges of the source, each range spanning at most {@link #MULTIPART_THRESHOLD} bytes.
+     * The upload id is stored in {@code multipartIdRef} as soon as the upload has been initiated, before any
+     * part is copied. Each part copy and the final completion call are issued through {@code doRetryLoop}.
+     *
+     * Sections of this code were derived from example code from the official AWS S3 documentation. Specifically this example:
+     * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
+     *
+     * NOTE(review): only the ACL settings are applied to the initiate request; no ObjectMetadata is attached
+     * here. Confirm whether source metadata must be carried over explicitly for multipart copies.
+     *
+     * @param s3 client used for every S3 call
+     * @param acl access control list for the destination object, or {@code null} to leave it unset
+     * @param cannedAccessControlList canned ACL for the destination object, or {@code null} to leave it unset
+     * @param sourceBucket bucket holding the object to copy
+     * @param sourceKey key of the object to copy
+     * @param destinationBucket bucket receiving the copy
+     * @param destinationKey key of the copy
+     * @param multipartIdRef receives the id of the initiated multipart upload
+     * @param contentLength size of the source object in bytes
+     */
+    private void copyMultipart(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
+                               final long contentLength) {
+        final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(destinationBucket, destinationKey);
+        if (acl != null) {
+            initiateRequest.setAccessControlList(acl);
+        }
+        if (cannedAccessControlList != null) {
+            initiateRequest.setCannedACL(cannedAccessControlList);
+        }
+
+        final InitiateMultipartUploadResult initiateResult = s3.initiateMultipartUpload(initiateRequest);
+        final String uploadId = initiateResult.getUploadId();
+
+        // Record the upload id before copying any part so it is available even if a later step throws.
+        multipartIdRef.set(uploadId);
+
+        final List<CopyPartResult> completedParts = new ArrayList<>();
+        int partNumber = 1;
+        for (long firstByte = 0; firstByte < contentLength; firstByte += MULTIPART_THRESHOLD) {
+            // Byte ranges are inclusive; the final part is clamped to the last byte of the source.
+            final long lastByte = Math.min(firstByte + MULTIPART_THRESHOLD - 1, contentLength - 1);
+
+            final CopyPartRequest copyPartRequest = new CopyPartRequest()
+                    .withSourceBucketName(sourceBucket)
+                    .withSourceKey(sourceKey)
+                    .withDestinationBucketName(destinationBucket)
+                    .withDestinationKey(destinationKey)
+                    .withUploadId(uploadId)
+                    .withFirstByte(firstByte)
+                    .withLastByte(lastByte)
+                    .withPartNumber(partNumber);
+
+            doRetryLoop(ignored -> completedParts.add(s3.copyPart(copyPartRequest)), copyPartRequest);
+            partNumber++;
+        }
+
+        final List<PartETag> partETags = completedParts.stream()
+                .map(part -> new PartETag(part.getPartNumber(), part.getETag()))
+                .collect(Collectors.toList());
+        final CompleteMultipartUploadRequest completeRequest =
+                new CompleteMultipartUploadRequest(destinationBucket, destinationKey, uploadId, partETags);
+        doRetryLoop(ignored -> s3.completeMultipartUpload(completeRequest), completeRequest);
+    }
+
+    /**
+     * Invokes {@code consumer} with {@code request}, repeating the call when it fails with an
+     * {@link AmazonS3Exception} whose status code is 503. At most three retries are made after the
+     * initial attempt; the exception is rethrown once they are used up or for any other status code.
+     * Retries are issued immediately, with no delay between attempts.
+     *
+     * @param consumer the S3 operation to run
+     * @param request the request handed to {@code consumer} on every attempt
+     */
+    private void doRetryLoop(Consumer<AmazonWebServiceRequest> consumer, AmazonWebServiceRequest request) {
+        int retriesUsed = 0;
+
+        while (true) {
+            try {
+                consumer.accept(request);
+                return;
+            } catch (AmazonS3Exception e) {
+                final boolean retryable = e.getStatusCode() == 503 && retriesUsed < 3;
+                if (!retryable) {
+                    throw e;
+                }
+                retriesUsed++;
+            }
+        }
+    }
+
+    /**
+     * Copies the source object to the destination with a single copy request, applying the
+     * access control list and the canned ACL when they are provided.
+     *
+     * @param s3 client used to issue the copy
+     * @param acl access control list for the destination object, or {@code null} to leave it unset
+     * @param cannedAcl canned ACL for the destination object, or {@code null} to leave it unset
+     * @param sourceBucket bucket holding the object to copy
+     * @param sourceKey key of the object to copy
+     * @param destinationBucket bucket receiving the copy
+     * @param destinationKey key of the copy
+     */
+    private void copyObject(final AmazonS3Client s3, final AccessControlList acl,
+                               final CannedAccessControlList cannedAcl,
+                               final String sourceBucket, final String sourceKey,
+                               final String destinationBucket, final String destinationKey) {
+        final CopyObjectRequest copyRequest = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+
+        if (acl != null) {
+            copyRequest.setAccessControlList(acl);
+        }
+        if (cannedAcl != null) {
+            copyRequest.setCannedAccessControlList(cannedAcl);
+        }
+
+        s3.copyObject(copyRequest);
+    }
+
     private String getTransitUrl(final String destinationBucket, final String 
destinationKey) {
         final String spacer = destinationKey.startsWith("/") ? "" : "/";
         return String.format("s3://%s%s%s", destinationBucket, spacer, 
destinationKey);
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
index 0183b4f9ef..ebd38f4ec6 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
@@ -22,6 +22,8 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.util.TestRunner;
@@ -53,6 +55,12 @@ public class TestCopyS3Object {
         final CopyS3Object mockCopyS3Object = new CopyS3Object() {
             @Override
             protected AmazonS3Client createClient(final ProcessContext 
context, final AWSCredentialsProvider credentialsProvider, final 
ClientConfiguration config) {
+                ObjectMetadata metadata = mock(ObjectMetadata.class);
+
+                when(metadata.getContentLength()).thenReturn(1000L);
+                
when(mockS3Client.getObjectMetadata(any(GetObjectMetadataRequest.class)))
+                        .thenReturn(metadata);
+
                 return mockS3Client;
             }
         };

Reply via email to