awelless commented on code in PR #10881:
URL: https://github.com/apache/nifi/pull/10881#discussion_r2813078196
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -504,17 +503,13 @@ ConsumeRecordsResult consumeRecords() {
}
final List<KinesisClientRecord> recordsToConsume = new ArrayList<>();
+ Long lastMillisBehindLatest = null;
for (final RecordBatch batch : inProgressBatches) {
recordsToConsume.addAll(batch.records());
+ lastMillisBehindLatest = batch.millisBehindLatest;
Review Comment:
Should we update it only when `batch.millisBehindLatest` is not null? Which
is preferable: keeping a value from an older batch, or reporting `null` from
a newer batch?
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -145,6 +146,8 @@ Uses DynamoDB for check pointing and coordination, and (optional) CloudWatch for
description = "A SubSequence Number of the last Kinesis Record in the FlowFile. Generated by KPL when aggregating records into a single Kinesis Record"),
@WritesAttribute(attribute = APPROXIMATE_ARRIVAL_TIMESTAMP,
description = "Approximate arrival timestamp of the last Kinesis Record in the FlowFile"),
+ @WritesAttribute(attribute = MILLIS_BEHIND_LATEST,
+ description = "Milliseconds behind the latest record in the shard at the time records were consumed"),
Review Comment:
The original issue was about reporting millis behind latest through a gauge.
Do we need to write it to FlowFile attributes as well?