This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 47f4c8ce2e NIFI-12028 - Add s3.region attribute to ListS3 generated
flow files
47f4c8ce2e is described below
commit 47f4c8ce2e0b1ec5fa176cb44a94e64dce354fa0
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Sep 8 13:01:33 2023 +0200
NIFI-12028 - Add s3.region attribute to ListS3 generated flow files
This closes #7672
Signed-off-by: Mike Thomsen <[email protected]>
---
.../org/apache/nifi/processors/aws/s3/ListS3.java | 21 +++++++++++++--------
.../apache/nifi/processors/aws/s3/TestListS3.java | 2 ++
2 files changed, 15 insertions(+), 8 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 0f51b73317..5a1649814e 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -116,6 +116,7 @@ import java.util.stream.Collectors;
+ "Node is selected, the new node can pick up where the previous node
left off, without duplicating the data.")
@WritesAttributes({
@WritesAttribute(attribute = "s3.bucket", description = "The name of
the S3 bucket"),
+ @WritesAttribute(attribute = "s3.region", description = "The region of
the S3 bucket"),
@WritesAttribute(attribute = "filename", description = "The name of
the file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that
can be used to see if the file has changed"),
@WritesAttribute(attribute = "s3.isLatest", description = "A boolean
indicating if this is the latest version of the object"),
@@ -499,7 +500,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
- writer = new RecordObjectWriter(session, writerFactory,
getLogger());
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
}
try {
@@ -523,7 +524,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
ObjectMetadata objectMetadata = getObjectMetadata(context,
client, versionSummary);
// Write the entity to the listing
- writer.addToListing(versionSummary, taggingResult,
objectMetadata);
+ writer.addToListing(versionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
// Track the latest lastModified timestamp and keys having
that timestamp.
// NOTE: Amazon S3 lists objects in UTF-8 character
encoding in lexicographical order. Not ordered by timestamps.
@@ -633,7 +634,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
if (writerFactory == null) {
writer = new AttributeObjectWriter(session);
} else {
- writer = new RecordObjectWriter(session, writerFactory,
getLogger());
+ writer = new RecordObjectWriter(session, writerFactory,
getLogger(), context.getProperty(S3_REGION).getValue());
}
try {
@@ -648,7 +649,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
GetObjectTaggingResult taggingResult =
getTaggingResult(context, s3Client, s3VersionSummary);
ObjectMetadata objectMetadata = getObjectMetadata(context,
s3Client, s3VersionSummary);
- writer.addToListing(s3VersionSummary, taggingResult,
objectMetadata);
+ writer.addToListing(s3VersionSummary, taggingResult,
objectMetadata, context.getProperty(S3_REGION).getValue());
listCount++;
@@ -910,7 +911,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
interface S3ObjectWriter {
void beginListing() throws IOException, SchemaNotFoundException;
- void addToListing(S3VersionSummary summary, GetObjectTaggingResult
taggingResult, ObjectMetadata objectMetadata) throws IOException;
+ void addToListing(S3VersionSummary summary, GetObjectTaggingResult
taggingResult, ObjectMetadata objectMetadata, String region) throws IOException;
void finishListing() throws IOException;
@@ -957,11 +958,13 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
private final ComponentLog logger;
private RecordSetWriter recordWriter;
private FlowFile flowFile;
+ private String region;
- public RecordObjectWriter(final ProcessSession session, final
RecordSetWriterFactory writerFactory, final ComponentLog logger) {
+ public RecordObjectWriter(final ProcessSession session, final
RecordSetWriterFactory writerFactory, final ComponentLog logger, final String
region) {
this.session = session;
this.writerFactory = writerFactory;
this.logger = logger;
+ this.region = region;
}
@Override
@@ -974,7 +977,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
}
@Override
- public void addToListing(final S3VersionSummary summary, final
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata)
throws IOException {
+ public void addToListing(final S3VersionSummary summary, final
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata,
String region) throws IOException {
recordWriter.write(createRecordForListing(summary, taggingResult,
objectMetadata));
}
@@ -988,6 +991,7 @@ public class ListS3 extends AbstractS3Processor implements
VerifiableProcessor {
} else {
final Map<String, String> attributes = new
HashMap<>(writeResult.getAttributes());
attributes.put("record.count",
String.valueOf(writeResult.getRecordCount()));
+ attributes.put("s3.region", region);
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
@@ -1060,11 +1064,12 @@ public class ListS3 extends AbstractS3Processor
implements VerifiableProcessor {
}
@Override
- public void addToListing(final S3VersionSummary versionSummary, final
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata) {
+ public void addToListing(final S3VersionSummary versionSummary, final
GetObjectTaggingResult taggingResult, final ObjectMetadata objectMetadata,
final String region) {
// Create the attributes
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.FILENAME.key(),
versionSummary.getKey());
attributes.put("s3.bucket", versionSummary.getBucketName());
+ attributes.put("s3.region", region);
if (versionSummary.getOwner() != null) { // We may not have
permission to read the owner
attributes.put("s3.owner", versionSummary.getOwner().getId());
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 031d833530..4b2afc9cf2 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -116,6 +116,7 @@ public class TestListS3 {
MockFlowFile ff0 = flowFiles.get(0);
ff0.assertAttributeEquals("filename", "a");
ff0.assertAttributeEquals("s3.bucket", "test-bucket");
+ ff0.assertAttributeEquals("s3.region", "eu-west-1");
String lastModifiedTimestamp = String.valueOf(lastModified.getTime());
ff0.assertAttributeEquals("s3.lastModified", lastModifiedTimestamp);
flowFiles.get(1).assertAttributeEquals("filename", "b/c");
@@ -174,6 +175,7 @@ public class TestListS3 {
final MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ListS3.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals("record.count", "3");
+ flowFile.assertAttributeEquals("s3.region", "eu-west-1");
flowFile.assertContentEquals("a,test-bucket,,," + lastModifiedString +
",0,,true,,,\n"
+ "b/c,test-bucket,,," + lastModifiedString + ",0,,true,,,\n"
+ "d/e,test-bucket,,," + lastModifiedString + ",0,,true,,,\n");