This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new cdee1d8 NIFI-6487 Add S3 User Metadata to ListS3 processor
cdee1d8 is described below
commit cdee1d8c09c52dd2aa78311cf8c8fe9b2f449d44
Author: JF Beauvais <[email protected]>
AuthorDate: Thu Jul 25 14:20:35 2019 +0200
NIFI-6487 Add S3 User Metadata to ListS3 processor
Fix imports auto formatted by intellij
NIFI-6487 Fix WriteAttribute documentation
This closes #3603.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../org/apache/nifi/processors/aws/s3/ListS3.java | 46 +++++++++++++++++----
.../nifi/processors/aws/s3/AbstractS3IT.java | 9 ++++
.../apache/nifi/processors/aws/s3/ITListS3.java | 32 +++++++++++++++
.../apache/nifi/processors/aws/s3/TestListS3.java | 48 ++++++++++++++++++----
4 files changed, 117 insertions(+), 18 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index d3bade9..fb4e49f 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -26,9 +26,19 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingResult;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
import com.amazonaws.services.s3.model.Tag;
+import com.amazonaws.services.s3.model.VersionListing;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -53,14 +63,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ListVersionsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.amazonaws.services.s3.model.S3VersionSummary;
-import com.amazonaws.services.s3.model.VersionListing;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
@PrimaryNodeOnly
@TriggerSerially
@@ -85,6 +87,8 @@ import com.amazonaws.services.s3.model.ListObjectsV2Result;
@WritesAttribute(attribute = "s3.storeClass", description = "The
storage class of the object"),
@WritesAttribute(attribute = "s3.version", description = "The version
of the object, if applicable"),
@WritesAttribute(attribute = "s3.tag.___", description = "If 'Write
Object Tags' is set to 'True', the tags associated to the S3 object that is
being listed " +
+ "will be written as part of the flowfile attributes"),
+ @WritesAttribute(attribute = "s3.user.metadata.___", description = "If
'Write User Metadata' is set to 'True', the user defined metadata associated to
the S3 object that is being listed " +
"will be written as part of the flowfile attributes")})
@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
public class ListS3 extends AbstractS3Processor {
@@ -150,8 +154,18 @@ public class ListS3 extends AbstractS3Processor {
.defaultValue("false")
.build();
+ public static final PropertyDescriptor WRITE_USER_METADATA = new
PropertyDescriptor.Builder()
+ .name("write-s3-user-metadata")
+ .displayName("Write User Metadata")
+ .description("If set to 'True', the user defined metadata
associated with the S3 object will be written as FlowFile attributes")
+ .required(true)
+ .allowableValues(new AllowableValue("true", "True"), new
AllowableValue("false", "False"))
+ .defaultValue("false")
+ .build();
+
+
public static final List<PropertyDescriptor> properties =
Collections.unmodifiableList(
- Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY,
WRITE_OBJECT_TAGS, CREDENTIALS_FILE,
+ Arrays.asList(BUCKET, REGION, ACCESS_KEY, SECRET_KEY,
WRITE_OBJECT_TAGS, WRITE_USER_METADATA, CREDENTIALS_FILE,
AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE, PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
PROXY_HOST_PORT, PROXY_USERNAME,
PROXY_PASSWORD, DELIMITER, PREFIX, USE_VERSIONS,
LIST_TYPE, MIN_AGE));
@@ -286,6 +300,9 @@ public class ListS3 extends AbstractS3Processor {
if (context.getProperty(WRITE_OBJECT_TAGS).asBoolean()) {
attributes.putAll(writeObjectTags(client, versionSummary));
}
+ if (context.getProperty(WRITE_USER_METADATA).asBoolean()) {
+ attributes.putAll(writeUserMetadata(client,
versionSummary));
+ }
// Create the flowfile
FlowFile flowFile = session.create();
@@ -354,6 +371,17 @@ public class ListS3 extends AbstractS3Processor {
return tagMap;
}
+ private Map<String, String> writeUserMetadata(AmazonS3 client,
S3VersionSummary versionSummary) {
+ ObjectMetadata objectMetadata = client.getObjectMetadata(new
GetObjectMetadataRequest(versionSummary.getBucketName(),
versionSummary.getKey()));
+ final Map<String, String> metadata = new HashMap<>();
+ if (objectMetadata != null) {
+ for (Map.Entry<String, String> e :
objectMetadata.getUserMetadata().entrySet()) {
+ metadata.put("s3.user.metadata." + e.getKey(), e.getValue());
+ }
+ }
+ return metadata;
+ }
+
private interface S3BucketLister {
public void setBucketName(String bucketName);
public void setPrefix(String prefix);
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
index d59093b..00f8567 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/AbstractS3IT.java
@@ -42,6 +42,7 @@ import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.fail;
@@ -148,6 +149,14 @@ public abstract class AbstractS3IT {
client.putObject(putRequest);
}
+ protected void putFileWithUserMetadata(String key, File file, Map<String,
String> userMetadata) throws AmazonS3Exception, FileNotFoundException {
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setUserMetadata(userMetadata);
+ PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key,
new FileInputStream(file), objectMetadata);
+
+ client.putObject(putRequest);
+ }
+
protected void putFileWithObjectTag(String key, File file, List<Tag>
objectTags) {
PutObjectRequest putRequest = new PutObjectRequest(BUCKET_NAME, key,
file);
putRequest.setTagging(new ObjectTagging(objectTags));
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
index fbe17ed..3a47283 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITListS3.java
@@ -24,9 +24,12 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
@@ -173,4 +176,33 @@ public class ITListS3 extends AbstractS3IT {
flowFiles.assertAttributeEquals("s3.tag.dummytag2", "dummyvalue2");
}
+ @Test
+ public void testUserMetadataWritten() throws FileNotFoundException {
+ Map<String, String> userMetadata = new HashMap<>();
+ userMetadata.put("dummy.metadata.1", "dummyvalue1");
+ userMetadata.put("dummy.metadata.2", "dummyvalue2");
+
+ putFileWithUserMetadata("b/fileWithUserMetadata",
getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME), userMetadata);
+
+ final TestRunner runner = TestRunners.newTestRunner(new ListS3());
+
+ runner.setProperty(ListS3.CREDENTIALS_FILE, CREDENTIALS_FILE);
+ runner.setProperty(ListS3.PREFIX, "b/");
+ runner.setProperty(ListS3.REGION, REGION);
+ runner.setProperty(ListS3.BUCKET, BUCKET_NAME);
+ runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ListS3.REL_SUCCESS, 1);
+
+ MockFlowFile flowFiles =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
+
+ flowFiles.assertAttributeEquals("filename", "b/fileWithUserMetadata");
+ flowFiles.assertAttributeExists("s3.user.metadata.dummy.metadata.1");
+ flowFiles.assertAttributeExists("s3.user.metadata.dummy.metadata.2");
+ flowFiles.assertAttributeEquals("s3.user.metadata.dummy.metadata.1",
"dummyvalue1");
+ flowFiles.assertAttributeEquals("s3.user.metadata.dummy.metadata.2",
"dummyvalue2");
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index be12d6d..3798d84 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -23,7 +23,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectTaggingRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ListVersionsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.S3VersionSummary;
+import com.amazonaws.services.s3.model.VersionListing;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
@@ -34,14 +43,6 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ListVersionsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
-import com.amazonaws.services.s3.model.S3ObjectSummary;
-import com.amazonaws.services.s3.model.S3VersionSummary;
-import com.amazonaws.services.s3.model.VersionListing;
-import com.amazonaws.services.s3.model.ListObjectsV2Request;
-import com.amazonaws.services.s3.model.ListObjectsV2Result;
import org.junit.Before;
import org.junit.Test;
@@ -320,10 +321,38 @@ public class TestListS3 {
}
@Test
+ public void testWriteUserMetadata() {
+ runner.setProperty(ListS3.REGION, "eu-west-1");
+ runner.setProperty(ListS3.BUCKET, "test-bucket");
+ runner.setProperty(ListS3.WRITE_USER_METADATA, "true");
+
+ Date lastModified = new Date();
+ ObjectListing objectListing = new ObjectListing();
+ S3ObjectSummary objectSummary1 = new S3ObjectSummary();
+ objectSummary1.setBucketName("test-bucket");
+ objectSummary1.setKey("a");
+ objectSummary1.setLastModified(lastModified);
+ objectListing.getObjectSummaries().add(objectSummary1);
+
+
Mockito.when(mockS3Client.listObjects(Mockito.any(ListObjectsRequest.class))).thenReturn(objectListing);
+
+ runner.run();
+
+ ArgumentCaptor<GetObjectMetadataRequest> captureRequest =
ArgumentCaptor.forClass(GetObjectMetadataRequest.class);
+ Mockito.verify(mockS3Client,
Mockito.times(1)).getObjectMetadata(captureRequest.capture());
+ GetObjectMetadataRequest request = captureRequest.getValue();
+
+ assertEquals("test-bucket", request.getBucketName());
+ assertEquals("a", request.getKey());
+
+ Mockito.verify(mockS3Client,
Mockito.never()).listVersions(Mockito.any());
+ }
+
+ @Test
public void testGetPropertyDescriptors() throws Exception {
ListS3 processor = new ListS3();
List<PropertyDescriptor> pd =
processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 21, pd.size());
+ assertEquals("size should be eq", 22, pd.size());
assertTrue(pd.contains(ListS3.ACCESS_KEY));
assertTrue(pd.contains(ListS3.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(ListS3.BUCKET));
@@ -331,6 +360,7 @@ public class TestListS3 {
assertTrue(pd.contains(ListS3.ENDPOINT_OVERRIDE));
assertTrue(pd.contains(ListS3.REGION));
assertTrue(pd.contains(ListS3.WRITE_OBJECT_TAGS));
+ assertTrue(pd.contains(ListS3.WRITE_USER_METADATA));
assertTrue(pd.contains(ListS3.SECRET_KEY));
assertTrue(pd.contains(ListS3.SIGNER_OVERRIDE));
assertTrue(pd.contains(ListS3.SSL_CONTEXT_SERVICE));