This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 99f3717f13 NIFI-13430 Added CopyS3Object and GetS3ObjectMetadata
99f3717f13 is described below

commit 99f3717f13169ed9504c3c05650938940ee169d4
Author: Mike Thomsen <[email protected]>
AuthorDate: Thu Jun 20 09:32:29 2024 -0400

    NIFI-13430 Added CopyS3Object and GetS3ObjectMetadata
    
    This closes #8992
    
    Co-authored-by: David Handermann <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/processors/aws/s3/CopyS3Object.java       | 149 ++++++++++++++
 .../nifi/processors/aws/s3/DeleteS3Object.java     |   2 +-
 .../nifi/processors/aws/s3/FetchS3Object.java      |   2 +-
 .../processors/aws/s3/GetS3ObjectMetadata.java     | 216 +++++++++++++++++++++
 .../org/apache/nifi/processors/aws/s3/ListS3.java  |   2 +-
 .../apache/nifi/processors/aws/s3/PutS3Object.java |   2 +-
 .../apache/nifi/processors/aws/s3/TagS3Object.java |   2 +-
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../nifi/processors/aws/s3/TestCopyS3Object.java   | 108 +++++++++++
 .../processors/aws/s3/TestGetS3ObjectMetadata.java | 202 +++++++++++++++++++
 10 files changed, 682 insertions(+), 5 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
new file mode 100644
index 0000000000..9f886bda50
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+/**
+ * Copies a single S3 object from a source bucket and key to a destination bucket and key.
+ * <p>
+ * Each incoming FlowFile triggers one {@link CopyObjectRequest}. The FlowFile is routed to success once the
+ * request completes and to failure when the S3 client cannot be created or the request raises an
+ * {@link AmazonClientException}.
+ */
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, GetS3ObjectMetadata.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+
+    static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("Source Bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+
+    static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("Source Key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket")
+            .build();
+
+    static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("Destination Bucket")
+            .displayName("Destination Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("Destination Key")
+            .displayName("Destination Key")
+            .description("The target key in the target bucket")
+            .defaultValue("${filename}-1")
+            .build();
+
+    static final List<PropertyDescriptor> properties = List.of(
+            SOURCE_BUCKET,
+            SOURCE_KEY,
+            DESTINATION_BUCKET,
+            DESTINATION_KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            WRITE_USER_LIST,
+            READ_ACL_LIST,
+            WRITE_ACL_LIST,
+            CANNED_ACL,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Copies the object identified by the Source Bucket and Source Key properties to the Destination Bucket and
+     * Destination Key, evaluating all four properties against the incoming FlowFile.
+     *
+     * @param context process context supplying the configured property values
+     * @param session process session used to get, update and transfer the FlowFile
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            // Without a client nothing can be attempted: penalize and route to failure
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+
+            // Both ACL forms are optional and only set on the request when a value was produced
+            final AccessControlList acl = createACL(context, flowFile);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
+
+            final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
+            if (cannedAccessControlList != null) {
+                request.setCannedAccessControlList(cannedAccessControlList);
+            }
+
+            s3.copyObject(request);
+            session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final AmazonClientException e) {
+            flowFile = extractExceptionDetails(e, session, flowFile);
+            getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Builds the {@code s3://bucket/key} transit URL reported in the provenance SEND event.
+     *
+     * @param destinationBucket bucket that received the copy
+     * @param destinationKey key of the copy; a leading slash is not duplicated
+     * @return transit URL for the destination object
+     */
+    private String getTransitUrl(final String destinationBucket, final String destinationKey) {
+        final String spacer = destinationKey.startsWith("/") ? "" : "/";
+        return String.format("s3://%s%s%s", destinationBucket, spacer, destinationKey);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 7e4694ed79..e885b6b56b 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -48,7 +48,7 @@ import static 
org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
         @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP 
error code (if available) from the failed operation"),
         @WritesAttribute(attribute = "s3.errorCode", description = "The S3 
moniker of the failed operation"),
         @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 
exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class, 
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Deletes a file from an Amazon S3 Bucket. If attempting 
to delete a file that does not exist, FlowFile is routed to success.")
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 67fe18280b..05a5224fac 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -65,7 +65,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
 
 @SupportsBatching
-@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
 @CapabilityDescription("Retrieves the contents of an S3 Object and writes it 
to the content of a FlowFile")
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
new file mode 100644
index 0000000000..9793e81afd
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+/**
+ * Looks up the metadata of an S3 object without downloading its content.
+ * <p>
+ * The raw and user-defined metadata returned by S3 are combined and written either to FlowFile attributes
+ * (prefixed with {@code s3.}) or to the FlowFile content as JSON, depending on the Metadata Target property.
+ * FlowFiles are routed to {@code found}, to {@code not found} when S3 answers with HTTP 404, and to
+ * {@code failure} for any other client error.
+ */
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, CopyS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+
+    static final AllowableValue TARGET_ATTRIBUTES = new AllowableValue("ATTRIBUTES", "Attributes", """
+            When selected, the metadata will be written to FlowFile attributes with the prefix "s3." following the convention used in other processors. For example:
+            the standard S3 attribute Content-Type will be written as s3.Content-Type when using the default value. User-defined metadata
+            will be included in the attributes added to the FlowFile
+            """
+    );
+
+    static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("FLOWFILE_BODY", "FlowFile Body", "Write the metadata to FlowFile content as JSON data.");
+
+    static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written when found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTES, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor ATTRIBUTE_INCLUDE_PATTERN = new PropertyDescriptor.Builder()
+            .name("Metadata Attribute Include Pattern")
+            .description("""
+                    A regular expression pattern to use for determining which object metadata entries are included as FlowFile
+                    attributes. This pattern is only applied to the 'found' relationship and will not be used to
+                    filter the error attributes in the 'failure' relationship.
+                    """
+            )
+            .addValidator(Validator.VALID)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue(".*")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTES)
+            .build();
+
+    private static final List<PropertyDescriptor> properties = List.of(
+            METADATA_TARGET,
+            ATTRIBUTE_INCLUDE_PATTERN,
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            READ_ACL_LIST,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE
+    );
+
+    static final Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("An object was found in the bucket at the supplied key")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("No object was found in the bucket at the supplied key")
+            .build();
+
+    private static final Set<Relationship> relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE);
+
+    private static final String ATTRIBUTE_FORMAT = "s3.%s";
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Requests the metadata of the object identified by the Bucket and Key properties and routes the FlowFile
+     * according to the outcome.
+     *
+     * @param context process context supplying the configured property values
+     * @param session process session used to get, update and transfer the FlowFile
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final Pattern attributePattern;
+
+        final PropertyValue attributeIncludePatternProperty = context.getProperty(ATTRIBUTE_INCLUDE_PATTERN).evaluateAttributeExpressions(flowFile);
+        if (attributeIncludePatternProperty.isSet()) {
+            try {
+                attributePattern = Pattern.compile(attributeIncludePatternProperty.getValue());
+            } catch (final IllegalArgumentException e) {
+                // The property is not validated as a regular expression and may come from FlowFile attributes,
+                // so a malformed pattern (PatternSyntaxException is an IllegalArgumentException) is routed to
+                // failure instead of escaping onTrigger and leaving the FlowFile to be retried indefinitely.
+                getLogger().error("Invalid Metadata Attribute Include Pattern [{}]", attributeIncludePatternProperty.getValue(), e);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+        } else {
+            attributePattern = null;
+        }
+
+        final String metadataTarget = context.getProperty(METADATA_TARGET).getValue();
+
+        try {
+            Relationship relationship;
+
+            try {
+                final ObjectMetadata objectMetadata = s3.getObjectMetadata(bucket, key);
+                // User-defined entries are added after the raw entries and replace any raw entry with the same key
+                final Map<String, Object> combinedMetadata = new LinkedHashMap<>(objectMetadata.getRawMetadata());
+                combinedMetadata.putAll(objectMetadata.getUserMetadata());
+
+                if (TARGET_ATTRIBUTES.getValue().equals(metadataTarget)) {
+                    flowFile = session.putAllAttributes(flowFile, createAttributes(combinedMetadata, attributePattern));
+                } else if (TARGET_FLOWFILE_BODY.getValue().equals(metadataTarget)) {
+                    flowFile = session.write(flowFile, outputStream -> MAPPER.writeValue(outputStream, combinedMetadata));
+                }
+
+                relationship = REL_FOUND;
+            } catch (final AmazonS3Exception e) {
+                // Only HTTP 404 means the object is absent; every other status is handled as a failure below
+                if (e.getStatusCode() == 404) {
+                    relationship = REL_NOT_FOUND;
+                    flowFile = extractExceptionDetails(e, session, flowFile);
+                } else {
+                    throw e;
+                }
+            }
+
+            session.transfer(flowFile, relationship);
+        } catch (final AmazonClientException e) {
+            getLogger().error("Failed to get S3 Object Metadata from Bucket [{}] Key [{}]", bucket, key, e);
+            flowFile = extractExceptionDetails(e, session, flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Converts metadata entries to FlowFile attributes named {@code s3.<metadata key>}.
+     *
+     * @param metadata combined raw and user-defined metadata
+     * @param attributePattern pattern that must be found in a metadata key for the entry to be included,
+     *                         or {@code null} to include every entry
+     * @return attribute map; {@link Date} values are written as epoch milliseconds, all others via {@code toString()}
+     */
+    private static Map<String, String> createAttributes(final Map<String, Object> metadata, final Pattern attributePattern) {
+        return metadata.entrySet().stream()
+                .filter(e -> attributePattern == null || attributePattern.matcher(e.getKey()).find())
+                .collect(Collectors.toMap(e -> ATTRIBUTE_FORMAT.formatted(e.getKey()), e -> {
+                    final Object value = e.getValue();
+                    if (value instanceof Date dateValue) {
+                        return Long.toString(dateValue.getTime());
+                    }
+                    return value.toString();
+                }));
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index b703b14c26..938a3fc05e 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -106,7 +106,7 @@ import static 
org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"Amazon", "S3", "AWS", "list"})
-@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class, 
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For 
each object that is listed, creates a FlowFile that represents "
         + "the object so that it can be fetched in conjunction with 
FetchS3Object. This Processor is designed to run on Primary Node only "
         + "in a cluster. If the primary node changes, the new Primary Node 
will pick up where the previous node left off without duplicating "
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index baeee681c1..cd695b4f06 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -95,7 +95,7 @@ import static 
org.apache.nifi.processors.transfer.ResourceTransferProperties.RES
 import static 
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
 
 @SupportsBatching
-@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class, 
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
 @CapabilityDescription("Writes the contents of a FlowFile as an S3 Object to 
an Amazon S3 Bucket.")
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index d62716a465..2de04ef9a8 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -59,7 +59,7 @@ import static 
org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
         @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP 
error code (if available) from the failed operation"),
         @WritesAttribute(attribute = "s3.errorCode", description = "The S3 
moniker of the failed operation"),
         @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 
exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class, 
CopyS3Object.class, GetS3ObjectMetadata.class, DeleteS3Object.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Adds or updates a tag on an Amazon S3 Object.")
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a56392c9ae..050fbe1843 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+org.apache.nifi.processors.aws.s3.CopyS3Object
+org.apache.nifi.processors.aws.s3.GetS3ObjectMetadata
 org.apache.nifi.processors.aws.s3.FetchS3Object
 org.apache.nifi.processors.aws.s3.PutS3Object
 org.apache.nifi.processors.aws.s3.DeleteS3Object
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
new file mode 100644
index 0000000000..d704d40ec4
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link CopyS3Object} running against a mocked {@link AmazonS3Client}.
+ */
+public class TestCopyS3Object {
+    private TestRunner runner;
+
+    private AmazonS3Client mockS3Client;
+
+    @BeforeEach
+    void setUp() {
+        mockS3Client = mock(AmazonS3Client.class);
+
+        // Client creation is overridden so the processor under test always receives the mock
+        final CopyS3Object processor = new CopyS3Object() {
+            @Override
+            protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
+                                                  final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
+                return mockS3Client;
+            }
+        };
+
+        runner = TestRunners.newTestRunner(processor);
+        AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
+    }
+
+    @DisplayName("Test a normal run that SHOULD succeed")
+    @Test
+    void testRun() {
+        final byte[] emptyContent = "".getBytes(StandardCharsets.UTF_8);
+        final Map<String, String> attributes = setupRun();
+        runner.enqueue(emptyContent, attributes);
+        runner.run();
+
+        runner.assertTransferCount(CopyS3Object.REL_SUCCESS, 1);
+
+        // Exactly one copy request must have reached the client
+        verify(mockS3Client, times(1)).copyObject(any(CopyObjectRequest.class));
+
+        final List<ProvenanceEventRecord> recordedEvents = runner.getProvenanceEvents();
+        assertEquals(1, recordedEvents.size());
+    }
+
+    @DisplayName("Validate that S3 errors cleanly route to failure")
+    @Test
+    void testS3ErrorHandling() {
+        final AmazonS3Exception serviceUnavailable = new AmazonS3Exception("Manually triggered error");
+        serviceUnavailable.setStatusCode(503);
+        when(mockS3Client.copyObject(any(CopyObjectRequest.class))).thenThrow(serviceUnavailable);
+
+        final Map<String, String> attributes = setupRun();
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        runner.assertTransferCount(CopyS3Object.REL_FAILURE, 1);
+    }
+
+    /**
+     * Points the four bucket/key properties at FlowFile attributes through Expression Language.
+     *
+     * @return the FlowFile attributes those expressions resolve against
+     */
+    private Map<String, String> setupRun() {
+        final Map<String, String> attributes = Map.of(
+                "s3.bucket.source", "dev-bucket",
+                "s3.key.source", "/test.txt",
+                "s3.bucket.target", "staging-bucket",
+                "s3.key.target", "/copied.txt"
+        );
+
+        runner.setProperty(CopyS3Object.SOURCE_BUCKET, "${s3.bucket.source}");
+        runner.setProperty(CopyS3Object.SOURCE_KEY, "${s3.key.source}");
+        runner.setProperty(CopyS3Object.DESTINATION_BUCKET, "${s3.bucket.target}");
+        runner.setProperty(CopyS3Object.DESTINATION_KEY, "${s3.key.target}");
+
+        return attributes;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
new file mode 100644
index 0000000000..7346b06df2
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link GetS3ObjectMetadata}. The processor under test is an anonymous
+ * subclass whose {@code createClient} returns a Mockito mock of {@link AmazonS3Client}.
+ */
+class TestGetS3ObjectMetadata {
+    private TestRunner runner;
+
+    private AmazonS3Client mockS3Client;
+
+    private ObjectMetadata mockMetadata;
+
+    /**
+     * Creates the mocked client, a runner around a processor that hands out that client,
+     * and a mocked {@link ObjectMetadata} with single-entry default user and raw maps.
+     */
+    @BeforeEach
+    void setUp() {
+        mockS3Client = mock(AmazonS3Client.class);
+        GetS3ObjectMetadata mockGetS3ObjectMetadata = new GetS3ObjectMetadata() {
+            @Override
+            protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
+                                                  final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
+                return mockS3Client;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockGetS3ObjectMetadata);
+        AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
+
+        mockMetadata = mock(ObjectMetadata.class);
+        Map<String, String> user = Map.of("x", "y");
+        Map<String, Object> raw = Map.of("a", "b");
+        when(mockMetadata.getUserMetadata()).thenReturn(user);
+        when(mockMetadata.getRawMetadata()).thenReturn(raw);
+    }
+
+    /**
+     * Sets the bucket and key properties to {@code ${...}} references, enqueues one empty
+     * FlowFile carrying the matching attributes, and runs the processor once.
+     */
+    private void run() {
+        runner.setProperty(GetS3ObjectMetadata.BUCKET_WITH_DEFAULT_VALUE, "${s3.bucket}");
+        runner.setProperty(GetS3ObjectMetadata.KEY, "${filename}");
+        runner.enqueue("", Map.of("s3.bucket", "test-data", "filename", "test.txt"));
+
+        runner.run();
+    }
+
+    /**
+     * Replaces the default metadata stubs with two raw and three user entries and makes
+     * the mocked client return that metadata for any bucket and key.
+     *
+     * @return a mutable map holding every raw and user entry that was stubbed
+     */
+    private Map<String, Object> setupObjectMetadata() {
+        Map<String, Object> rawMetadata = new HashMap<>();
+        rawMetadata.put("raw1", "x");
+        rawMetadata.put("raw2", "y");
+        Map<String, String> userMetadata = new HashMap<>();
+        userMetadata.put("user1", "a");
+        userMetadata.put("user2", "b");
+        // This key matches neither "raw" nor "user", so the exclusion test expects it to be absent.
+        userMetadata.put("mighthaveto", "excludemelater");
+        Map<String, Object> combined = new HashMap<>(rawMetadata);
+        combined.putAll(userMetadata);
+
+        when(mockMetadata.getRawMetadata()).thenReturn(rawMetadata);
+        when(mockMetadata.getUserMetadata()).thenReturn(userMetadata);
+
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenReturn(mockMetadata);
+
+        return combined;
+    }
+
+    /**
+     * Makes the mocked client throw an {@link AmazonS3Exception} with the given HTTP status
+     * code for any bucket and key.
+     *
+     * @param statusCode status code set on the thrown exception
+     */
+    private void stubS3Failure(final int statusCode) {
+        AmazonS3Exception exception = new AmazonS3Exception("test");
+        exception.setStatusCode(statusCode);
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenThrow(exception);
+    }
+
+    /**
+     * Asserts that each expected entry appears on the FlowFile as an attribute named
+     * {@code s3.<key>} with an equal value.
+     *
+     * @param flowFile FlowFile whose attributes are inspected
+     * @param expected metadata entries that must be present
+     */
+    private void assertMetadataAttributes(final MockFlowFile flowFile, final Map<String, Object> expected) {
+        expected.forEach((k, v) -> {
+            String key = String.format("s3.%s", k);
+            String val = flowFile.getAttribute(key);
+            assertEquals(v, val);
+        });
+    }
+
+    /**
+     * With attributes as the target and an empty include pattern, expects one FlowFile on
+     * the found relationship carrying every stubbed metadata entry.
+     */
+    @DisplayName("Validate fetch metadata to attribute routes to found when the file exists")
+    @Test
+    void testFetchMetadataToAttributeExists() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "");
+
+        Map<String, Object> combined = setupObjectMetadata();
+
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
+        assertMetadataAttributes(flowFile, combined);
+    }
+
+    /**
+     * With the include pattern {@code (raw|user)}, expects the raw and user entries to be
+     * present as attributes and {@code s3.mighthaveto} to be absent.
+     */
+    @DisplayName("Validate attribution exclusion")
+    @Test
+    void testFetchMetadataToAttributeExclusion() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "(raw|user)");
+
+        Map<String, Object> metadata = setupObjectMetadata();
+        Map<String, Object> musthave = new HashMap<>(metadata);
+        musthave.remove("mighthaveto");
+
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
+        assertMetadataAttributes(flowFile, musthave);
+
+        assertNull(flowFile.getAttribute("s3.mighthaveto"));
+    }
+
+    /**
+     * With attributes as the target, expects a 501 from the client to put one FlowFile on
+     * the failure relationship.
+     */
+    @DisplayName("Validate fetch to attribute mode routes to failure on S3 error")
+    @Test
+    void testFetchMetadataToAttributeS3Error() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        stubS3Failure(501);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
+    }
+
+    /**
+     * With attributes as the target, expects a 404 from the client to put one FlowFile on
+     * the not-found relationship.
+     */
+    @DisplayName("Validate fetch metadata to attribute routes to not-found when the file doesn't exist")
+    @Test
+    void testFetchMetadataToAttributeNotExist() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        stubS3Failure(404);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
+    }
+
+    /**
+     * With the FlowFile body as the target and the default metadata stubs, expects one
+     * FlowFile on the found relationship.
+     */
+    @DisplayName("Validate fetch metadata to FlowFile body routes to found when the file exists")
+    @Test
+    void testFetchMetadataToBodyExists() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenReturn(mockMetadata);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+    }
+
+    /**
+     * With the FlowFile body as the target, expects a 404 from the client to put one
+     * FlowFile on the not-found relationship.
+     */
+    @DisplayName("Validate fetch metadata to FlowFile body routes to not-found when the file doesn't exist")
+    @Test
+    void testFetchMetadataToBodyNotExist() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        stubS3Failure(404);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
+    }
+
+    /**
+     * With the FlowFile body as the target, expects a 501 from the client to put one
+     * FlowFile on the failure relationship.
+     */
+    @DisplayName("Validate fetch to FlowFile body mode routes to failure on S3 error")
+    @Test
+    void testFetchMetadataToBodyS3Error() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        stubS3Failure(501);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
+    }
+}


Reply via email to