This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new a8e59e5  NIFI-5920: Tagging an object in S3
a8e59e5 is described below

commit a8e59e52af5144c6238230bd792d01ca6daafadf
Author: Stephen Goodman <[email protected]>
AuthorDate: Sat Dec 29 19:10:38 2018 -0500

    NIFI-5920: Tagging an object in S3
    
    Unit tests and functionality for tagging an object in S3.
    
    Set FlowFile attributes directly from tags retrieved from S3.
    
    Add guard clauses to ensure evaluated properties are not blank.
    
    This closes #3239.
    
    Signed-off-by: Koji Kawamura <[email protected]>
---
 .../apache/nifi/processors/aws/s3/TagS3Object.java | 201 +++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../nifi/processors/aws/s3/ITTagS3Object.java      | 150 ++++++++++
 .../nifi/processors/aws/s3/TestTagS3Object.java    | 319 +++++++++++++++++++++
 4 files changed, 671 insertions(+)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
new file mode 100644
index 0000000..6c9d72a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.ObjectTagging;
+import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.Tag;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@SupportsBatching
+@WritesAttributes({
+        @WritesAttribute(attribute = "s3.tag.___", description = "The tags 
associated with the S3 object will be " +
+                "written as part of the FlowFile attributes")})
+@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
+@Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Sets tags on a FlowFile within an Amazon S3 Bucket. " +
+        "If attempting to tag a file that does not exist, FlowFile is routed 
to success.")
+public class TagS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor TAG_KEY = new 
PropertyDescriptor.Builder()
+            .name("tag-key")
+            .displayName("Tag Key")
+            .description("The key of the tag that will be set on the S3 
Object")
+            .addValidator(new StandardValidators.StringLengthValidator(1, 127))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor TAG_VALUE = new 
PropertyDescriptor.Builder()
+            .name("tag-value")
+            .displayName("Tag Value")
+            .description("The value of the tag that will be set on the S3 
Object")
+            .addValidator(new StandardValidators.StringLengthValidator(1, 255))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor APPEND_TAG = new 
PropertyDescriptor.Builder()
+            .name("append-tag")
+            .displayName("Append Tag")
+            .description("If set to true, the tag will be appended to the 
existing set of tags on the S3 object. " +
+                    "Any existing tags with the same key as the new tag will 
be updated with the specified value. If " +
+                    "set to false, the existing tags will be removed and the 
new tag will be set on the S3 object.")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .allowableValues("true", "false")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .defaultValue("true")
+            .build();
+
+    public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
+            .name("version")
+            .displayName("Version ID")
+            .description("The Version of the Object to tag")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(KEY, BUCKET, VERSION_ID, TAG_KEY, TAG_VALUE, 
APPEND_TAG, ACCESS_KEY, SECRET_KEY,
+                    CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, 
REGION, TIMEOUT, SSL_CONTEXT_SERVICE,
+                    ENDPOINT_OVERRIDE, SIGNER_OVERRIDE, 
PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT,
+                    PROXY_USERNAME, PROXY_PASSWORD));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String newTagKey = 
context.getProperty(TAG_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String newTagVal = 
context.getProperty(TAG_VALUE).evaluateAttributeExpressions(flowFile).getValue();
+
+        if(StringUtils.isBlank(bucket)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, BUCKET);
+            return;
+        }
+
+        if(StringUtils.isBlank(key)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, KEY);
+            return;
+        }
+
+        if(StringUtils.isBlank(newTagKey)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, TAG_KEY);
+            return;
+        }
+
+        if(StringUtils.isBlank(newTagVal)){
+            failFlowWithBlankEvaluatedProperty(session, flowFile, TAG_VALUE);
+            return;
+        }
+
+        final String version = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 s3 = getClient();
+
+        SetObjectTaggingRequest r;
+        List<Tag> tags = new ArrayList<>();
+
+        try {
+            if(context.getProperty(APPEND_TAG).asBoolean()) {
+                final GetObjectTaggingRequest gr = new 
GetObjectTaggingRequest(bucket, key);
+                GetObjectTaggingResult res = s3.getObjectTagging(gr);
+
+                // preserve tags on S3 object, but filter out existing tag 
keys that match the one we're setting
+                tags = res.getTagSet().stream().filter(t -> 
!t.getKey().equals(newTagKey)).collect(Collectors.toList());
+            }
+
+            tags.add(new Tag(newTagKey, newTagVal));
+
+            if(StringUtils.isBlank(version)){
+                r = new SetObjectTaggingRequest(bucket, key, new 
ObjectTagging(tags));
+            } else{
+                r = new SetObjectTaggingRequest(bucket, key, version, new 
ObjectTagging(tags));
+            }
+            s3.setObjectTagging(r);
+        } catch (final AmazonServiceException ase) {
+            getLogger().error("Failed to tag S3 Object for {}; routing to 
failure", new Object[]{flowFile, ase});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        flowFile = setTagAttributes(session, flowFile, tags);
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully tagged S3 Object for {} in {} millis; 
routing to success", new Object[]{flowFile, transferMillis});
+    }
+
+    private void failFlowWithBlankEvaluatedProperty(ProcessSession session, 
FlowFile flowFile, PropertyDescriptor pd) {
+        getLogger().error("{} value is blank after attribute expression 
language evaluation", new Object[]{pd.getName()});
+        flowFile = session.penalize(flowFile);
+        session.transfer(flowFile, REL_FAILURE);
+    }
+
+    private FlowFile setTagAttributes(ProcessSession session, FlowFile 
flowFile, List<Tag> tags) {
+        flowFile = session.removeAllAttributes(flowFile, 
Pattern.compile("^s3\\.tag\\..*"));
+
+        final Map<String, String> tagAttrs = new HashMap<>();
+        tags.stream().forEach(t -> tagAttrs.put("s3.tag." + t.getKey(), 
t.getValue()));
+        flowFile = session.putAllAttributes(flowFile, tagAttrs);
+        return flowFile;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index c628fb1..68dd72a 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,6 +15,7 @@
 org.apache.nifi.processors.aws.s3.FetchS3Object
 org.apache.nifi.processors.aws.s3.PutS3Object
 org.apache.nifi.processors.aws.s3.DeleteS3Object
+org.apache.nifi.processors.aws.s3.TagS3Object
 org.apache.nifi.processors.aws.s3.ListS3
 org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java
new file mode 100644
index 0000000..b1b42c0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITTagS3Object.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.Tag;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Provides integration level testing with actual AWS S3 resources for {@link 
TagS3Object} and requires additional
+ * configuration and resources to work.
+ */
+public class ITTagS3Object extends AbstractS3IT {
+
+    @Test
+    public void testSimpleTag() throws Exception {
+        String objectKey = "test-file";
+        String tagKey = "nifi-key";
+        String tagValue = "nifi-val";
+
+        // put file in s3
+        putTestFile(objectKey, 
getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        // Set up processor
+        final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
+        runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(TagS3Object.REGION, REGION);
+        runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", objectKey);
+        runner.enqueue(new byte[0], attrs);
+
+        // tag file
+        runner.run(1);
+
+        // Verify processor succeeds
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+
+        // Verify tag exists on S3 object
+        GetObjectTaggingResult res = client.getObjectTagging(new 
GetObjectTaggingRequest(BUCKET_NAME, objectKey));
+        assertTrue("Expected tag not found on S3 object", 
res.getTagSet().contains(new Tag(tagKey, tagValue)));
+    }
+
+    @Test
+    public void testAppendTag() throws Exception {
+        String objectKey = "test-file";
+        String tagKey = "nifi-key";
+        String tagValue = "nifi-val";
+
+        Tag existingTag = new Tag("oldkey", "oldvalue");
+
+        // put file in s3
+        putFileWithObjectTag(objectKey, 
getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), Arrays.asList(existingTag));
+
+        // Set up processor
+        final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
+        runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(TagS3Object.REGION, REGION);
+        runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", objectKey);
+        runner.enqueue(new byte[0], attrs);
+
+        // tag file
+        runner.run(1);
+
+        // Verify processor succeeds
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+
+        // Verify new tag and existing exist on S3 object
+        GetObjectTaggingResult res = client.getObjectTagging(new 
GetObjectTaggingRequest(BUCKET_NAME, objectKey));
+        assertTrue("Expected new tag not found on S3 object", 
res.getTagSet().contains(new Tag(tagKey, tagValue)));
+        assertTrue("Expected existing tag not found on S3 object", 
res.getTagSet().contains(existingTag));
+    }
+
+    @Test
+    public void testReplaceTags() throws Exception {
+        String objectKey = "test-file";
+        String tagKey = "nifi-key";
+        String tagValue = "nifi-val";
+
+        Tag existingTag = new Tag("s3.tag.oldkey", "oldvalue");
+
+        // put file in s3
+        putFileWithObjectTag(objectKey, 
getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), Arrays.asList(existingTag));
+
+        // Set up processor
+        final TestRunner runner = TestRunners.newTestRunner(new TagS3Object());
+        runner.setProperty(TagS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(TagS3Object.REGION, REGION);
+        runner.setProperty(TagS3Object.BUCKET, BUCKET_NAME);
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagValue);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", objectKey);
+        attrs.put("s3.tag."+existingTag.getKey(), existingTag.getValue());
+        runner.enqueue(new byte[0], attrs);
+
+        // tag file
+        runner.run(1);
+
+        // Verify processor succeeds
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+
+        // Verify flowfile attributes match s3 tags
+        MockFlowFile flowFiles = 
runner.getFlowFilesForRelationship(TagS3Object.REL_SUCCESS).get(0);
+        flowFiles.assertAttributeNotExists(existingTag.getKey());
+        flowFiles.assertAttributeEquals("s3.tag."+tagKey, tagValue);
+
+        // Verify new tag exists on S3 object and prior tag removed
+        GetObjectTaggingResult res = client.getObjectTagging(new 
GetObjectTaggingRequest(BUCKET_NAME, objectKey));
+        assertTrue("Expected new tag not found on S3 object", 
res.getTagSet().contains(new Tag(tagKey, tagValue)));
+        assertFalse("Existing tag not replaced on S3 object", 
res.getTagSet().contains(existingTag));
+    }
+}
+
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
new file mode 100644
index 0000000..d337796
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestTagS3Object.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.SetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.Tag;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.proxy.ProxyConfigurationService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestTagS3Object {
+
+    private TestRunner runner = null;
+    private TagS3Object mockTagS3Object = null;
+    private AmazonS3Client actualS3Client = null;
+    private AmazonS3Client mockS3Client = null;
+
+    @Before
+    public void setUp() {
+        mockS3Client = Mockito.mock(AmazonS3Client.class);
+        mockTagS3Object = new TagS3Object() {
+            protected AmazonS3Client getClient() {
+                actualS3Client = client;
+                return mockS3Client;
+            }
+        };
+        runner = TestRunners.newTestRunner(mockTagS3Object);
+    }
+
+    @Test
+    public void testTagObjectSimple() throws IOException {
+        final String tagKey = "k";
+        final String tagVal = "v";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = 
ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, 
Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertNull("test-version", request.getVersionId());
+        assertTrue("Expected tag not found in request", 
request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("s3.tag."+tagKey, tagVal);
+    }
+
+    @Test
+    public void testTagObjectVersion() throws IOException {
+        final String tagKey = "k";
+        final String tagVal = "v";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.VERSION_ID, "test-version");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = 
ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, 
Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertEquals("test-version", request.getVersionId());
+        assertTrue("Expected tag not found in request", 
request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+    }
+
+    @Test
+    public void testTagObjectAppendToExistingTags() throws IOException {
+        //set up existing tags on S3 object
+        Tag currentTag = new Tag("ck", "cv");
+        mockGetExistingTags(currentTag);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        attrs.put("s3.tag."+currentTag.getKey(), currentTag.getValue());
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = 
ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, 
Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertTrue("New tag not found in request", 
request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+        assertTrue("Existing tag not found in request", 
request.getTagging().getTagSet().contains(currentTag));
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("s3.tag."+tagKey, tagVal);
+        ff0.assertAttributeEquals("s3.tag."+currentTag.getKey(), 
currentTag.getValue());
+    }
+
+    @Test
+    public void testTagObjectAppendUpdatesExistingTagValue() throws 
IOException {
+        //set up existing tags on S3 object
+        Tag currentTag1 = new Tag("ck", "cv");
+        Tag currentTag2 = new Tag("nk", "ov");
+        mockGetExistingTags(currentTag1, currentTag2);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = 
ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, 
Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertTrue("New tag not found in request", 
request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+        assertTrue("Existing tag not found in request", 
request.getTagging().getTagSet().contains(currentTag1));
+        assertFalse("Existing tag should be excluded from request", 
request.getTagging().getTagSet().contains(currentTag2));
+    }
+
+    @Test
+    public void testTagObjectReplacesExistingTags() throws IOException {
+        //set up existing tags on S3 object
+        Tag currentTag = new Tag("ck", "cv");
+        mockGetExistingTags(currentTag);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        runner.setProperty(TagS3Object.APPEND_TAG, "false");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "object-key");
+        attrs.put("s3.tag."+currentTag.getKey(), currentTag.getValue());
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TagS3Object.REL_SUCCESS, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = 
ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+        Mockito.verify(mockS3Client, 
Mockito.times(1)).setObjectTagging(captureRequest.capture());
+        SetObjectTaggingRequest request = captureRequest.getValue();
+        assertEquals("test-bucket", request.getBucketName());
+        assertEquals("object-key", request.getKey());
+        assertTrue("New tag not found in request", 
request.getTagging().getTagSet().contains(new Tag(tagKey, tagVal)));
+        assertFalse("Existing tag should be excluded from request", 
request.getTagging().getTagSet().contains(currentTag));
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        MockFlowFile ff0 = flowFiles.get(0);
+        ff0.assertAttributeEquals("s3.tag."+tagKey, tagVal);
+        ff0.assertAttributeNotExists("s3.tag."+currentTag.getKey());
+    }
+
+    @Test
+    public void testTagObjectS3Exception() {
+        //set up existing tags on S3 object
+        Tag currentTag = new Tag("ck", "cv");
+        mockGetExistingTags(currentTag);
+
+        final String tagKey = "nk";
+        final String tagVal = "nv";
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, tagKey);
+        runner.setProperty(TagS3Object.TAG_VALUE, tagVal);
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+        Mockito.doThrow(new 
AmazonS3Exception("TagFailure")).when(mockS3Client).setObjectTagging(Mockito.any());
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+        ArgumentCaptor<SetObjectTaggingRequest> captureRequest = 
ArgumentCaptor.forClass(SetObjectTaggingRequest.class);
+    }
+
+    @Test
+    public void testGetPropertyDescriptors() throws Exception {
+        TagS3Object processor = new TagS3Object();
+        List<PropertyDescriptor> pd = 
processor.getSupportedPropertyDescriptors();
+        assertEquals("size should be eq", 20, pd.size());
+        assertTrue(pd.contains(TagS3Object.ACCESS_KEY));
+        assertTrue(pd.contains(TagS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
+        assertTrue(pd.contains(TagS3Object.BUCKET));
+        assertTrue(pd.contains(TagS3Object.CREDENTIALS_FILE));
+        assertTrue(pd.contains(TagS3Object.ENDPOINT_OVERRIDE));
+        assertTrue(pd.contains(TagS3Object.KEY));
+        assertTrue(pd.contains(TagS3Object.REGION));
+        assertTrue(pd.contains(TagS3Object.SECRET_KEY));
+        assertTrue(pd.contains(TagS3Object.SIGNER_OVERRIDE));
+        assertTrue(pd.contains(TagS3Object.SSL_CONTEXT_SERVICE));
+        assertTrue(pd.contains(TagS3Object.TIMEOUT));
+        
assertTrue(pd.contains(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE));
+        assertTrue(pd.contains(TagS3Object.PROXY_HOST));
+        assertTrue(pd.contains(TagS3Object.PROXY_HOST_PORT));
+        assertTrue(pd.contains(TagS3Object.PROXY_USERNAME));
+        assertTrue(pd.contains(TagS3Object.PROXY_PASSWORD));
+        assertTrue(pd.contains(TagS3Object.TAG_KEY));
+        assertTrue(pd.contains(TagS3Object.TAG_VALUE));
+        assertTrue(pd.contains(TagS3Object.APPEND_TAG));
+        assertTrue(pd.contains(TagS3Object.VERSION_ID));
+    }
+
+    @Test
+    public void testBucketEvaluatedAsBlank() {
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "${not.existant.attribute}");
+        runner.setProperty(TagS3Object.TAG_KEY, "key");
+        runner.setProperty(TagS3Object.TAG_VALUE, "val");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testTagKeyEvaluatedAsBlank() {
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, "${not.existant.attribute}");
+        runner.setProperty(TagS3Object.TAG_VALUE, "val");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testTagValEvaluatedAsBlank() {
+        runner.setProperty(TagS3Object.REGION, "us-west-2");
+        runner.setProperty(TagS3Object.BUCKET, "test-bucket");
+        runner.setProperty(TagS3Object.TAG_KEY, "tagKey");
+        runner.setProperty(TagS3Object.TAG_VALUE, "${not.existant.attribute}");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "delete-key");
+        runner.enqueue(new byte[0], attrs);
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    private void mockGetExistingTags(Tag... currentTag) {
+        List<Tag> currentTags = new ArrayList<>(Arrays.asList(currentTag));
+        
Mockito.when(mockS3Client.getObjectTagging(Mockito.anyObject())).thenReturn(new 
GetObjectTaggingResult(currentTags));
+    }
+}

Reply via email to