This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 5be76cddde NIFI-12028 - Add s3.region attribute to ListS3 generated flow files
5be76cddde is described below

commit 5be76cdddef928744ee2c41f90955d4c372961f6
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Sep 8 13:01:33 2023 +0200

    NIFI-12028 - Add s3.region attribute to ListS3 generated flow files
    
    This closes #7672
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../org/apache/nifi/processors/aws/s3/ListS3.java   | 21 +++++++++++++--------
 .../apache/nifi/processors/aws/s3/TestListS3.java   |  2 ++
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 954bfa0856..b90ad3babf 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -116,6 +116,7 @@ import java.util.stream.Collectors;
         + "Node is selected, the new node can pick up where the previous node 
left off, without duplicating the data.")
 @WritesAttributes({
         @WritesAttribute(attribute = "s3.bucket", description = "The name of 
the S3 bucket"),
+        @WritesAttribute(attribute = "s3.region", description = "The region of 
the S3 bucket"),
         @WritesAttribute(attribute = "filename", description = "The name of 
the file"),
         @WritesAttribute(attribute = "s3.etag", description = "The ETag that 
can be used to see if the file has changed"),
         @WritesAttribute(attribute = "s3.isLatest", description = "A boolean 
indicating if this is the latest version of the object"),
@@ -499,7 +500,7 @@ public class ListS3 extends AbstractS3Processor implements 
VerifiableProcessor {
         if (writerFactory == null) {
             writer = new AttributeObjectWriter(session);
         } else {
-            writer = new RecordObjectWriter(session, writerFactory, 
getLogger());
+            writer = new RecordObjectWriter(session, writerFactory, 
getLogger(), context.getProperty(S3_REGION).getValue());
         }
 
         try {
@@ -523,7 +524,7 @@ public class ListS3 extends AbstractS3Processor implements 
VerifiableProcessor {
                     ObjectMetadata objectMetadata = getObjectMetadata(context, 
client, versionSummary);
 
                     // Write the entity to the listing
-                    writer.addToListing(versionSummary, taggingResult, 
objectMetadata);
+                    writer.addToListing(versionSummary, taggingResult, 
objectMetadata, context.getProperty(S3_REGION).getValue());
 
                     // Track the latest lastModified timestamp and keys having 
that timestamp.
                     // NOTE: Amazon S3 lists objects in UTF-8 character 
encoding in lexicographical order. Not ordered by timestamps.
@@ -633,7 +634,7 @@ public class ListS3 extends AbstractS3Processor implements 
VerifiableProcessor {
             if (writerFactory == null) {
                 writer = new AttributeObjectWriter(session);
             } else {
-                writer = new RecordObjectWriter(session, writerFactory, 
getLogger());
+                writer = new RecordObjectWriter(session, writerFactory, 
getLogger(), context.getProperty(S3_REGION).getValue());
             }
 
             try {
@@ -648,7 +649,7 @@ public class ListS3 extends AbstractS3Processor implements 
VerifiableProcessor {
                     GetObjectTaggingResult taggingResult = 
getTaggingResult(context, s3Client, s3VersionSummary);
                     ObjectMetadata objectMetadata = getObjectMetadata(context, 
s3Client, s3VersionSummary);
 
-                    writer.addToListing(s3VersionSummary, taggingResult, 
objectMetadata);
+                    writer.addToListing(s3VersionSummary, taggingResult, 
objectMetadata, context.getProperty(S3_REGION).getValue());
 
                     listCount++;
 
@@ -910,7 +911,7 @@ public class ListS3 extends AbstractS3Processor implements 
VerifiableProcessor {
     interface S3ObjectWriter {
         void beginListing() throws IOException, SchemaNotFoundException;
 
-        void addToListing(S3VersionSummary summary, GetObjectTaggingResult 
taggingResult, ObjectMetadata objectMetadata) throws IOException;
+        void addToListing(S3VersionSummary summary, GetObjectTaggingResult 
taggingResult, ObjectMetadata objectMetadata, String region) throws IOException;
 
         void finishListing() throws IOException;
 
@@ -957,11 +958,13 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
         private final ComponentLog logger;
         private RecordSetWriter recordWriter;
         private FlowFile flowFile;
+        private String region;
 
-        public RecordObjectWriter(final ProcessSession session, final 
RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+        public RecordObjectWriter(final ProcessSession session, final 
RecordSetWriterFactory writerFactory, final ComponentLog logger, final String 
region) {
             this.session = session;
             this.writerFactory = writerFactory;
             this.logger = logger;
+            this.region = region;
         }
 
         @Override
@@ -974,7 +977,7 @@ public class ListS3 extends AbstractS3Processor implements 
VerifiableProcessor {
         }
 
         @Override
-        public void addToListing(final S3VersionSummary summary, final 
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) 
throws IOException {
+        public void addToListing(final S3VersionSummary summary, final 
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata, 
String region) throws IOException {
             recordWriter.write(createRecordForListing(summary, taggingResult, 
objectMetadata));
         }
 
@@ -988,6 +991,7 @@ public class ListS3 extends AbstractS3Processor implements 
VerifiableProcessor {
             } else {
                 final Map<String, String> attributes = new 
HashMap<>(writeResult.getAttributes());
                 attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+                attributes.put("s3.region", region);
                 flowFile = session.putAllAttributes(flowFile, attributes);
 
                 session.transfer(flowFile, REL_SUCCESS);
@@ -1060,11 +1064,12 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
         }
 
         @Override
-        public void addToListing(final S3VersionSummary versionSummary, final 
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
+        public void addToListing(final S3VersionSummary versionSummary, final 
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata, 
final String region) {
             // Create the attributes
             final Map<String, String> attributes = new HashMap<>();
             attributes.put(CoreAttributes.FILENAME.key(), 
versionSummary.getKey());
             attributes.put("s3.bucket", versionSummary.getBucketName());
+            attributes.put("s3.region", region);
             if (versionSummary.getOwner() != null) { // We may not have 
permission to read the owner
                 attributes.put("s3.owner", versionSummary.getOwner().getId());
             }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 031d833530..4b2afc9cf2 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -116,6 +116,7 @@ public class TestListS3 {
         MockFlowFile ff0 = flowFiles.get(0);
         ff0.assertAttributeEquals("filename", "a");
         ff0.assertAttributeEquals("s3.bucket", "test-bucket");
+        ff0.assertAttributeEquals("s3.region", "eu-west-1");
         String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
         ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
         flowFiles.get(1).assertAttributeEquals("filename", "b/c");
@@ -174,6 +175,7 @@ public class TestListS3 {
 
         final MockFlowFile flowFile = 
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
         flowFile.assertAttributeEquals("record.count", "3");
+        flowFile.assertAttributeEquals("s3.region", "eu-west-1");
         flowFile.assertContentEquals("a,test-bucket,,," + lastModifiedString + 
",0,,true,,,\n"
             + "b/c,test-bucket,,," + lastModifiedString + ",0,,true,,,\n"
             + "d/e,test-bucket,,," + lastModifiedString + ",0,,true,,,\n");

Reply via email to