[
https://issues.apache.org/jira/browse/NIFI-4786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333348#comment-16333348
]
ASF GitHub Bot commented on NIFI-4786:
--------------------------------------
Github user jvwing commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2409#discussion_r162796692
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java ---
@@ -89,63 +89,80 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
-        final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
-        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, firehoseStreamName,
-            AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
+        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
+        HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
+        HashMap<String, List<Record>> recordHash = new HashMap<String, List<Record>>();
         final AmazonKinesisFirehoseClient client = getClient();
         try {
-            List<Record> records = new ArrayList<>();
-
             List<FlowFile> failedFlowFiles = new ArrayList<>();
             List<FlowFile> successfulFlowFiles = new ArrayList<>();
             // Prepare batch of records
             for (int i = 0; i < flowFiles.size(); i++) {
                 FlowFile flowFile = flowFiles.get(i);
+                final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
+
                 final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 session.exportTo(flowFile, baos);
-                records.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
+
+                if ( !recordHash.containsKey(firehoseStreamName) )
--- End diff --
Thank you for following the pre-existing code style in this class file.
That's usually a good practice. However, `if` statements in NiFi code
typically have braces even for single-line bodies, and there are typically no
spaces just inside the parentheses:
```
if (somevar == true) {
    doStuff();
}
```
Would you please fix these up?
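
For illustration, here is a minimal sketch of how the flagged per-stream grouping could look in that style. It is not the code from the pull request: the class name `StreamBatcher`, the method `groupByStream`, and the use of plain strings in place of NiFi's `FlowFile` and the Firehose `Record` are all assumptions made so the example runs without NiFi or the AWS SDK on the classpath.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the grouping step in PutKinesisFirehose.
public class StreamBatcher {

    // Groups payloads by stream name. Each element of `messages` is a
    // two-element array: {streamName, payload}. Plain strings stand in for
    // the FlowFile and Record types (assumption for this sketch).
    public static Map<String, List<String>> groupByStream(final List<String[]> messages) {
        final Map<String, List<String>> recordsByStream = new HashMap<>();
        for (final String[] message : messages) {
            final String streamName = message[0];
            final String payload = message[1];

            // Braces even for a single statement, and no padding spaces
            // just inside the parentheses.
            if (!recordsByStream.containsKey(streamName)) {
                recordsByStream.put(streamName, new ArrayList<>());
            }
            recordsByStream.get(streamName).add(payload);
        }
        return recordsByStream;
    }

    public static void main(final String[] args) {
        final List<String[]> messages = new ArrayList<>();
        messages.add(new String[] {"stream-a", "first"});
        messages.add(new String[] {"stream-b", "second"});
        messages.add(new String[] {"stream-a", "third"});
        // Prints one entry per stream name; HashMap iteration order is unspecified.
        System.out.println(groupByStream(messages));
    }
}
```

`Map.computeIfAbsent` would express the same check-then-insert in one call; the explicit `if` is kept here only because it is the construct under discussion.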
> Allow Expression Evaluation to Kinesis/Firehose Stream Name
> -----------------------------------------------------------
>
> Key: NIFI-4786
> URL: https://issues.apache.org/jira/browse/NIFI-4786
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 1.5.0
> Reporter: Dorian Bugeja
> Priority: Minor
> Labels: features, performance, pull-request-available
> Attachments:
> NIFI_4786___Allow_Expression_Evaluation_to_Kinesis_Firehose_Stream_Name_NIFI_4786___Allow_1.patch
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> Currently, the Stream Name property for both Firehose and Kinesis does not
> support the Expression Language. With that support, routing could be
> performed based on an attribute of the FlowFile, using a single component
> rather than a separate one for each stream.