lkuchars commented on code in PR #10881:
URL: https://github.com/apache/nifi/pull/10881#discussion_r2822673105
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -696,20 +705,28 @@ private void checkInitializationResult(final InitializationResult initialization
}
}
+
+
private void processRecordsFromBuffer(final ProcessSession session, final Lease lease) {
try {
- final List<KinesisClientRecord> records = recordBuffer.consumeRecords(lease);
+ final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+ final List<KinesisClientRecord> records = result.records();
+
+ final String shardId = lease.shardId();
+ final Long millisBehindLatest = result.millisBehindLatest();
+ if (millisBehindLatest != null) {
+ session.recordGauge(makeMillisBehindLatestGaugeName(streamName, shardId), millisBehindLatest, CommitTiming.SESSION_COMMITTED);
+ }
Review Comment:
Done