exceptionfactory commented on code in PR #8992: URL: https://github.com/apache/nifi/pull/8992#discussion_r1679683656
########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) 
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("copy-s3-object-source-bucket") + .displayName("Source Bucket") Review Comment: For new properties and new Processors, we should use the human-readable name as the property name, and avoid including the processor name in the property name. ```suggestion .name("Source Bucket") .displayName("Source Bucket") ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("copy-s3-object-source-bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor TARGET_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("copy-s3-object-target-bucket") + .displayName("Target Bucket") + .description("The bucket that will receive the copy.") + .build(); + + public static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder() + 
.fromPropertyDescriptor(KEY) + .name("copy-s3-source-key") + .displayName("Source Key") Review Comment: ```suggestion .name("Source Key") .displayName("Source Key") ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("copy-s3-object-source-bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor TARGET_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("copy-s3-object-target-bucket") + .displayName("Target Bucket") + .description("The bucket that will receive the copy.") + .build(); + + public static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder() + 
.fromPropertyDescriptor(KEY) + .name("copy-s3-source-key") + .displayName("Source Key") + .description("The source key in the source bucket") + .build(); + + public static final PropertyDescriptor TARGET_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("copy-s3-target-key") + .displayName("Target Key") + .description("The target key in the target bucket") + .defaultValue("") + .build(); + + public static final List<PropertyDescriptor> properties = List.of( + SOURCE_BUCKET, + SOURCE_KEY, + TARGET_BUCKET, + TARGET_KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + WRITE_USER_LIST, + READ_ACL_LIST, + WRITE_ACL_LIST, + CANNED_ACL, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String targetBucket = context.getProperty(TARGET_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String targetKey = context.getProperty(TARGET_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + CopyObjectRequest request = new CopyObjectRequest(sourceBucket, 
sourceKey, targetBucket, targetKey); + AccessControlList acl = createACL(context, flowFile); + if (acl != null) { + request.setAccessControlList(acl); + } + CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile); + + if (cannedAccessControlList != null) { + request.setCannedAccessControlList(cannedAccessControlList); + } + + s3.copyObject(request); + session.getProvenanceReporter().send(flowFile, getTransitUrl(targetBucket, targetKey)); + + session.transfer(flowFile, REL_SUCCESS); + } catch (AmazonClientException ex) { + flowFile = extractExceptionDetails(ex, session, flowFile); + getLogger().error("Copy S3 Object Request failed with error:", ex); Review Comment: ```suggestion getLogger().error("Failed to copy S3 object", e); ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + + "attribute"); + public static final AllowableValue MODE_ROUTER = new AllowableValue("router", "Router", "When selected," + + "this mode will skip writing the metadata and just send the flowfile to the found or not-found relationship. It should be used " + + "when the goal is to just route flowfiles based on whether or not a key is present in S3."); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER) + .defaultValue(MODE_FETCH_METADATA.getValue()) + .required(true) + .description("Configure the mode of operation for this processor") + .addValidator(Validator.VALID) + .build(); + + public static final AllowableValue TARGET_ATTRIBUTE = new AllowableValue("attribute", "Attribute", "When " + + "selected, the metadata will be written to a user-supplied attribute"); + public static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("flowfile-body", "Flowfile Body", "Write " + + "the metadata to the flowfile's body"); + + public static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder() + .name("Metadata Target") + .description("This determines where the metadata will be written when it is found.") + .addValidator(Validator.VALID) + .required(true) + .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY) + .defaultValue(TARGET_ATTRIBUTE) + .dependsOn(MODE, MODE_FETCH_METADATA) + .build(); + + public static final PropertyDescriptor METADATA_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Metadata Attribute") + .description("The attribute where the metadata will be written") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("s3.object.metadata") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE) + .required(true) + .build(); + + public static final ObjectMapper MAPPER = new ObjectMapper(); + + public static final 
List<PropertyDescriptor> properties = List.of( + MODE, + METADATA_TARGET, + METADATA_ATTRIBUTE, + BUCKET_WITH_DEFAULT_VALUE, + KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + READ_ACL_LIST, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + public static Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("An object was found in the bucket at the supplied key") + .build(); + + public static Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No object was found in the bucket the supplied key") + .build(); + + @Override + public Set<Relationship> getRelationships() { + return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE)); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + boolean isRouter = context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue()); + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + Relationship route; + + try { + ObjectMetadata metadata = s3.getObjectMetadata(bucket, key); + Map<String, Object> combinedMetadata = new HashMap<>(metadata.getRawMetadata()); + 
combinedMetadata.putAll(metadata.getUserMetadata()); + + String metadataJson = MAPPER.writeValueAsString(combinedMetadata); + + if (!isRouter && context.getProperty(METADATA_TARGET).getValue().equals(TARGET_ATTRIBUTE.getValue())) { + String attribute = context.getProperty(METADATA_ATTRIBUTE).getValue(); + flowFile = session.putAttribute(flowFile, attribute, metadataJson); + } else if (!isRouter && context.getProperty(METADATA_TARGET).getValue().equals(TARGET_FLOWFILE_BODY.getValue())) { + flowFile = session.write(flowFile, os -> os.write(metadataJson.getBytes(StandardCharsets.UTF_8))); + } + + route = REL_FOUND; + } catch (AmazonS3Exception e) { + if (e.getStatusCode() == 404) { + route = REL_NOT_FOUND; + flowFile = extractExceptionDetails(e, session, flowFile); + } else { + throw e; + } + } + + session.transfer(flowFile, route); + } catch (IOException | AmazonClientException ex) { Review Comment: ```suggestion } catch (final IOException | AmazonClientException e) { ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestCopyS3Object { + private TestRunner runner = null; + private CopyS3Object mockCopyS3Object = null; + private AmazonS3Client mockS3Client = null; Review Comment: These `null` initial values are not necessary. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java: ########## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.testutil.AuthUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestCopyS3Object { + private TestRunner runner = null; + private CopyS3Object mockCopyS3Object = null; + private AmazonS3Client mockS3Client = null; + + @BeforeEach + public void setUp() { + mockS3Client = mock(AmazonS3Client.class); + mockCopyS3Object = new CopyS3Object() { + @Override + protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config, + final 
AwsClientBuilder.EndpointConfiguration endpointConfiguration) { + return mockS3Client; + } + }; + runner = TestRunners.newTestRunner(mockCopyS3Object); + AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey"); + } + + @DisplayName("Test a normal run that SHOULD succeed") + @Test + public void testRun() { + runner.enqueue("".getBytes(StandardCharsets.UTF_8), setupRun()); + runner.run(); + + runner.assertTransferCount(CopyS3Object.REL_SUCCESS, 1); + + verify(mockS3Client, times(1)) + .copyObject(any(CopyObjectRequest.class)); + + var provenanceEvents = runner.getProvenanceEvents(); + assertEquals(1, provenanceEvents.size()); + } + + @DisplayName("Validate that S3 errors cleanly route to failure") + @Test + public void testS3ErrorHandling() { + var ex = new AmazonS3Exception("Manually triggered error"); + ex.setStatusCode(503); + when(mockS3Client.copyObject(any(CopyObjectRequest.class))) + .thenThrow(ex); + + runner.enqueue("".getBytes(StandardCharsets.UTF_8), setupRun()); Review Comment: ```suggestion runner.enqueue(new byte[]{}, setupRun()); ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("copy-s3-object-source-bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor TARGET_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("copy-s3-object-target-bucket") + .displayName("Target Bucket") Review Comment: Recommend using `Destination` instead 
of `Target`: ```suggestion public static final PropertyDescriptor DESTINATION_BUCKET = new PropertyDescriptor.Builder() .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) .name("Destination Bucket") .displayName("Destination Bucket") ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("copy-s3-object-source-bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor TARGET_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("copy-s3-object-target-bucket") + .displayName("Target Bucket") + .description("The bucket that will receive the copy.") + .build(); + + public static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder() + 
.fromPropertyDescriptor(KEY) + .name("copy-s3-source-key") + .displayName("Source Key") + .description("The source key in the source bucket") + .build(); + + public static final PropertyDescriptor TARGET_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("copy-s3-target-key") + .displayName("Target Key") + .description("The target key in the target bucket") + .defaultValue("") + .build(); + + public static final List<PropertyDescriptor> properties = List.of( + SOURCE_BUCKET, + SOURCE_KEY, + TARGET_BUCKET, + TARGET_KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + WRITE_USER_LIST, + READ_ACL_LIST, + WRITE_ACL_LIST, + CANNED_ACL, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String sourceBucket = context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String sourceKey = context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String targetBucket = context.getProperty(TARGET_BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String targetKey = context.getProperty(TARGET_KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + CopyObjectRequest request = new CopyObjectRequest(sourceBucket, 
sourceKey, targetBucket, targetKey); + AccessControlList acl = createACL(context, flowFile); + if (acl != null) { + request.setAccessControlList(acl); + } + CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile); + + if (cannedAccessControlList != null) { + request.setCannedAccessControlList(cannedAccessControlList); + } + + s3.copyObject(request); + session.getProvenanceReporter().send(flowFile, getTransitUrl(targetBucket, targetKey)); + + session.transfer(flowFile, REL_SUCCESS); + } catch (AmazonClientException ex) { Review Comment: ```suggestion } catch (final AmazonClientException e) { ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + + "attribute"); + public static final AllowableValue MODE_ROUTER = new AllowableValue("router", "Router", "When selected," + + "this mode will skip writing the metadata and just send the flowfile to the found or not-found relationship. It should be used " + + "when the goal is to just route flowfiles based on whether or not a key is present in S3."); Review Comment: Recommend replacing these allowable values with an `enum` that implements `DescribedValue`. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class CopyS3Object extends AbstractS3Processor { + public static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE) + .name("copy-s3-object-source-bucket") + .displayName("Source Bucket") + .description("The bucket that contains the file to be copied.") + .build(); + public static final PropertyDescriptor TARGET_BUCKET = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE) + .name("copy-s3-object-target-bucket") + .displayName("Target Bucket") + .description("The bucket that will receive the copy.") + .build(); + + public static final PropertyDescriptor SOURCE_KEY = new PropertyDescriptor.Builder() + 
.fromPropertyDescriptor(KEY) + .name("copy-s3-source-key") + .displayName("Source Key") + .description("The source key in the source bucket") + .build(); + + public static final PropertyDescriptor TARGET_KEY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(KEY) + .name("copy-s3-target-key") + .displayName("Target Key") Review Comment: ```suggestion public static final PropertyDescriptor DESTINATION_KEY = new PropertyDescriptor.Builder() .fromPropertyDescriptor(KEY) .name("Destination Key") .displayName("Destination Key") ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + Review Comment: The description should not mention whether it is the default as that fact is indicated through the PropertyDescriptor. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + + "attribute"); + public static final AllowableValue MODE_ROUTER = new AllowableValue("router", "Router", "When selected," + + "this mode will skip writing the metadata and just send the flowfile to the found or not-found relationship. It should be used " + + "when the goal is to just route flowfiles based on whether or not a key is present in S3."); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() Review Comment: As noted in the implementation details, rather than this type of `Mode` property, I recommend a `Metadata Output Strategy` property. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + + "attribute"); + public static final AllowableValue MODE_ROUTER = new AllowableValue("router", "Router", "When selected," + + "this mode will skip writing the metadata and just send the flowfile to the found or not-found relationship. It should be used " + + "when the goal is to just route flowfiles based on whether or not a key is present in S3."); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER) + .defaultValue(MODE_FETCH_METADATA.getValue()) + .required(true) + .description("Configure the mode of operation for this processor") + .addValidator(Validator.VALID) + .build(); + + public static final AllowableValue TARGET_ATTRIBUTE = new AllowableValue("attribute", "Attribute", "When " + + "selected, the metadata will be written to a user-supplied attribute"); + public static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("flowfile-body", "Flowfile Body", "Write " + + "the metadata to the flowfile's body"); + + public static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder() + .name("Metadata Target") + .description("This determines where the metadata will be written when it is found.") + .addValidator(Validator.VALID) + .required(true) + .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY) + .defaultValue(TARGET_ATTRIBUTE) + .dependsOn(MODE, MODE_FETCH_METADATA) + .build(); + + public static final PropertyDescriptor METADATA_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Metadata Attribute") + .description("The attribute where the metadata will be written") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("s3.object.metadata") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE) + .required(true) + .build(); + + public static final ObjectMapper MAPPER = new ObjectMapper(); Review Comment: ```suggestion private static 
final ObjectMapper MAPPER = new ObjectMapper(); ``` ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + + "attribute"); + public static final AllowableValue MODE_ROUTER = new AllowableValue("router", "Router", "When selected," + + "this mode will skip writing the metadata and just send the flowfile to the found or not-found relationship. It should be used " + + "when the goal is to just route flowfiles based on whether or not a key is present in S3."); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER) + .defaultValue(MODE_FETCH_METADATA.getValue()) + .required(true) + .description("Configure the mode of operation for this processor") + .addValidator(Validator.VALID) + .build(); + + public static final AllowableValue TARGET_ATTRIBUTE = new AllowableValue("attribute", "Attribute", "When " + + "selected, the metadata will be written to a user-supplied attribute"); + public static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("flowfile-body", "Flowfile Body", "Write " + + "the metadata to the flowfile's body"); + + public static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder() + .name("Metadata Target") + .description("This determines where the metadata will be written when it is found.") + .addValidator(Validator.VALID) + .required(true) + .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY) + .defaultValue(TARGET_ATTRIBUTE) + .dependsOn(MODE, MODE_FETCH_METADATA) + .build(); + + public static final PropertyDescriptor METADATA_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Metadata Attribute") + .description("The attribute where the metadata will be written") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("s3.object.metadata") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE) + .required(true) + .build(); + + public static final ObjectMapper MAPPER = new ObjectMapper(); + + public static final 
List<PropertyDescriptor> properties = List.of( + MODE, + METADATA_TARGET, + METADATA_ATTRIBUTE, + BUCKET_WITH_DEFAULT_VALUE, + KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + READ_ACL_LIST, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + public static Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("An object was found in the bucket at the supplied key") + .build(); + + public static Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No object was found in the bucket the supplied key") + .build(); + + @Override + public Set<Relationship> getRelationships() { + return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE)); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + boolean isRouter = context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue()); + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + Relationship route; + + try { + ObjectMetadata metadata = s3.getObjectMetadata(bucket, key); + Map<String, Object> combinedMetadata = new HashMap<>(metadata.getRawMetadata()); + 
combinedMetadata.putAll(metadata.getUserMetadata()); + + String metadataJson = MAPPER.writeValueAsString(combinedMetadata); + + if (!isRouter && context.getProperty(METADATA_TARGET).getValue().equals(TARGET_ATTRIBUTE.getValue())) { + String attribute = context.getProperty(METADATA_ATTRIBUTE).getValue(); + flowFile = session.putAttribute(flowFile, attribute, metadataJson); + } else if (!isRouter && context.getProperty(METADATA_TARGET).getValue().equals(TARGET_FLOWFILE_BODY.getValue())) { + flowFile = session.write(flowFile, os -> os.write(metadataJson.getBytes(StandardCharsets.UTF_8))); + } Review Comment: Rather than having the `Mode` property, it seems like a more descriptive approach would be something like `Metadata Output Strategy` with either `Attributes` or `Content`. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + + "attribute"); + public static final AllowableValue MODE_ROUTER = new AllowableValue("router", "Router", "When selected," + + "this mode will skip writing the metadata and just send the flowfile to the found or not-found relationship. It should be used " + + "when the goal is to just route flowfiles based on whether or not a key is present in S3."); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER) + .defaultValue(MODE_FETCH_METADATA.getValue()) + .required(true) + .description("Configure the mode of operation for this processor") + .addValidator(Validator.VALID) + .build(); + + public static final AllowableValue TARGET_ATTRIBUTE = new AllowableValue("attribute", "Attribute", "When " + + "selected, the metadata will be written to a user-supplied attribute"); + public static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("flowfile-body", "Flowfile Body", "Write " + + "the metadata to the flowfile's body"); + + public static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder() + .name("Metadata Target") + .description("This determines where the metadata will be written when it is found.") + .addValidator(Validator.VALID) + .required(true) + .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY) + .defaultValue(TARGET_ATTRIBUTE) + .dependsOn(MODE, MODE_FETCH_METADATA) + .build(); + + public static final PropertyDescriptor METADATA_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Metadata Attribute") + .description("The attribute where the metadata will be written") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("s3.object.metadata") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE) + .required(true) + .build(); + + public static final ObjectMapper MAPPER = new ObjectMapper(); + + public static final 
List<PropertyDescriptor> properties = List.of( + MODE, + METADATA_TARGET, + METADATA_ATTRIBUTE, + BUCKET_WITH_DEFAULT_VALUE, + KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + READ_ACL_LIST, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + public static Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("An object was found in the bucket at the supplied key") + .build(); + + public static Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No object was found in the bucket the supplied key") + .build(); + + @Override + public Set<Relationship> getRelationships() { + return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE)); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + boolean isRouter = context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue()); + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + Relationship route; + + try { + ObjectMetadata metadata = s3.getObjectMetadata(bucket, key); + Map<String, Object> combinedMetadata = new HashMap<>(metadata.getRawMetadata()); + 
combinedMetadata.putAll(metadata.getUserMetadata()); + + String metadataJson = MAPPER.writeValueAsString(combinedMetadata); Review Comment: Rather than turning the entire metadata map into a JSON string, what about setting each metadata key as the attribute key, and then just turning the attribute value into a String. We could consider an Attributes JSON strategy as well, but mapping the metadata directly to attributes has some benefits. ########## nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.aws.s3; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION; + +@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Check for the existence of a file in S3 without attempting to download it. This processor can be " + + "used as a router for work flows that need to check on a file in S3 before proceeding with data processing") +@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class}) +public class GetS3ObjectMetadata extends AbstractS3Processor { + public static final AllowableValue MODE_FETCH_METADATA = new AllowableValue("fetch", "Fetch Metadata", + "This is the default mode. 
It will fetch the metadata and write it to either the flowfile body or an " + + "attribute"); + public static final AllowableValue MODE_ROUTER = new AllowableValue("router", "Router", "When selected," + + "this mode will skip writing the metadata and just send the flowfile to the found or not-found relationship. It should be used " + + "when the goal is to just route flowfiles based on whether or not a key is present in S3."); + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER) + .defaultValue(MODE_FETCH_METADATA.getValue()) + .required(true) + .description("Configure the mode of operation for this processor") + .addValidator(Validator.VALID) + .build(); + + public static final AllowableValue TARGET_ATTRIBUTE = new AllowableValue("attribute", "Attribute", "When " + + "selected, the metadata will be written to a user-supplied attribute"); + public static final AllowableValue TARGET_FLOWFILE_BODY = new AllowableValue("flowfile-body", "Flowfile Body", "Write " + + "the metadata to the flowfile's body"); + + public static final PropertyDescriptor METADATA_TARGET = new PropertyDescriptor.Builder() + .name("Metadata Target") + .description("This determines where the metadata will be written when it is found.") + .addValidator(Validator.VALID) + .required(true) + .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY) + .defaultValue(TARGET_ATTRIBUTE) + .dependsOn(MODE, MODE_FETCH_METADATA) + .build(); + + public static final PropertyDescriptor METADATA_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Metadata Attribute") + .description("The attribute where the metadata will be written") + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .defaultValue("s3.object.metadata") + .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE) + .required(true) + .build(); + + public static final ObjectMapper MAPPER = new ObjectMapper(); + + public static final 
List<PropertyDescriptor> properties = List.of( + MODE, + METADATA_TARGET, + METADATA_ATTRIBUTE, + BUCKET_WITH_DEFAULT_VALUE, + KEY, + AWS_CREDENTIALS_PROVIDER_SERVICE, + S3_REGION, + TIMEOUT, + FULL_CONTROL_USER_LIST, + READ_USER_LIST, + READ_ACL_LIST, + OWNER, + SSL_CONTEXT_SERVICE, + ENDPOINT_OVERRIDE, + SIGNER_OVERRIDE, + S3_CUSTOM_SIGNER_CLASS_NAME, + S3_CUSTOM_SIGNER_MODULE_LOCATION, + PROXY_CONFIGURATION_SERVICE); + + public static Relationship REL_FOUND = new Relationship.Builder() + .name("found") + .description("An object was found in the bucket at the supplied key") + .build(); + + public static Relationship REL_NOT_FOUND = new Relationship.Builder() + .name("not found") + .description("No object was found in the bucket the supplied key") + .build(); + + @Override + public Set<Relationship> getRelationships() { + return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE)); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + boolean isRouter = context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue()); + + final AmazonS3Client s3; + try { + s3 = getS3Client(context, flowFile.getAttributes()); + } catch (Exception e) { + getLogger().error("Failed to initialize S3 client", e); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final String bucket = context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + + try { + Relationship route; + + try { + ObjectMetadata metadata = s3.getObjectMetadata(bucket, key); + Map<String, Object> combinedMetadata = new HashMap<>(metadata.getRawMetadata()); + 
combinedMetadata.putAll(metadata.getUserMetadata()); + + String metadataJson = MAPPER.writeValueAsString(combinedMetadata); + + if (!isRouter && context.getProperty(METADATA_TARGET).getValue().equals(TARGET_ATTRIBUTE.getValue())) { + String attribute = context.getProperty(METADATA_ATTRIBUTE).getValue(); + flowFile = session.putAttribute(flowFile, attribute, metadataJson); + } else if (!isRouter && context.getProperty(METADATA_TARGET).getValue().equals(TARGET_FLOWFILE_BODY.getValue())) { + flowFile = session.write(flowFile, os -> os.write(metadataJson.getBytes(StandardCharsets.UTF_8))); + } + + route = REL_FOUND; + } catch (AmazonS3Exception e) { + if (e.getStatusCode() == 404) { + route = REL_NOT_FOUND; + flowFile = extractExceptionDetails(e, session, flowFile); + } else { + throw e; + } + } + + session.transfer(flowFile, route); + } catch (IOException | AmazonClientException ex) { + getLogger().error("There was a problem checking for " + String.format("s3://%s%s", bucket, key), ex); Review Comment: Log statements should use placeholders instead of concatenation: ```suggestion getLogger().error("Failed to get S3 Object Metadata from s3://{}{} ", bucket, key, e); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
