exceptionfactory commented on code in PR #8992: URL: https://github.com/apache/nifi/pull/8992#discussion_r1715819161
########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("Source Bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("Destination Bucket") + .displayName("Destination Bucket") + .description("The bucket that will receive the copy.") + .build(); + + public static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("Source Key") + .displayName("Source Key") + .description("The source key in the source bucket") + .build(); + + public static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("Destination Key") + .displayName("Destination Key") + .description("The target key in the target bucket") + .defaultValue("") + .build(); + + public static final List<PropertyDescriptor> properties = List.of( + SOURCE_BUCKET, + SOURCE_KEY, + DESTINATION_BUCKET, + DESTINATION_KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, 
+ S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + WRITE_USER_LIST, + READ_ACL_LIST, + WRITE_ACL_LIST, + CANNED_ACL, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String targetBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String targetKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, targetBucket, targetKey); + AccessControlList acl = createACL(context, flowFile); + if (acl != null) { + request.setAccessControlList(acl); + } + CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile); + + if (cannedAccessControlList != null) { + request.setCannedAccessControlList(cannedAccessControlList); + } + + s3.copyObject(request); + session.getProvenanceReporter().send(flowFile, getTransitUrl(targetBucket, targetKey)); + + session.transfer(flowFile, REL_SUCCESS); + } catch (final AmazonClientException e) { + flowFile = extractExceptionDetails(e, session, flowFile); + getLogger().error("Failed to copy S3 object", e); Review Comment: Recommend including the source bucket and key: ```suggestion getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e); ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue TARGET_ATTRIBUTES = new AllowableValue("attributes", "Attributes", "When " + + "selected, the metadata will be written to FlowFile attributes that have a user-configured prefix. For example: " + + "the standard S3 attribute Content-Type will be written as s3.Content-Type when using the default value. 
User-defined metadata " + + "will be included in the attributes added to the FlowFile"); + public static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("flowfile-content", "FlowFile Body", "Write " + + "the metadata to the FlowFile's content as JSON data."); + + public static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder() + .name("Metadata Target") + .description("This determines where the metadata will be written when it is found.") + .addValidator(Validator.VALID) + .required(true) + .allowableValues(TARGET_ATTRIBUTES, TARGET_FLOWFILE_BODY) + .defaultValue(TARGET_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor METADATA_ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder() + .name("Metadata Attribute Prefix") + .description("The prefix for FlowFile attributes generated from the S3 object metadata.") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("s3.") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor ATTRIBUTE_INCLUDE_PATTERN = new PropertyDescriptor.Builder() + .name("Metadata Attribute Include Pattern") + .description("A regex pattern to use for determining which object metadata entries are included as FlowFile " + + "attributes. This pattern is only applied to the 'found' relationship and will not be used to " + + "filter the error attributes in the 'failure' relationship.") + .addValidator(Validator.VALID) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .defaultValue(".*") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTES) + .build(); + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public static final List<PropertyDescriptor> properties = List.of( + METADATA_TARGET, + METADATA_ATTRIBUTE_PREFIX, + ATTRIBUTE_INCLUDE_PATTERN, + BUCKET_WITH_DEFAULT_VALUE, + KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + READ_ACL_LIST, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + public static Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("An object was found in the bucket at the supplied key") + .build(); + + public static Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No object was found in the bucket the supplied key") + .build(); + + @Override + public Set<Relationship> getRelationships() { + return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE)); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final Pattern attributePattern; + if 
(context.getProperty(ATTRIBUTE_INCLUDE_PATTERN).evaluateAttributeExpressions(flowFile).isSet()) { + attributePattern = Pattern.compile(context.getProperty(ATTRIBUTE_INCLUDE_PATTERN) + .evaluateAttributeExpressions(flowFile).getValue()); + } else { + attributePattern = null; + } + + try { + Relationship relationship; + + try { + ObjectMetadata metadata = s3.getObjectMetadata(bucket, key); + Map<String, Object> combinedMetadata = new HashMap<>(metadata.getRawMetadata()); + combinedMetadata.putAll(metadata.getUserMetadata()); + + if (context.getProperty(METADATA_TARGET).getValue().equals(TARGET_ATTRIBUTES.getValue())) { + String attributePrefix = context.getProperty(METADATA_ATTRIBUTE_PREFIX).getValue(); + Map<String, String> newAttributes = combinedMetadata + .entrySet().stream() + .filter(e -> { + if (attributePattern == null) { + return true; + } else { + return attributePattern.matcher(e.getKey()) + .find(); + } + }) + .collect(Collectors.toMap(e -> attributePrefix + e.getKey(), e -> e.getValue().toString())); Review Comment: The `toString()` approach works for most cases, but it is not optimal for `Date` objects due to the behavior of `java.util.Date.toString()`. Recommend checking for `Date` instances and using the epoch time in milliseconds from `getTime()`. ```suggestion .collect(Collectors.toMap(e -> attributePrefix + e.getKey(), e -> { final Object value = e.getValue(); final String attributeValue; if (value instanceof Date dateValue) { attributeValue = Long.toString(dateValue.getTime()); } else { attributeValue = value.toString(); } return attributeValue; })); ```
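For illustration, here is a minimal standalone sketch of the difference (the timestamp and class name are arbitrary examples, not taken from the PR):

```java
import java.util.Date;

public class DateAttributeValueExample {
    public static void main(String[] args) {
        // Hypothetical Last-Modified value; any epoch-millisecond timestamp works here
        final Date lastModified = new Date(1723222800000L);

        // Date.toString() yields a locale- and timezone-dependent form such as
        // "Fri Aug 09 17:00:00 UTC 2024", which downstream consumers must parse heuristically
        System.out.println(lastModified);

        // getTime() yields a stable epoch-millisecond value: 1723222800000
        System.out.println(lastModified.getTime());
    }
}
```

An epoch-millisecond attribute also round-trips cleanly through Expression Language date functions such as `format()`, which the `toString()` form does not.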
########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("Source Bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("Destination Bucket") + .displayName("Destination Bucket") + .description("The bucket that will receive the copy.") + .build(); + + public static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("Source Key") + .displayName("Source Key") + .description("The source key in the source bucket") + .build(); + + public static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("Destination Key") + .displayName("Destination Key") + .description("The target key in the target bucket") + .defaultValue("") + .build(); + + public static final List<PropertyDescriptor> properties = List.of( + SOURCE_BUCKET, + SOURCE_KEY, + DESTINATION_BUCKET, + DESTINATION_KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + WRITE_USER_LIST, + READ_ACL_LIST, + WRITE_ACL_LIST, + CANNED_ACL, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String sourceBucket = 
context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String targetBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String targetKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue(); Review Comment: ```suggestion final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue(); ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("Source Bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("Destination Bucket") + .displayName("Destination Bucket") + .description("The bucket that will receive the 
copy.") + .build(); + + public static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("Source Key") + .displayName("Source Key") + .description("The source key in the source bucket") + .build(); + + public static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("Destination Key") + .displayName("Destination Key") + .description("The target key in the target bucket") + .defaultValue("") + .build(); + + public static final List<PropertyDescriptor> properties = List.of( + SOURCE_BUCKET, + SOURCE_KEY, + DESTINATION_BUCKET, + DESTINATION_KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + WRITE_USER_LIST, + READ_ACL_LIST, + WRITE_ACL_LIST, + CANNED_ACL, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String targetBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String targetKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, targetBucket, targetKey); + AccessControlList acl = createACL(context, flowFile); + if (acl != null) { + request.setAccessControlList(acl); + } + CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile); + + if (cannedAccessControlList != null) { + request.setCannedAccessControlList(cannedAccessControlList); + } + + s3.copyObject(request); + session.getProvenanceReporter().send(flowFile, getTransitUrl(targetBucket, targetKey)); + + session.transfer(flowFile, REL_SUCCESS); + } catch (final AmazonClientException e) { + flowFile = extractExceptionDetails(e, session, flowFile); + getLogger().error("Failed to copy S3 object", e); + session.transfer(flowFile, REL_FAILURE); + } + } + + private String getTransitUrl(String targetBucket, String targetKey) { Review Comment: ```suggestion private String getTransitUrl(String destinationBucket, String destinationKey) { ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue TARGET_ATTRIBUTES = new AllowableValue("attributes", "Attributes", "When " + + "selected, the metadata will be written to FlowFile attributes that have a user-configured prefix. For example: " + + "the standard S3 attribute Content-Type will be written as s3.Content-Type when using the default value. 
User-defined metadata " + + "will be included in the attributes added to the FlowFile"); + public static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("flowfile-content", "FlowFile Body", "Write " + + "the metadata to the FlowFile's content as JSON data."); + + public static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder() + .name("Metadata Target") + .description("This determines where the metadata will be written when it is found.") + .addValidator(Validator.VALID) + .required(true) + .allowableValues(TARGET_ATTRIBUTES, TARGET_FLOWFILE_BODY) + .defaultValue(TARGET_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor METADATA_ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder() + .name("Metadata Attribute Prefix") + .description("The prefix for FlowFile attributes generated from the S3 object metadata.") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("s3.") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTES) + .required(true) + .build(); Review Comment: Based on the fact that `ListS3` always uses the `s3.` prefix, this seems unnecessary as a configurable property.
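For comparison, a minimal sketch of the non-configurable alternative this suggests (class and variable names here are illustrative, not from the PR):

```java
import java.util.Map;
import java.util.stream.Collectors;

public class FixedPrefixExample {
    // Fixed prefix, matching the convention ListS3 already uses for its attributes
    private static final String ATTRIBUTE_PREFIX = "s3.";

    public static void main(String[] args) {
        final Map<String, Object> combinedMetadata = Map.of("Content-Type", "text/plain", "Content-Length", 42L);

        // With no prefix property to resolve, attribute names are derived directly from the metadata keys
        final Map<String, String> attributes = combinedMetadata.entrySet().stream()
                .collect(Collectors.toMap(e -> ATTRIBUTE_PREFIX + e.getKey(), e -> e.getValue().toString()));

        // Prints s3.Content-Type=text/plain and s3.Content-Length=42 (iteration order unspecified)
        System.out.println(attributes);
    }
}
```

Dropping the property would keep attribute naming consistent across the S3 processors without another knob to document.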
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]