jrsteinebrey commented on code in PR #8992:
URL: https://github.com/apache/nifi/pull/8992#discussion_r1659495668


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+    public static final PropertyDescriptor TARGET_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("copy-s3-object-target-bucket")
+            .displayName("Target Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    public static final PropertyDescriptor SOURCE_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-source-key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket")
+            .build();
+
+    public static final PropertyDescriptor TARGET_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-target-key")
+            .displayName("Target Key")
+            .description("The target key in the target bucket")
+            .defaultValue("")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            SOURCE_BUCKET,
+            SOURCE_KEY,
+            TARGET_BUCKET,
+            TARGET_KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            WRITE_USER_LIST,
+            READ_ACL_LIST,
+            WRITE_ACL_LIST,
+            CANNED_ACL,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String sourceBucket = 
context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String sourceKey = 
context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetBucket = 
context.getProperty(TARGET_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetKey = 
context.getProperty(TARGET_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            CopyObjectRequest request = new CopyObjectRequest(sourceBucket, 
sourceKey, targetBucket, targetKey);
+            AccessControlList acl = createACL(context, flowFile);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
+            CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
+
+            if (cannedAccessControlList != null) {
+                request.setCannedAccessControlList(cannedAccessControlList);
+            }
+
+            s3.copyObject(request);
+            session.getProvenanceReporter().send(flowFile, 
getTransitUrl(targetBucket, targetKey));
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception ex) {
+            getLogger().error("Copy S3 Object Request failed with error:", ex);

Review Comment:
   Thanks, @joewitt, for raising valid points to consider. I feel the attributes 
are beneficial and relatively small, with no stack traces, as you can see in 
the code below.
   FYI, the method I suggested is used in all 5 of the existing S3Object-related 
processors. 
   @joewitt Are you OK with the new processors using 
extractExceptionDetails() like the existing S3 processors do?
   
   This is the method that adds exception-related attributes.
   ````
   protected FlowFile extractExceptionDetails(final Exception e, final 
ProcessSession session, FlowFile flowFile) {
           flowFile = session.putAttribute(flowFile, "s3.exception", 
e.getClass().getName());
           if (e instanceof AmazonS3Exception) {
               flowFile = putAttribute(session, flowFile, 
"s3.additionalDetails", ((AmazonS3Exception) e).getAdditionalDetails());
           }
           if (e instanceof final AmazonServiceException ase) {
               flowFile = putAttribute(session, flowFile, "s3.statusCode", 
ase.getStatusCode());
               flowFile = putAttribute(session, flowFile, "s3.errorCode", 
ase.getErrorCode());
               flowFile = putAttribute(session, flowFile, "s3.errorMessage", 
ase.getErrorMessage());
           }
           return flowFile;
       }
   ````



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+    public static final PropertyDescriptor TARGET_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("copy-s3-object-target-bucket")
+            .displayName("Target Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    public static final PropertyDescriptor SOURCE_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-source-key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket")
+            .build();
+
+    public static final PropertyDescriptor TARGET_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-target-key")
+            .displayName("Target Key")
+            .description("The target key in the target bucket")
+            .defaultValue("")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            SOURCE_BUCKET,
+            SOURCE_KEY,
+            TARGET_BUCKET,
+            TARGET_KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            WRITE_USER_LIST,
+            READ_ACL_LIST,
+            WRITE_ACL_LIST,
+            CANNED_ACL,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String sourceBucket = 
context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String sourceKey = 
context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetBucket = 
context.getProperty(TARGET_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetKey = 
context.getProperty(TARGET_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            CopyObjectRequest request = new CopyObjectRequest(sourceBucket, 
sourceKey, targetBucket, targetKey);
+            AccessControlList acl = createACL(context, flowFile);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
+            CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
+
+            if (cannedAccessControlList != null) {
+                request.setCannedAccessControlList(cannedAccessControlList);
+            }
+
+            s3.copyObject(request);
+            session.getProvenanceReporter().send(flowFile, 
getTransitUrl(targetBucket, targetKey));
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (Exception ex) {
+            getLogger().error("Copy S3 Object Request failed with error:", ex);

Review Comment:
   Thanks, @joewitt, for raising valid points to consider. I feel the attributes 
are beneficial and relatively small, with no stack traces, as you can see in 
the code below.
   FYI, the method I suggested is used in all 5 of the existing S3Object-related 
processors. 
   @joewitt Are you OK with the new processors using 
extractExceptionDetails() like the existing S3 processors do?
   
   This is the method that adds exception-related attributes.
   ````
   protected FlowFile extractExceptionDetails(final Exception e, final 
ProcessSession session, FlowFile flowFile) {
           flowFile = session.putAttribute(flowFile, "s3.exception", 
e.getClass().getName());
           if (e instanceof AmazonS3Exception) {
               flowFile = putAttribute(session, flowFile, 
"s3.additionalDetails", ((AmazonS3Exception) e).getAdditionalDetails());
           }
           if (e instanceof final AmazonServiceException ase) {
               flowFile = putAttribute(session, flowFile, "s3.statusCode", 
ase.getStatusCode());
               flowFile = putAttribute(session, flowFile, "s3.errorCode", 
ase.getErrorCode());
               flowFile = putAttribute(session, flowFile, "s3.errorMessage", 
ase.getErrorMessage());
           }
           return flowFile;
       }
   
   ````



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to