awelless commented on code in PR #10881:
URL: https://github.com/apache/nifi/pull/10881#discussion_r2817792182


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -696,20 +705,28 @@ private void checkInitializationResult(final InitializationResult initialization
         }
     }
 
+
+
     private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) {
         try {
-            final List<KinesisClientRecord> records = recordBuffer.consumeRecords(lease);
+            final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+            final List<KinesisClientRecord> records = result.records();
+
+            final String shardId = lease.shardId();
+            final Long millisBehindLatest = result.millisBehindLatest();
+            if (millisBehindLatest != null) {
+                session.recordGauge(makeMillisBehindLatestGaugeName(streamName, shardId), millisBehindLatest, CommitTiming.SESSION_COMMITTED);
+            }

Review Comment:
   Extreme nitpick: can we move this next to `session.adjustCounter` below, so
   that all counters and gauges are updated in a single place?
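
   To illustrate the placement, here is a minimal standalone sketch. It does
   not use the NiFi API: `RecordingSession`, `reportMetrics`, `gaugeName`, the
   simplified method signatures and the counter name are all hypothetical
   stand-ins, chosen only to show the counter and the gauge being updated
   together.

```java
import java.util.ArrayList;
import java.util.List;

public class MetricsPlacementSketch {

    /** Hypothetical stand-in for the session; only the two metric calls are modeled. */
    static final class RecordingSession {
        final List<String> calls = new ArrayList<>();

        void adjustCounter(final String name, final long delta) {
            calls.add("counter:" + name + "=" + delta);
        }

        void recordGauge(final String name, final long value) {
            calls.add("gauge:" + name + "=" + value);
        }
    }

    /** Hypothetical gauge name format; the real one is whatever the PR defines. */
    static String gaugeName(final String streamName, final String shardId) {
        return "millisBehindLatest." + streamName + "." + shardId;
    }

    /** Updates every counter and gauge for one consumed batch in a single place. */
    static void reportMetrics(final RecordingSession session, final String streamName,
                              final String shardId, final int recordCount,
                              final Long millisBehindLatest) {
        session.adjustCounter("Records Consumed", recordCount);
        // The lag may be unknown for a batch, so the gauge is skipped when it is null.
        if (millisBehindLatest != null) {
            session.recordGauge(gaugeName(streamName, shardId), millisBehindLatest);
        }
    }

    public static void main(final String[] args) {
        final RecordingSession session = new RecordingSession();
        reportMetrics(session, "orders", "shardId-000000000000", 3, 1500L);
        reportMetrics(session, "orders", "shardId-000000000001", 2, null);
        session.calls.forEach(System.out::println);
    }
}
```

   With this shape, the null check on `millisBehindLatest` stays next to the
   counter update instead of at the top of the method.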



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -761,7 +770,8 @@ public void processRecords(final ProcessRecordsInput processRecordsInput) {
             if (bufferId == null) {
                 throw new IllegalStateException("Buffer ID not found: Record Processor not initialized");
             }
-            recordBuffer.addRecords(bufferId, processRecordsInput.records(), processRecordsInput.checkpointer());
+            recordBuffer.addRecords(bufferId, processRecordsInput.records(),
+                    processRecordsInput.checkpointer(), processRecordsInput.millisBehindLatest());

Review Comment:
   Could you describe this gauge and what it means in
   [this file](https://github.com/apache/nifi/blob/main/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/resources/docs/org.apache.nifi.processors.aws.kinesis.ConsumeKinesis/additionalDetails.md)?
   Then it will be visible in the processor documentation in NiFi.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -145,6 +146,8 @@ Uses DynamoDB for check pointing and coordination, and (optional) CloudWatch for
                 description = "A SubSequence Number of the last Kinesis Record in the FlowFile. Generated by KPL when aggregating records into a single Kinesis Record"),
         @WritesAttribute(attribute = APPROXIMATE_ARRIVAL_TIMESTAMP,
                 description = "Approximate arrival timestamp of the last Kinesis Record in the FlowFile"),
+        @WritesAttribute(attribute = MILLIS_BEHIND_LATEST,
+                description = "Milliseconds behind the latest record in the shard at the time records were consumed"),

Review Comment:
   If this attribute is needed downstream, no problem.
   But I wouldn't add it to the FlowFile attributes if the only reason for it
   is the gauge name format.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.