exceptionfactory commented on code in PR #8992:
URL: https://github.com/apache/nifi/pull/8992#discussion_r1679683656


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")

Review Comment:
   For new properties and new Processors, we should use the human-readable name 
as the property name, and avoid including the processor name in all cases.
   ```suggestion
               .name("Source Bucket")
               .displayName("Source Bucket")
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+    public static final PropertyDescriptor TARGET_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("copy-s3-object-target-bucket")
+            .displayName("Target Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    public static final PropertyDescriptor SOURCE_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-source-key")
+            .displayName("Source Key")

Review Comment:
   ```suggestion
               .name("Source Key")
               .displayName("Source Key")
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+    public static final PropertyDescriptor TARGET_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("copy-s3-object-target-bucket")
+            .displayName("Target Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    public static final PropertyDescriptor SOURCE_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-source-key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket")
+            .build();
+
+    public static final PropertyDescriptor TARGET_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-target-key")
+            .displayName("Target Key")
+            .description("The target key in the target bucket")
+            .defaultValue("")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            SOURCE_BUCKET,
+            SOURCE_KEY,
+            TARGET_BUCKET,
+            TARGET_KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            WRITE_USER_LIST,
+            READ_ACL_LIST,
+            WRITE_ACL_LIST,
+            CANNED_ACL,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String sourceBucket = 
context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String sourceKey = 
context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetBucket = 
context.getProperty(TARGET_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetKey = 
context.getProperty(TARGET_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            CopyObjectRequest request = new CopyObjectRequest(sourceBucket, 
sourceKey, targetBucket, targetKey);
+            AccessControlList acl = createACL(context, flowFile);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
+            CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
+
+            if (cannedAccessControlList != null) {
+                request.setCannedAccessControlList(cannedAccessControlList);
+            }
+
+            s3.copyObject(request);
+            session.getProvenanceReporter().send(flowFile, 
getTransitUrl(targetBucket, targetKey));
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (AmazonClientException ex) {
+            flowFile = extractExceptionDetails(ex, session, flowFile);
+            getLogger().error("Copy S3 Object Request failed with error:", ex);

Review Comment:
   ```suggestion
               getLogger().error("Failed to copy S3 object", ex);
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +
+                    "attribute");
+    public static final AllowableValue MODE_ROUTER = new 
AllowableValue("router", "Router", "When selected," +
+            "this mode will skip writing the metadata and just send the 
flowfile to the found or not-found relationship. It should be used " +
+            "when the goal is to just route flowfiles based on whether or not 
a key is present in S3.");
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("Mode")
+            .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER)
+            .defaultValue(MODE_FETCH_METADATA.getValue())
+            .required(true)
+            .description("Configure the mode of operation for this processor")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final AllowableValue TARGET_ATTRIBUTE = new 
AllowableValue("attribute", "Attribute", "When " +
+            "selected, the metadata will be written to a user-supplied 
attribute");
+    public static final AllowableValue TARGET_FLOWFILE_BODY = new 
AllowableValue("flowfile-body", "Flowfile Body", "Write " +
+            "the metadata to the flowfile's body");
+
+    public static final PropertyDescriptor METADATA_TARGET = new 
PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written 
when it is found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTE)
+            .dependsOn(MODE, MODE_FETCH_METADATA)
+            .build();
+
+    public static final PropertyDescriptor METADATA_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Metadata Attribute")
+            .description("The attribute where the metadata will be written")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .defaultValue("s3.object.metadata")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE)
+            .required(true)
+            .build();
+
+    public static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            MODE,
+            METADATA_TARGET,
+            METADATA_ATTRIBUTE,
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            READ_ACL_LIST,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    public static Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("An object was found in the bucket at the supplied 
key")
+            .build();
+
+    public static Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("No object was found in the bucket the supplied key")
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE));
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean isRouter = 
context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue());
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String bucket = 
context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            Relationship route;
+
+            try {
+                ObjectMetadata metadata = s3.getObjectMetadata(bucket, key);
+                Map<String, Object> combinedMetadata = new 
HashMap<>(metadata.getRawMetadata());
+                combinedMetadata.putAll(metadata.getUserMetadata());
+
+                String metadataJson = 
MAPPER.writeValueAsString(combinedMetadata);
+
+                if (!isRouter && 
context.getProperty(METADATA_TARGET).getValue().equals(TARGET_ATTRIBUTE.getValue()))
 {
+                    String attribute = 
context.getProperty(METADATA_ATTRIBUTE).getValue();
+                    flowFile = session.putAttribute(flowFile, attribute, 
metadataJson);
+                } else if (!isRouter && 
context.getProperty(METADATA_TARGET).getValue().equals(TARGET_FLOWFILE_BODY.getValue()))
 {
+                    flowFile = session.write(flowFile, os -> 
os.write(metadataJson.getBytes(StandardCharsets.UTF_8)));
+                }
+
+                route = REL_FOUND;
+            } catch (AmazonS3Exception e) {
+                if (e.getStatusCode() == 404) {
+                    route = REL_NOT_FOUND;
+                    flowFile = extractExceptionDetails(e, session, flowFile);
+                } else {
+                    throw e;
+                }
+            }
+
+            session.transfer(flowFile, route);
+        } catch (IOException | AmazonClientException ex) {

Review Comment:
   ```suggestion
           } catch (final IOException | AmazonClientException e) {
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestCopyS3Object {
+    private TestRunner runner = null;
+    private CopyS3Object mockCopyS3Object = null;
+    private AmazonS3Client mockS3Client = null;

Review Comment:
   These `null` initial values are not necessary.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestCopyS3Object.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.aws.testutil.AuthUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestCopyS3Object {
+    private TestRunner runner = null;
+    private CopyS3Object mockCopyS3Object = null;
+    private AmazonS3Client mockS3Client = null;
+
+    @BeforeEach
+    public void setUp() {
+        mockS3Client = mock(AmazonS3Client.class);
+        mockCopyS3Object = new CopyS3Object() {
+            @Override
+            protected AmazonS3Client createClient(final ProcessContext 
context, final AWSCredentialsProvider credentialsProvider, final Region region, 
final ClientConfiguration config,
+                                                  final 
AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
+                return mockS3Client;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockCopyS3Object);
+        AuthUtils.enableAccessKey(runner, "accessKeyId", "secretKey");
+    }
+
+    @DisplayName("Test a normal run that SHOULD succeed")
+    @Test
+    public void testRun() {
+        runner.enqueue("".getBytes(StandardCharsets.UTF_8), setupRun());
+        runner.run();
+
+        runner.assertTransferCount(CopyS3Object.REL_SUCCESS, 1);
+
+        verify(mockS3Client, times(1))
+                .copyObject(any(CopyObjectRequest.class));
+
+        var provenanceEvents = runner.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+    }
+
+    @DisplayName("Validate that S3 errors cleanly route to failure")
+    @Test
+    public void testS3ErrorHandling() {
+        var ex = new AmazonS3Exception("Manually triggered error");
+        ex.setStatusCode(503);
+        when(mockS3Client.copyObject(any(CopyObjectRequest.class)))
+                .thenThrow(ex);
+
+        runner.enqueue("".getBytes(StandardCharsets.UTF_8), setupRun());

Review Comment:
   ```suggestion
           runner.enqueue(new byte[]{}, setupRun());
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+    public static final PropertyDescriptor TARGET_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("copy-s3-object-target-bucket")
+            .displayName("Target Bucket")

Review Comment:
   Recommend using `Destination` instead of `Target`:
   ```suggestion
       public static final PropertyDescriptor DESTINATION_BUCKET = new 
PropertyDescriptor.Builder()
               .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
               .name("Destination Bucket")
               .displayName("Destination Bucket")
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+    public static final PropertyDescriptor TARGET_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("copy-s3-object-target-bucket")
+            .displayName("Target Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    public static final PropertyDescriptor SOURCE_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-source-key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket")
+            .build();
+
+    public static final PropertyDescriptor TARGET_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-target-key")
+            .displayName("Target Key")
+            .description("The target key in the target bucket")
+            .defaultValue("")
+            .build();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            SOURCE_BUCKET,
+            SOURCE_KEY,
+            TARGET_BUCKET,
+            TARGET_KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            WRITE_USER_LIST,
+            READ_ACL_LIST,
+            WRITE_ACL_LIST,
+            CANNED_ACL,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String sourceBucket = 
context.getProperty(SOURCE_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String sourceKey = 
context.getProperty(SOURCE_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetBucket = 
context.getProperty(TARGET_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String targetKey = 
context.getProperty(TARGET_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            CopyObjectRequest request = new CopyObjectRequest(sourceBucket, 
sourceKey, targetBucket, targetKey);
+            AccessControlList acl = createACL(context, flowFile);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
+            CannedAccessControlList cannedAccessControlList = 
createCannedACL(context, flowFile);
+
+            if (cannedAccessControlList != null) {
+                request.setCannedAccessControlList(cannedAccessControlList);
+            }
+
+            s3.copyObject(request);
+            session.getProvenanceReporter().send(flowFile, 
getTransitUrl(targetBucket, targetKey));
+
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (AmazonClientException ex) {

Review Comment:
   Recommend declaring the caught exception `final` and naming it `e`:
   ```suggestion
           } catch (final AmazonClientException e) {
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +
+                    "attribute");
+    public static final AllowableValue MODE_ROUTER = new 
AllowableValue("router", "Router", "When selected," +
+            "this mode will skip writing the metadata and just send the 
flowfile to the found or not-found relationship. It should be used " +
+            "when the goal is to just route flowfiles based on whether or not 
a key is present in S3.");

Review Comment:
   Recommend replacing these allowable values with an `enum` that implements 
`DescribedValue`.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/CopyS3Object.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import java.util.List;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Copy"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Copies a file from one bucket and key to another in 
AWS S3")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class CopyS3Object extends AbstractS3Processor {
+    public static final PropertyDescriptor SOURCE_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
+            .name("copy-s3-object-source-bucket")
+            .displayName("Source Bucket")
+            .description("The bucket that contains the file to be copied.")
+            .build();
+    public static final PropertyDescriptor TARGET_BUCKET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(BUCKET_WITHOUT_DEFAULT_VALUE)
+            .name("copy-s3-object-target-bucket")
+            .displayName("Target Bucket")
+            .description("The bucket that will receive the copy.")
+            .build();
+
+    public static final PropertyDescriptor SOURCE_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-source-key")
+            .displayName("Source Key")
+            .description("The source key in the source bucket")
+            .build();
+
+    public static final PropertyDescriptor TARGET_KEY = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(KEY)
+            .name("copy-s3-target-key")
+            .displayName("Target Key")

Review Comment:
   Recommend using `Destination` instead of `Target` for the key property as well:
   ```suggestion
       public static final PropertyDescriptor DESTINATION_KEY = new 
PropertyDescriptor.Builder()
               .fromPropertyDescriptor(KEY)
               .name("Destination Key")
               .displayName("Destination Key")
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +

Review Comment:
   The description should not state that this is the default mode; the default 
is already indicated by the default value set on the PropertyDescriptor.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +
+                    "attribute");
+    public static final AllowableValue MODE_ROUTER = new 
AllowableValue("router", "Router", "When selected," +
+            "this mode will skip writing the metadata and just send the 
flowfile to the found or not-found relationship. It should be used " +
+            "when the goal is to just route flowfiles based on whether or not 
a key is present in S3.");
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()

Review Comment:
   As noted in the implementation details, rather than this type of `Mode` 
property, I recommend a `Metadata Output Strategy` property.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +
+                    "attribute");
+    public static final AllowableValue MODE_ROUTER = new 
AllowableValue("router", "Router", "When selected," +
+            "this mode will skip writing the metadata and just send the 
flowfile to the found or not-found relationship. It should be used " +
+            "when the goal is to just route flowfiles based on whether or not 
a key is present in S3.");
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("Mode")
+            .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER)
+            .defaultValue(MODE_FETCH_METADATA.getValue())
+            .required(true)
+            .description("Configure the mode of operation for this processor")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final AllowableValue TARGET_ATTRIBUTE = new 
AllowableValue("attribute", "Attribute", "When " +
+            "selected, the metadata will be written to a user-supplied 
attribute");
+    public static final AllowableValue TARGET_FLOWFILE_BODY = new 
AllowableValue("flowfile-body", "Flowfile Body", "Write " +
+            "the metadata to the flowfile's body");
+
+    public static final PropertyDescriptor METADATA_TARGET = new 
PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written 
when it is found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTE)
+            .dependsOn(MODE, MODE_FETCH_METADATA)
+            .build();
+
+    public static final PropertyDescriptor METADATA_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Metadata Attribute")
+            .description("The attribute where the metadata will be written")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .defaultValue("s3.object.metadata")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE)
+            .required(true)
+            .build();
+
+    public static final ObjectMapper MAPPER = new ObjectMapper();

Review Comment:
   Recommend making `MAPPER` `private` rather than `public`:
   ```suggestion
       private static final ObjectMapper MAPPER = new ObjectMapper();
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +
+                    "attribute");
+    public static final AllowableValue MODE_ROUTER = new 
AllowableValue("router", "Router", "When selected," +
+            "this mode will skip writing the metadata and just send the 
flowfile to the found or not-found relationship. It should be used " +
+            "when the goal is to just route flowfiles based on whether or not 
a key is present in S3.");
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("Mode")
+            .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER)
+            .defaultValue(MODE_FETCH_METADATA.getValue())
+            .required(true)
+            .description("Configure the mode of operation for this processor")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final AllowableValue TARGET_ATTRIBUTE = new 
AllowableValue("attribute", "Attribute", "When " +
+            "selected, the metadata will be written to a user-supplied 
attribute");
+    public static final AllowableValue TARGET_FLOWFILE_BODY = new 
AllowableValue("flowfile-body", "Flowfile Body", "Write " +
+            "the metadata to the flowfile's body");
+
+    public static final PropertyDescriptor METADATA_TARGET = new 
PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written 
when it is found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTE)
+            .dependsOn(MODE, MODE_FETCH_METADATA)
+            .build();
+
+    public static final PropertyDescriptor METADATA_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Metadata Attribute")
+            .description("The attribute where the metadata will be written")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .defaultValue("s3.object.metadata")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE)
+            .required(true)
+            .build();
+
+    public static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            MODE,
+            METADATA_TARGET,
+            METADATA_ATTRIBUTE,
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            READ_ACL_LIST,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    public static Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("An object was found in the bucket at the supplied 
key")
+            .build();
+
+    public static Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("No object was found in the bucket the supplied key")
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE));
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean isRouter = 
context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue());
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String bucket = 
context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            Relationship route;
+
+            try {
+                ObjectMetadata metadata = s3.getObjectMetadata(bucket, key);
+                Map<String, Object> combinedMetadata = new 
HashMap<>(metadata.getRawMetadata());
+                combinedMetadata.putAll(metadata.getUserMetadata());
+
+                String metadataJson = 
MAPPER.writeValueAsString(combinedMetadata);
+
+                if (!isRouter && 
context.getProperty(METADATA_TARGET).getValue().equals(TARGET_ATTRIBUTE.getValue()))
 {
+                    String attribute = 
context.getProperty(METADATA_ATTRIBUTE).getValue();
+                    flowFile = session.putAttribute(flowFile, attribute, 
metadataJson);
+                } else if (!isRouter && 
context.getProperty(METADATA_TARGET).getValue().equals(TARGET_FLOWFILE_BODY.getValue()))
 {
+                    flowFile = session.write(flowFile, os -> 
os.write(metadataJson.getBytes(StandardCharsets.UTF_8)));
+                }

Review Comment:
   Rather than having the `Mode` property, a more descriptive approach would be 
a property such as `Metadata Output Strategy`, with allowable values of 
`Attributes` or `Content`.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +
+                    "attribute");
+    public static final AllowableValue MODE_ROUTER = new 
AllowableValue("router", "Router", "When selected," +
+            "this mode will skip writing the metadata and just send the 
flowfile to the found or not-found relationship. It should be used " +
+            "when the goal is to just route flowfiles based on whether or not 
a key is present in S3.");
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("Mode")
+            .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER)
+            .defaultValue(MODE_FETCH_METADATA.getValue())
+            .required(true)
+            .description("Configure the mode of operation for this processor")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final AllowableValue TARGET_ATTRIBUTE = new 
AllowableValue("attribute", "Attribute", "When " +
+            "selected, the metadata will be written to a user-supplied 
attribute");
+    public static final AllowableValue TARGET_FLOWFILE_BODY = new 
AllowableValue("flowfile-body", "Flowfile Body", "Write " +
+            "the metadata to the flowfile's body");
+
+    public static final PropertyDescriptor METADATA_TARGET = new 
PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written 
when it is found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTE)
+            .dependsOn(MODE, MODE_FETCH_METADATA)
+            .build();
+
+    public static final PropertyDescriptor METADATA_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Metadata Attribute")
+            .description("The attribute where the metadata will be written")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .defaultValue("s3.object.metadata")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE)
+            .required(true)
+            .build();
+
+    public static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            MODE,
+            METADATA_TARGET,
+            METADATA_ATTRIBUTE,
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            READ_ACL_LIST,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    public static Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("An object was found in the bucket at the supplied 
key")
+            .build();
+
+    public static Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("No object was found in the bucket the supplied key")
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE));
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean isRouter = 
context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue());
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String bucket = 
context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            Relationship route;
+
+            try {
+                ObjectMetadata metadata = s3.getObjectMetadata(bucket, key);
+                Map<String, Object> combinedMetadata = new 
HashMap<>(metadata.getRawMetadata());
+                combinedMetadata.putAll(metadata.getUserMetadata());
+
+                String metadataJson = 
MAPPER.writeValueAsString(combinedMetadata);

Review Comment:
   Rather than turning the entire metadata map into a JSON string, what about 
setting each metadata key as the attribute key, and then just turning the 
attribute value into a String? We could consider an Attributes JSON strategy as 
well, but mapping the metadata directly to attributes has some benefits.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3ObjectMetadata.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
+
+@Tags({"Amazon", "S3", "AWS", "Archive", "Exists"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Check for the existence of a file in S3 without 
attempting to download it. This processor can be " +
+        "used as a router for work flows that need to check on a file in S3 
before proceeding with data processing")
+@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, 
TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
+public class GetS3ObjectMetadata extends AbstractS3Processor {
+    public static final AllowableValue MODE_FETCH_METADATA = new 
AllowableValue("fetch", "Fetch Metadata",
+            "This is the default mode. It will fetch the metadata and write it 
to either the flowfile body or an " +
+                    "attribute");
+    public static final AllowableValue MODE_ROUTER = new 
AllowableValue("router", "Router", "When selected," +
+            "this mode will skip writing the metadata and just send the 
flowfile to the found or not-found relationship. It should be used " +
+            "when the goal is to just route flowfiles based on whether or not 
a key is present in S3.");
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("Mode")
+            .allowableValues(MODE_FETCH_METADATA, MODE_ROUTER)
+            .defaultValue(MODE_FETCH_METADATA.getValue())
+            .required(true)
+            .description("Configure the mode of operation for this processor")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final AllowableValue TARGET_ATTRIBUTE = new 
AllowableValue("attribute", "Attribute", "When " +
+            "selected, the metadata will be written to a user-supplied 
attribute");
+    public static final AllowableValue TARGET_FLOWFILE_BODY = new 
AllowableValue("flowfile-body", "Flowfile Body", "Write " +
+            "the metadata to the flowfile's body");
+
+    public static final PropertyDescriptor METADATA_TARGET = new 
PropertyDescriptor.Builder()
+            .name("Metadata Target")
+            .description("This determines where the metadata will be written 
when it is found.")
+            .addValidator(Validator.VALID)
+            .required(true)
+            .allowableValues(TARGET_ATTRIBUTE, TARGET_FLOWFILE_BODY)
+            .defaultValue(TARGET_ATTRIBUTE)
+            .dependsOn(MODE, MODE_FETCH_METADATA)
+            .build();
+
+    public static final PropertyDescriptor METADATA_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+            .name("Metadata Attribute")
+            .description("The attribute where the metadata will be written")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .defaultValue("s3.object.metadata")
+            .dependsOn(METADATA_TARGET, TARGET_ATTRIBUTE)
+            .required(true)
+            .build();
+
+    public static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public static final List<PropertyDescriptor> properties = List.of(
+            MODE,
+            METADATA_TARGET,
+            METADATA_ATTRIBUTE,
+            BUCKET_WITH_DEFAULT_VALUE,
+            KEY,
+            AWS_CREDENTIALS_PROVIDER_SERVICE,
+            S3_REGION,
+            TIMEOUT,
+            FULL_CONTROL_USER_LIST,
+            READ_USER_LIST,
+            READ_ACL_LIST,
+            OWNER,
+            SSL_CONTEXT_SERVICE,
+            ENDPOINT_OVERRIDE,
+            SIGNER_OVERRIDE,
+            S3_CUSTOM_SIGNER_CLASS_NAME,
+            S3_CUSTOM_SIGNER_MODULE_LOCATION,
+            PROXY_CONFIGURATION_SERVICE);
+
+    public static Relationship REL_FOUND = new Relationship.Builder()
+            .name("found")
+            .description("An object was found in the bucket at the supplied 
key")
+            .build();
+
+    public static Relationship REL_NOT_FOUND = new Relationship.Builder()
+            .name("not found")
+            .description("No object was found in the bucket the supplied key")
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return new HashSet<>(List.of(REL_FOUND, REL_NOT_FOUND, REL_FAILURE));
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        boolean isRouter = 
context.getProperty(MODE).getValue().equals(MODE_ROUTER.getValue());
+
+        final AmazonS3Client s3;
+        try {
+            s3 = getS3Client(context, flowFile.getAttributes());
+        } catch (Exception e) {
+            getLogger().error("Failed to initialize S3 client", e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final String bucket = 
context.getProperty(BUCKET_WITH_DEFAULT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            Relationship route;
+
+            try {
+                ObjectMetadata metadata = s3.getObjectMetadata(bucket, key);
+                Map<String, Object> combinedMetadata = new 
HashMap<>(metadata.getRawMetadata());
+                combinedMetadata.putAll(metadata.getUserMetadata());
+
+                String metadataJson = 
MAPPER.writeValueAsString(combinedMetadata);
+
+                if (!isRouter && 
context.getProperty(METADATA_TARGET).getValue().equals(TARGET_ATTRIBUTE.getValue()))
 {
+                    String attribute = 
context.getProperty(METADATA_ATTRIBUTE).getValue();
+                    flowFile = session.putAttribute(flowFile, attribute, 
metadataJson);
+                } else if (!isRouter && 
context.getProperty(METADATA_TARGET).getValue().equals(TARGET_FLOWFILE_BODY.getValue()))
 {
+                    flowFile = session.write(flowFile, os -> 
os.write(metadataJson.getBytes(StandardCharsets.UTF_8)));
+                }
+
+                route = REL_FOUND;
+            } catch (AmazonS3Exception e) {
+                if (e.getStatusCode() == 404) {
+                    route = REL_NOT_FOUND;
+                    flowFile = extractExceptionDetails(e, session, flowFile);
+                } else {
+                    throw e;
+                }
+            }
+
+            session.transfer(flowFile, route);
+        } catch (IOException | AmazonClientException ex) {
+            getLogger().error("There was a problem checking for " + 
String.format("s3://%s%s", bucket, key), ex);

Review Comment:
   Log statements should use placeholders instead of concatenation:
   ```suggestion
               getLogger().error("Failed to get S3 Object Metadata from 
s3://{}{}", bucket, key, ex);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to