This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new db7e073368 NIFI-13203: Include s3.url attribute in FetchS3Object,
PutS3Object. This closes #8795
db7e073368 is described below
commit db7e073368eeec2a17da73f74415d64d81e55458
Author: Mark Payne <[email protected]>
AuthorDate: Thu May 9 17:02:18 2024 -0400
NIFI-13203: Include s3.url attribute in FetchS3Object, PutS3Object
This closes #8795
Signed-off-by: Joseph Witt <[email protected]>
---
.../java/org/apache/nifi/processors/aws/s3/FetchS3Object.java | 8 ++++----
.../main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java | 8 ++++----
.../java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java | 9 ++++-----
3 files changed, 12 insertions(+), 13 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 21f6371c61..7d5ae411f3 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -70,6 +70,7 @@ import static
org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it
to the content of a FlowFile")
@WritesAttributes({
+ @WritesAttribute(attribute = "s3.url", description = "The URL that can be
used to access the S3 object"),
@WritesAttribute(attribute = "s3.bucket", description = "The name of the
S3 bucket"),
@WritesAttribute(attribute = "path", description = "The path of the file"),
@WritesAttribute(attribute = "absolute.path", description = "The path of
the file"),
@@ -439,12 +440,11 @@ public class FetchS3Object extends AbstractS3Processor {
throw ffae;
}
- if (!attributes.isEmpty()) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
+ final String url = client.getResourceUrl(bucket, key);
+ attributes.put("s3.url", url);
+ flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
- final String url = client.getResourceUrl(bucket, key);
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
getLogger().info("Successfully retrieved S3 Object for {} in {}
millis; routing to success", new Object[]{flowFile, transferMillis});
session.getProvenanceReporter().fetch(flowFile, url, transferMillis);
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 9237f77774..80c3645f7d 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -105,6 +105,7 @@ import static
org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's
filename as the filename for the S3 object")
@WritesAttributes({
+ @WritesAttribute(attribute = "s3.url", description = "The URL that can be
used to access the S3 object"),
@WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket
where the Object was put in S3"),
@WritesAttribute(attribute = "s3.key", description = "The S3 key within
where the Object was put in S3"),
@WritesAttribute(attribute = "s3.contenttype", description = "The S3
content type of the S3 Object that put in S3"),
@@ -857,12 +858,11 @@ public class PutS3Object extends AbstractS3Processor {
throw e;
}
- if (!attributes.isEmpty()) {
- flowFile = session.putAllAttributes(flowFile, attributes);
- }
+ final String url = s3.getResourceUrl(bucket, key);
+ attributes.put("s3.url", url);
+ flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
- final String url = s3.getResourceUrl(bucket, key);
final long millis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().send(flowFile, url, millis);
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 7b983a23aa..bc1d533ea1 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -449,9 +449,9 @@ public class ITPutS3Object extends AbstractS3IT {
runner.setProperty(PutS3Object.BUCKET_WITHOUT_DEFAULT_VALUE,
BUCKET_NAME);
runner.setProperty(PutS3Object.KEY, "${filename}");
- Map<String, String> attribs = new HashMap<>();
- attribs.put(CoreAttributes.FILENAME.key(), PROV1_FILE);
- runner.enqueue("prov1 contents".getBytes(), attribs);
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), PROV1_FILE);
+ runner.enqueue("prov1 contents".getBytes(), attributes);
runner.assertValid();
runner.run();
@@ -461,12 +461,11 @@ public class ITPutS3Object extends AbstractS3IT {
final List<ProvenanceEventRecord> provenanceEvents =
runner.getProvenanceEvents();
assertEquals(1, provenanceEvents.size());
- ProvenanceEventRecord provRec1 = provenanceEvents.get(0);
+ ProvenanceEventRecord provRec1 = provenanceEvents.getFirst();
assertEquals(ProvenanceEventType.SEND, provRec1.getEventType());
assertEquals(runner.getProcessor().getIdentifier(),
provRec1.getComponentId());
String targetUri = getClient().getUrl(BUCKET_NAME,
PROV1_FILE).toString();
assertEquals(targetUri, provRec1.getTransitUri());
- assertEquals(8, provRec1.getUpdatedAttributes().size());
assertEquals(BUCKET_NAME,
provRec1.getUpdatedAttributes().get(PutS3Object.S3_BUCKET_KEY));
}