This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new cdee1d8  NIFI-6487 Add S3 User Metadata to ListS3 processor
cdee1d8 is described below

commit cdee1d8c09c52dd2aa78311cf8c8fe9b2f449d44
Author: JF Beauvais <[email protected]>
AuthorDate: Thu Jul 25 14:20:35 2019 +0200

    NIFI-6487 Add S3 User Metadata to ListS3 processor
    
    Fix imports auto formatted by intellij
    
    NIFI-6487 Fix WriteAttribute documentation
    
    This closes #3603.
    
    Signed-off-by: Koji Kawamura <[email protected]>
---
 .../org/apache/nifi/processors/aws/s3/ListS3.java  | 46 +++++++++++++++++----
 .../nifi/processors/aws/s3/AbstractS3IT.java       |  9 ++++
 .../apache/nifi/processors/aws/s3/ITListS3.java    | 32 +++++++++++++++
 .../apache/nifi/processors/aws/s3/TestListS3.java  | 48 ++++++++++++++++++----
 4 files changed, 117 insertions(+), 18 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index d3bade9..fb4e49f 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -26,9 +26,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
 import com.amazonaws.services.s3.model.Tag;
+import com.amazonaws.services.s3.model.VersionListing;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -53,14 +63,6 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ListVersionsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.amazonaws.services.s3.model.S3VersionSummary;
-import com.amazonaws.services.s3.model.VersionListing;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
 
 @PrimaryNodeOnly
 @TriggerSerially
@@ -85,6 +87,8 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result;
         @WritesAttribute(attribute = "s3.storeClass", description = "The 
storage class of the object"),
         @WritesAttribute(attribute = "s3.version", description = "The version 
of the object, if applicable"),
         @WritesAttribute(attribute = "s3.tag.___", description = "If 'Write 
Object Tags' is set to 'True', the tags associated to the S3 object that is 
being listed " +
+                "will be written as part of the flowfile attributes"),
+        @WritesAttribute(attribute = "s3.user.metadata.___", description = "If 
'Write User Metadata' is set to 'True', the user defined metadata associated to 
the S3 object that is being listed " +
                 "will be written as part of the flowfile attributes")})
 @SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
 public class ListS3 extends AbstractS3Processor {
@@ -150,8 +154,18 @@ public class ListS3 extends AbstractS3Processor {
             .defaultValue("false")
             .build();
 
+    // Boolean toggle (allowable values "true"/"false", default "false"). When it evaluates to
+    // true, the listing code merges the result of writeUserMetadata(...) into the FlowFile
+    // attributes; each such call issues one getObjectMetadata request against S3.
+    public static final PropertyDescriptor WRITE_USER_METADATA = new 
PropertyDescriptor.Builder()
+            .name("write-s3-user-metadata")
+            .displayName("Write User Metadata")
+            .description("If set to 'True', the user defined metadata 
associated with the S3 object will be written as FlowFile attributes")
+            .required(true)
+            .allowableValues(new AllowableValue("true", "True"), new 
AllowableValue("false", "False"))
+            .defaultValue("false")
+            .build();
+
+
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, 
WRITE_OBJECT_TAGS, CREDENTIALS_FILE,
+            Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY, 
WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE,
                     AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, 
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
                     SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, 
PROXY_HOST_PORT, PROXY_USERNAME,
                     PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS, 
LIST_TYPE, MIN_AGE));
@@ -286,6 +300,9 @@ public class ListS3 extends AbstractS3Processor {
                 if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
                     attributes.putAll(writeObjectTags(client, versionSummary));
                 }
+                if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+                    attributes.putAll(writeUserMetadata(client, 
versionSummary));
+                }
 
                 // Create the flowfile
                 FlowFile flowFile = session.create();
@@ -354,6 +371,17 @@ public class ListS3 extends AbstractS3Processor {
         return tagMap;
     }
 
+    /**
+     * Fetches the user-defined metadata of a listed S3 object and converts it to FlowFile attributes.
+     *
+     * The version id of the summary is forwarded so that, for a versioned listing, the metadata
+     * belongs to the listed version rather than to the latest one. A null version id lets S3
+     * resolve the most recent version, which keeps unversioned listings behaving as before.
+     *
+     * @param client the S3 client used to issue the metadata request
+     * @param versionSummary the listed object (bucket, key and optional version id)
+     * @return attributes keyed "s3.user.metadata." + metadata name; empty when nothing is available
+     */
+    private Map<String, String> writeUserMetadata(AmazonS3 client, S3VersionSummary versionSummary) {
+        final GetObjectMetadataRequest request = new GetObjectMetadataRequest(
+                versionSummary.getBucketName(), versionSummary.getKey(), versionSummary.getVersionId());
+        final ObjectMetadata objectMetadata = client.getObjectMetadata(request);
+        final Map<String, String> metadata = new HashMap<>();
+        // A mocked client returns null here; treat a missing result or map as "no metadata".
+        if (objectMetadata != null && objectMetadata.getUserMetadata() != null) {
+            for (Map.Entry<String, String> e : objectMetadata.getUserMetadata().entrySet()) {
+                metadata.put("s3.user.metadata." + e.getKey(), e.getValue());
+            }
+        }
+        return metadata;
+    }
+
     private interface S3BucketLister {
         public void setBucketName(String bucketName);
         public void setPrefix(String prefix);
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index d59093b..00f8567 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -42,6 +42,7 @@ import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.fail;
 
@@ -148,6 +149,14 @@ public abstract class AbstractS3IT {
         client.putObject(putRequest);
     }
 
+    /**
+     * Uploads a file to the test bucket with the given user-defined metadata attached.
+     *
+     * The upload is built from the File itself (as putFileWithObjectTag does) instead of a
+     * FileInputStream, so no stream is left open and the SDK knows the content length.
+     *
+     * @param key the object key to create in the test bucket
+     * @param file the local file whose content is uploaded
+     * @param userMetadata the user metadata entries to store with the object
+     * @throws FileNotFoundException if the file does not exist or is not a regular file
+     * @throws AmazonS3Exception if S3 rejects the request
+     */
+    protected void putFileWithUserMetadata(String key, File file, Map<String, String> userMetadata)
+            throws AmazonS3Exception, FileNotFoundException {
+        // Keep the original failure mode for a missing file now that no stream is opened here.
+        if (!file.isFile()) {
+            throw new FileNotFoundException(file.getPath());
+        }
+
+        ObjectMetadata objectMetadata = new ObjectMetadata();
+        objectMetadata.setUserMetadata(userMetadata);
+        PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, file);
+        putRequest.setMetadata(objectMetadata);
+
+        client.putObject(putRequest);
+    }
+
     protected void putFileWithObjectTag(String key, File file, List<Tag> 
objectTags) {
         PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key, 
file);
         putRequest.setTagging(new ObjectTagging(objectTags));
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
index fbe17ed..3a47283 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
@@ -24,9 +24,12 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 
 /**
@@ -173,4 +176,33 @@ public class ITListS3 extends AbstractS3IT {
         flowFiles.assertAttributeEquals("s3.tag.dummytag2", "dummyvalue2");
     }
 
+    /**
+     * Uploads one object carrying two user metadata entries under the "b/" prefix and checks
+     * that ListS3, with 'Write User Metadata' enabled, emits a single FlowFile exposing both
+     * entries as "s3.user.metadata." prefixed attributes.
+     */
+    @Test
+    public void testUserMetadataWritten() throws FileNotFoundException {
+        final Map<String, String> uploadedMetadata = new HashMap<>();
+        uploadedMetadata.put("dummy.metadata.1", "dummyvalue1");
+        uploadedMetadata.put("dummy.metadata.2", "dummyvalue2");
+        putFileWithUserMetadata("b/fileWithUserMetadata",
+                getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), uploadedMetadata);
+
+        // Configure the processor to list only the prefix that holds the uploaded object.
+        final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+        runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(ListS3.REGION, REGION);
+        runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+        runner.setProperty(ListS3.PREFIX, "b/");
+        runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+        final List<MockFlowFile> listed = runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS);
+        final MockFlowFile listedFile = listed.get(0);
+
+        listedFile.assertAttributeEquals("filename", "b/fileWithUserMetadata");
+        listedFile.assertAttributeExists("s3.user.metadata.dummy.metadata.1");
+        listedFile.assertAttributeExists("s3.user.metadata.dummy.metadata.2");
+        listedFile.assertAttributeEquals("s3.user.metadata.dummy.metadata.1", "dummyvalue1");
+        listedFile.assertAttributeEquals("s3.user.metadata.dummy.metadata.2", "dummyvalue2");
+    }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index be12d6d..3798d84 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -23,7 +23,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
+import com.amazonaws.services.s3.model.VersionListing;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
@@ -34,14 +43,6 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 
 import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ListVersionsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.amazonaws.services.s3.model.S3VersionSummary;
-import com.amazonaws.services.s3.model.VersionListing;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -320,10 +321,38 @@ public class TestListS3 {
     }
 
     @Test
+    public void testWriteUserMetadata() {
+        // With 'Write User Metadata' enabled, listing a single object must trigger exactly one
+        // getObjectMetadata call addressed to that object's bucket and key, and must not fall
+        // back to a versions listing.
+        final String bucket = "test-bucket";
+        final String key = "a";
+        runner.setProperty(ListS3.REGION, "eu-west-1");
+        runner.setProperty(ListS3.BUCKET, bucket);
+        runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
+
+        // Stub the listing so the mocked client reports one object.
+        final S3ObjectSummary summary = new S3ObjectSummary();
+        summary.setBucketName(bucket);
+        summary.setKey(key);
+        summary.setLastModified(new Date());
+        final ObjectListing listing = new ObjectListing();
+        listing.getObjectSummaries().add(summary);
+        Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class)))
+                .thenReturn(listing);
+
+        runner.run();
+
+        final ArgumentCaptor<GetObjectMetadataRequest> metadataRequests =
+                ArgumentCaptor.forClass(GetObjectMetadataRequest.class);
+        Mockito.verify(mockS3Client, Mockito.times(1)).getObjectMetadata(metadataRequests.capture());
+        final GetObjectMetadataRequest captured = metadataRequests.getValue();
+
+        assertEquals(bucket, captured.getBucketName());
+        assertEquals(key, captured.getKey());
+
+        Mockito.verify(mockS3Client, Mockito.never()).listVersions(Mockito.any());
+    }
+
+    @Test
     public void testGetPropertyDescriptors() throws Exception {
         ListS3 processor = new ListS3();
         List<PropertyDescriptor> pd = 
processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 21, pd.size());
+        assertEquals("size should be eq", 22, pd.size());
         assertTrue(pd.contains(ListS3.ACCESS_KEY));
         assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(ListS3.BUCKET));
@@ -331,6 +360,7 @@ public class TestListS3 {
         assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
         assertTrue(pd.contains(ListS3.REGION));
         assertTrue(pd.contains(ListS3.WRITE_OBJECT_TAGS));
+        assertTrue(pd.contains(ListS3.WRITE_USER_METADATA));
         assertTrue(pd.contains(ListS3.SECRET_KEY));
         assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
         assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));

Reply via email to