This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 99f3717f13 NIFI-13430 Added CopyS3Object and GetS3ObjectMetadata
99f3717f13 is described below
commit 99f3717f13169ed9504c3c05650938940ee169d4
Author: Mike Thomsen <[email protected]>
AuthorDate: Thu Jun 20 09:32:29 2024 -0400
NIFI-13430 Added CopyS3Object and GetS3ObjectMetadata
This closes #8992
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/aws/s3/CopyS3Object.java | 149 ++++++++++++++
.../nifi/processors/aws/s3/DeleteS3Object.java | 2 +-
.../nifi/processors/aws/s3/FetchS3Object.java | 2 +-
.../processors/aws/s3/GetS3ObjectMetadata.java | 216 +++++++++++++++++++++
.../org/apache/nifi/processors/aws/s3/ListS3.java | 2 +-
.../apache/nifi/processors/aws/s3/PutS3Object.java | 2 +-
.../apache/nifi/processors/aws/s3/TagS3Object.java | 2 +-
.../services/org.apache.nifi.processor.Processor | 2 +
.../nifi/processors/aws/s3/TestCopyS3Object.java | 108 +++++++++++
.../processors/aws/s3/TestGetS3ObjectMetadata.java | 202 +++++++++++++++++++
10 files changed, 682 insertions(+), 5 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
new file mode 100644
index 0000000000..9f886bda50
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+/**
+ * Copies an object from one S3 bucket/key to another by submitting a {@link CopyObjectRequest}
+ * built from the configured source and destination properties.
+ *
+ * <p>The incoming FlowFile only supplies attributes for Expression Language evaluation; this class
+ * never reads or writes FlowFile content. After a successful copy the FlowFile is transferred to
+ * {@code REL_SUCCESS} and a provenance SEND event is reported for the destination. When the client
+ * cannot be created, or the copy raises an {@link AmazonClientException}, the FlowFile is transferred
+ * to {@code REL_FAILURE}.</p>
+ */
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, GetS3ObjectMetadata.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+
+    /** Bucket holding the object to copy; derived from the shared {@code BUCKET_WITH_DEFAULT_VALUE} descriptor. */
+    static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("Source Bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+
+    /** Key of the object to copy; derived from the shared {@code KEY} descriptor. */
+    static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("Source Key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket")
+            .build();
+
+    /** Bucket that receives the copy; derived from the shared {@code BUCKET_WITHOUT_DEFAULT_VALUE} descriptor. */
+    static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("Destination Bucket")
+            .displayName("Destination Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    /** Key the copy is written to; defaults to the {@code filename} attribute with a {@code -1} suffix. */
+    static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("Destination Key")
+            .displayName("Destination Key")
+            .description("The target key in the target bucket")
+            .defaultValue("${filename}-1")
+            .build();
+
+    /** Supported properties, in the order they are exposed by {@link #getSupportedPropertyDescriptors()}. */
+    static final List<PropertyDescriptor> properties = List.of(
+            SOURCE_BUCKET,
+            SOURCE_KEY,
+            DESTINATION_BUCKET,
+            DESTINATION_KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            WRITE_USER_LIST,
+            READ_ACL_LIST,
+            WRITE_ACL_LIST,
+            CANNED_ACL,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE
+    );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Copies the configured source object to the configured destination for one FlowFile.
+     *
+     * @param context provides the property values, evaluated against the FlowFile attributes
+     * @param session supplies the FlowFile and receives the transfer and provenance event
+     * @throws ProcessException declared by the processor contract; not thrown directly here
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            // Without a client nothing can be attempted: penalize and fail this FlowFile.
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
+
+            // Both ACL forms are optional; each is applied only when the inherited helper returns one.
+            final AccessControlList acl = createACL(context, flowFile);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
+
+            final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
+            if (cannedAccessControlList != null) {
+                request.setCannedAccessControlList(cannedAccessControlList);
+            }
+
+            s3.copyObject(request);
+            session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final AmazonClientException e) {
+            flowFile = extractExceptionDetails(e, session, flowFile);
+            getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Builds the {@code s3://bucket/key} URI reported in the provenance SEND event.
+     *
+     * @param destinationBucket bucket the copy was written to
+     * @param destinationKey key the copy was written to; a separating slash is added only when the key lacks a leading one
+     * @return the transit URI for the destination object
+     */
+    private String getTransitUrl(final String destinationBucket, final String destinationKey) {
+        final String spacer = destinationKey.startsWith("/") ? "" : "/";
+        return String.format("s3://%s%s%s", destinationBucket, spacer, destinationKey);
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 7e4694ed79..e885b6b56b 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -48,7 +48,7 @@ import static
org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP
error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3
moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3
exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class,
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Deletes a file from an Amazon S3 Bucket. If attempting
to delete a file that does not exist, FlowFile is routed to success.")
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 67fe18280b..05a5224fac 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -65,7 +65,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@SupportsBatching
-@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class,
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it
to the content of a FlowFile")
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
new file mode 100644
index 0000000000..9793e81afd
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+/**
+ * Looks up the metadata of an S3 object without downloading the object and routes the FlowFile
+ * according to the outcome.
+ *
+ * <p>When the lookup succeeds, the raw and user metadata are merged and written either to FlowFile
+ * attributes (prefixed with {@code s3.}) or to the FlowFile content as JSON, depending on
+ * {@link #METADATA_TARGET}, and the FlowFile goes to {@link #REL_FOUND}. An
+ * {@link AmazonS3Exception} with status 404 routes to {@link #REL_NOT_FOUND}. Any other
+ * {@link AmazonClientException}, a client that cannot be created, or an include pattern that is not
+ * a valid regular expression routes to {@code REL_FAILURE}.</p>
+ */
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, CopyS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+
+    /** Metadata target that writes each metadata entry to a FlowFile attribute. */
+    static final AllowableValue TARGET_ATTRIBUTES = new AllowableValue("ATTRIBUTES", "Attributes", """
+            When selected, the metadata will be written to FlowFile attributes with the prefix "s3." following the convention used in other processors. For example:
+            the standard S3 attribute Content-Type will be written as s3.Content-Type when using the default value. User-defined metadata
+            will be included in the attributes added to the FlowFile
+            """
+    );
+
+    /** Metadata target that replaces the FlowFile content with the metadata serialized as JSON. */
+    static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("FLOWFILE_BODY", "FlowFile Body", "Write the metadata to FlowFile content as JSON data.");
+
+    static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written when found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTES, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTES)
+            .build();
+
+    // The value is not validated as a regular expression at configuration time (Validator.VALID, and it
+    // may contain Expression Language), so onTrigger handles a pattern that fails to compile.
+    static final PropertyDescriptor ATTRIBUTE_INCLUDE_PATTERN = new PropertyDescriptor.Builder()
+            .name("Metadata Attribute Include Pattern")
+            .description("""
+                    A regular expression pattern to use for determining which object metadata entries are included as FlowFile
+                    attributes. This pattern is only applied to the 'found' relationship and will not be used to
+                    filter the error attributes in the 'failure' relationship.
+                    """
+            )
+            .addValidator(Validator.VALID)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .defaultValue(".*")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTES)
+            .build();
+
+    private static final List<PropertyDescriptor> properties = List.of(
+            METADATA_TARGET,
+            ATTRIBUTE_INCLUDE_PATTERN,
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            READ_ACL_LIST,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE
+    );
+
+    static final Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("An object was found in the bucket at the supplied key")
+            .build();
+
+    static final Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("No object was found in the bucket at the supplied key")
+            .build();
+
+    private static final Set<Relationship> relationships = Set.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE);
+
+    /** Format applied to every metadata key to produce the FlowFile attribute name. */
+    private static final String ATTRIBUTE_FORMAT = "s3.%s";
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Requests the metadata for the configured bucket and key and routes one FlowFile by the result.
+     *
+     * @param context provides the property values, evaluated against the FlowFile attributes
+     * @param session supplies the FlowFile and receives the attribute/content update and transfer
+     * @throws ProcessException declared by the processor contract; not thrown directly here
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        final Pattern attributePattern;
+        final PropertyValue attributeIncludePatternProperty = context.getProperty(ATTRIBUTE_INCLUDE_PATTERN).evaluateAttributeExpressions(flowFile);
+        if (attributeIncludePatternProperty.isSet()) {
+            try {
+                attributePattern = Pattern.compile(attributeIncludePatternProperty.getValue());
+            } catch (final IllegalArgumentException e) {
+                // PatternSyntaxException is an IllegalArgumentException. Route the FlowFile to failure
+                // rather than letting the exception escape onTrigger.
+                getLogger().error("Invalid Metadata Attribute Include Pattern [{}]", attributeIncludePatternProperty.getValue(), e);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+        } else {
+            attributePattern = null;
+        }
+
+        final String metadataTarget = context.getProperty(METADATA_TARGET).getValue();
+
+        try {
+            Relationship relationship;
+
+            try {
+                final ObjectMetadata objectMetadata = s3.getObjectMetadata(bucket, key);
+                // User metadata is added last, so it overrides a raw entry stored under the same key.
+                final Map<String, Object> combinedMetadata = new LinkedHashMap<>(objectMetadata.getRawMetadata());
+                combinedMetadata.putAll(objectMetadata.getUserMetadata());
+
+                if (TARGET_ATTRIBUTES.getValue().equals(metadataTarget)) {
+                    flowFile = session.putAllAttributes(flowFile, toAttributes(combinedMetadata, attributePattern));
+                } else if (TARGET_FLOWFILE_BODY.getValue().equals(metadataTarget)) {
+                    flowFile = session.write(flowFile, outputStream -> MAPPER.writeValue(outputStream, combinedMetadata));
+                }
+
+                relationship = REL_FOUND;
+            } catch (final AmazonS3Exception e) {
+                // A 404 is an expected outcome ("not found"); every other status is handled as a failure below.
+                if (e.getStatusCode() == 404) {
+                    relationship = REL_NOT_FOUND;
+                    flowFile = extractExceptionDetails(e, session, flowFile);
+                } else {
+                    throw e;
+                }
+            }
+
+            session.transfer(flowFile, relationship);
+        } catch (final AmazonClientException e) {
+            getLogger().error("Failed to get S3 Object Metadata from Bucket [{}] Key [{}]", bucket, key, e);
+            flowFile = extractExceptionDetails(e, session, flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Converts metadata entries into FlowFile attributes named with {@link #ATTRIBUTE_FORMAT}.
+     *
+     * <p>Entries with a {@code null} value are skipped because neither {@code toString()} nor
+     * {@link Collectors#toMap} accepts them. {@link Date} values are written as epoch milliseconds;
+     * every other value uses its {@code toString()} form.</p>
+     *
+     * @param metadata combined raw and user metadata keyed by metadata name
+     * @param includePattern pattern that must be found in a key for the entry to be kept, or {@code null} to keep all
+     * @return attribute names mapped to their string values
+     */
+    private static Map<String, String> toAttributes(final Map<String, Object> metadata, final Pattern includePattern) {
+        return metadata.entrySet().stream()
+                .filter(entry -> entry.getValue() != null)
+                .filter(entry -> includePattern == null || includePattern.matcher(entry.getKey()).find())
+                .collect(Collectors.toMap(
+                        entry -> ATTRIBUTE_FORMAT.formatted(entry.getKey()),
+                        entry -> entry.getValue() instanceof Date dateValue
+                                ? Long.toString(dateValue.getTime())
+                                : entry.getValue().toString()));
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index b703b14c26..938a3fc05e 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -106,7 +106,7 @@ import static
org.apache.nifi.processors.aws.util.RegionUtilV1.REGION;
@TriggerWhenEmpty
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"Amazon", "S3", "AWS", "list"})
-@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
+@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class,
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@CapabilityDescription("Retrieves a listing of objects from an S3 bucket. For
each object that is listed, creates a FlowFile that represents "
+ "the object so that it can be fetched in conjunction with
FetchS3Object. This Processor is designed to run on Primary Node only "
+ "in a cluster. If the primary node changes, the new Primary Node
will pick up where the previous node left off without duplicating "
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index baeee681c1..cd695b4f06 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -95,7 +95,7 @@ import static
org.apache.nifi.processors.transfer.ResourceTransferProperties.RES
import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource;
@SupportsBatching
-@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class})
+@SeeAlso({FetchS3Object.class, DeleteS3Object.class, ListS3.class,
CopyS3Object.class, GetS3ObjectMetadata.class, TagS3Object.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
@CapabilityDescription("Writes the contents of a FlowFile as an S3 Object to
an Amazon S3 Bucket.")
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index d62716a465..2de04ef9a8 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -59,7 +59,7 @@ import static
org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@WritesAttribute(attribute = "s3.statusCode", description = "The HTTP
error code (if available) from the failed operation"),
@WritesAttribute(attribute = "s3.errorCode", description = "The S3
moniker of the failed operation"),
@WritesAttribute(attribute = "s3.errorMessage", description = "The S3
exception message from the failed operation")})
-@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class,
CopyS3Object.class, GetS3ObjectMetadata.class, DeleteS3Object.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Adds or updates a tag on an Amazon S3 Object.")
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a56392c9ae..050fbe1843 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.nifi.processors.aws.s3.CopyS3Object
+org.apache.nifi.processors.aws.s3.GetS3ObjectMetadata
org.apache.nifi.processors.aws.s3.FetchS3Object
org.apache.nifi.processors.aws.s3.PutS3Object
org.apache.nifi.processors.aws.s3.DeleteS3Object
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
new file mode 100644
index 0000000000..d704d40ec4
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestCopyS3Object {
+ private TestRunner runner;
+
+ private AmazonS3Client mockS3Client;
+
+ @BeforeEach
+ void setUp() {
+ mockS3Client = mock(AmazonS3Client.class);
+ final CopyS3Object mockCopyS3Object = new CopyS3Object() {
+ @Override
+ protected AmazonS3Client createClient(final ProcessContext
context, final AWSCredentialsProvider credentialsProvider, final Region region,
final ClientConfiguration config,
+ final
AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
+ return mockS3Client;
+ }
+ };
+ runner = TestRunners.newTestRunner(mockCopyS3Object);
+ AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
+ }
+
+ @DisplayName("Test a normal run that SHOULD succeed")
+ @Test
+ void testRun() {
+ runner.enqueue("".getBytes(StandardCharsets.UTF_8), setupRun());
+ runner.run();
+
+ runner.assertTransferCount(CopyS3Object.REL_SUCCESS, 1);
+
+ verify(mockS3Client, times(1))
+ .copyObject(any(CopyObjectRequest.class));
+
+ final List<ProvenanceEventRecord> provenanceEvents =
runner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ }
+
+ @DisplayName("Validate that S3 errors cleanly route to failure")
+ @Test
+ void testS3ErrorHandling() {
+ final AmazonS3Exception exception = new AmazonS3Exception("Manually
triggered error");
+ exception.setStatusCode(503);
+ when(mockS3Client.copyObject(any(CopyObjectRequest.class)))
+ .thenThrow(exception);
+
+ runner.enqueue(new byte[]{}, setupRun());
+ runner.run();
+
+ runner.assertTransferCount(CopyS3Object.REL_FAILURE, 1);
+ }
+
+ private Map<String, String> setupRun() {
+ runner.setProperty(CopyS3Object.SOURCE_BUCKET, "${s3.bucket.source}");
+ runner.setProperty(CopyS3Object.SOURCE_KEY, "${s3.key.source}");
+ runner.setProperty(CopyS3Object.DESTINATION_BUCKET,
"${s3.bucket.target}");
+ runner.setProperty(CopyS3Object.DESTINATION_KEY, "${s3.key.target}");
+
+ return Map.of(
+ "s3.bucket.source", "dev-bucket",
+ "s3.key.source", "/test.txt",
+ "s3.bucket.target", "staging-bucket",
+ "s3.key.target", "/copied.txt"
+ );
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
new file mode 100644
index 0000000000..7346b06df2
--- /dev/null
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3ObjectMetadata.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link GetS3ObjectMetadata}. The processor under test is subclassed so that
+ * its client factory returns a mocked {@link AmazonS3Client}; every metadata lookup is stubbed.
+ */
+class TestGetS3ObjectMetadata {
+    private TestRunner runner;
+
+    private AmazonS3Client mockS3Client;
+
+    private ObjectMetadata mockMetadata;
+
+    /**
+     * Creates the runner around a processor whose {@code createClient} returns the mocked client,
+     * enables access-key credentials, and prepares a metadata mock with one user entry and one
+     * raw entry.
+     */
+    @BeforeEach
+    void setUp() {
+        mockS3Client = mock(AmazonS3Client.class);
+        GetS3ObjectMetadata mockGetS3ObjectMetadata = new GetS3ObjectMetadata() {
+            @Override
+            protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
+                                                  final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
+                return mockS3Client;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockGetS3ObjectMetadata);
+        AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
+
+        mockMetadata = mock(ObjectMetadata.class);
+        Map<String, String> user = Map.of("x", "y");
+        Map<String, Object> raw = Map.of("a", "b");
+        when(mockMetadata.getUserMetadata()).thenReturn(user);
+        when(mockMetadata.getRawMetadata()).thenReturn(raw);
+    }
+
+    /**
+     * Sets the bucket and key properties to attribute references, enqueues one empty FlowFile
+     * that carries the matching attributes, and runs the processor once.
+     */
+    private void run() {
+        runner.setProperty(GetS3ObjectMetadata.BUCKET_WITH_DEFAULT_VALUE, "${s3.bucket}");
+        runner.setProperty(GetS3ObjectMetadata.KEY, "${filename}");
+        runner.enqueue("", Map.of("s3.bucket", "test-data", "filename", "test.txt"));
+
+        runner.run();
+    }
+
+    /**
+     * Stubs the metadata mock with two raw and three user entries and makes the mocked client
+     * return it for any bucket/key pair.
+     *
+     * @return a mutable map holding the raw and user entries combined
+     */
+    private Map<String, Object> setupObjectMetadata() {
+        Map<String, Object> rawMetadata = new HashMap<>();
+        rawMetadata.put("raw1", "x");
+        rawMetadata.put("raw2", "y");
+        Map<String, String> userMetadata = new HashMap<>();
+        userMetadata.put("user1", "a");
+        userMetadata.put("user2", "b");
+        userMetadata.put("mighthaveto", "excludemelater");
+        Map<String, Object> combined = new HashMap<>(rawMetadata);
+        combined.putAll(userMetadata);
+
+        when(mockMetadata.getRawMetadata()).thenReturn(rawMetadata);
+        when(mockMetadata.getUserMetadata()).thenReturn(userMetadata);
+
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenReturn(mockMetadata);
+
+        return combined;
+    }
+
+    /**
+     * Makes every metadata lookup on the mocked client throw an {@link AmazonS3Exception}
+     * carrying the given HTTP status code.
+     *
+     * @param statusCode status code to set on the thrown exception
+     */
+    private void stubMetadataLookupFailure(final int statusCode) {
+        final AmazonS3Exception exception = new AmazonS3Exception("test");
+        exception.setStatusCode(statusCode);
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenThrow(exception);
+    }
+
+    /**
+     * Asserts that each expected entry is present on the FlowFile as an attribute named
+     * {@code s3.<key>} with the expected value.
+     *
+     * @param flowFile FlowFile whose attributes are checked
+     * @param expected metadata entries keyed without the {@code s3.} prefix
+     */
+    private void assertS3Attributes(final MockFlowFile flowFile, final Map<String, Object> expected) {
+        expected.forEach((k, v) -> {
+            final String key = String.format("s3.%s", k);
+            assertEquals(v, flowFile.getAttribute(key));
+        });
+    }
+
+    @DisplayName("Validate fetch metadata to attribute routes to found when the file exists")
+    @Test
+    void testFetchMetadataToAttributeExists() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "");
+
+        Map<String, Object> combined = setupObjectMetadata();
+
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
+        assertS3Attributes(flowFile, combined);
+    }
+
+    @DisplayName("Validate attribute exclusion")
+    @Test
+    void testFetchMetadataToAttributeExclusion() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        runner.setProperty(GetS3ObjectMetadata.ATTRIBUTE_INCLUDE_PATTERN, "(raw|user)");
+
+        Map<String, Object> metadata = setupObjectMetadata();
+        // Everything except the entry whose key does not match the include pattern must be present.
+        Map<String, Object> musthave = new HashMap<>(metadata);
+        musthave.remove("mighthaveto");
+
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetS3ObjectMetadata.REL_FOUND).getFirst();
+        assertS3Attributes(flowFile, musthave);
+
+        assertNull(flowFile.getAttribute("s3.mighthaveto"));
+    }
+
+    @DisplayName("Validate fetch to attribute mode routes to failure on S3 error")
+    @Test
+    void testFetchMetadataToAttributeS3Error() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        stubMetadataLookupFailure(501);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
+    }
+
+    @DisplayName("Validate fetch metadata to attribute routes to not-found when the file doesn't exist")
+    @Test
+    void testFetchMetadataToAttributeNotExist() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_ATTRIBUTES.getValue());
+        stubMetadataLookupFailure(404);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
+    }
+
+    @DisplayName("Validate fetch metadata to FlowFile body routes to found when the file exists")
+    @Test
+    void testFetchMetadataToBodyExists() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        when(mockS3Client.getObjectMetadata(anyString(), anyString()))
+                .thenReturn(mockMetadata);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FOUND, 1);
+    }
+
+    @DisplayName("Validate fetch metadata to FlowFile body routes to not-found when the file doesn't exist")
+    @Test
+    void testFetchMetadataToBodyNotExist() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        stubMetadataLookupFailure(404);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_NOT_FOUND, 1);
+    }
+
+    @DisplayName("Validate fetch to FlowFile body mode routes to failure on S3 error")
+    @Test
+    void testFetchMetadataToBodyS3Error() {
+        runner.setProperty(GetS3ObjectMetadata.METADATA_TARGET, GetS3ObjectMetadata.TARGET_FLOWFILE_BODY.getValue());
+        stubMetadataLookupFailure(501);
+        run();
+        runner.assertTransferCount(GetS3ObjectMetadata.REL_FAILURE, 1);
+    }
+}