This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b43be1d0e4 NIFI-15563 Added Current Lag gauge recording to ConsumeKafka
b43be1d0e4 is described below

commit b43be1d0e462ead0c593d65c1a15107fb70e3ae0
Author: Lukas Kucharski <[email protected]>
AuthorDate: Fri Feb 6 21:31:53 2026 +0100

    NIFI-15563 Added Current Lag gauge recording to ConsumeKafka
    
    This closes #10880
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/kafka/processors/ConsumeKafkaIT.java      | 34 ++++++++++
 .../apache/nifi/kafka/processors/ConsumeKafka.java | 73 +++++++++++++++++++++-
 2 files changed, 106 insertions(+), 1 deletion(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index e4e12898e3..8bed57a9a9 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -40,6 +40,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -52,6 +53,8 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
 
     private static final long FIRST_OFFSET = 0;
 
+    private static final String METRIC_CURRENT_LAG_RECORDS = 
"topic.%s.partition.%d.currentLagRecords";
+
     private TestRunner runner;
 
     @BeforeEach
@@ -254,6 +257,37 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
         runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
     }
 
+    @Test
+    @Timeout(30)
+    public void testLagReporting() throws ExecutionException, InterruptedException {
+        // At most two records per poll leaves a backlog of the 1000 produced records to report as lag
+        final String lagTopic = "testLagReporting";
+        final int lagPartition = 0;
+        final int pollBatchSize = 2;
+        runner.setProperty(ConsumeKafka.GROUP_ID, "testLagReportingGroup");
+        runner.setProperty(ConsumeKafka.TOPICS, lagTopic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
+        runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_TIME, "1000 millis");
+
+        final ControllerService kafkaService = runner.getControllerService(CONNECTION_SERVICE_ID);
+        runner.disableControllerService(kafkaService);
+        runner.setProperty(kafkaService, Kafka3ConnectionService.MAX_POLL_RECORDS, String.valueOf(pollBatchSize));
+        runner.enableControllerService(kafkaService);
+
+        final List<ProducerRecord<String, String>> producerRecords = IntStream.range(0, 1000)
+                .mapToObj(index -> new ProducerRecord<>(lagTopic, lagPartition, "key" + index, "val" + index))
+                .toList();
+        produce(lagTopic, producerRecords);
+
+        final String expectedGaugeName = METRIC_CURRENT_LAG_RECORDS.formatted(lagTopic, lagPartition);
+        // Initialize the processor on the first trigger only, without stopping it between triggers
+        runner.run(1, false, true);
+        // Keep triggering until a current lag gauge has been recorded; @Timeout(30) bounds this loop
+        while (runner.getGaugeValues(expectedGaugeName).isEmpty()) {
+            runner.run(1, false, false);
+        }
+    }
+
     @Timeout(5)
     @Test
     void testMaxUncommittedSize() throws InterruptedException, 
ExecutionException {
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index b92bd862a3..f56cb2d089 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -41,6 +41,7 @@ import 
org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessag
 import 
org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
 import org.apache.nifi.kafka.service.api.KafkaConnectionService;
 import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
 import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
 import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
 import org.apache.nifi.kafka.service.api.consumer.PollingContext;
@@ -57,6 +58,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.metrics.CommitTiming;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -68,9 +70,11 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -330,6 +334,7 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
 
     private static final Set<Relationship> SUCCESS_RELATIONSHIP = 
Set.of(SUCCESS);
     private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = 
Set.of(SUCCESS, PARSE_FAILURE);
+    private static final String METRIC_CURRENT_LAG_RECORDS = 
"topic.%s.partition.%d.currentLagRecords";
 
     private volatile Charset headerEncoding;
     private volatile Pattern headerNamePattern;
@@ -432,7 +437,8 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
                     break;
                 }
 
-                final Iterator<ByteRecord> consumerRecords = 
consumerService.poll(maxWaitDuration).iterator();
+                final TopicPartitionScanningIterator consumerRecords = new 
TopicPartitionScanningIterator(consumerService.poll(maxWaitDuration).iterator());
+
                 if (!consumerRecords.hasNext()) {
                     getLogger().trace("No Kafka Records consumed: {}", 
pollingContext);
                     // Check if a rebalance occurred during poll - if so, 
break to commit what we have
@@ -445,6 +451,7 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
 
                 recordsReceived = true;
                 processConsumerRecords(context, session, offsetTracker, 
consumerRecords);
+                reportCurrentLag(consumerService, session, 
consumerRecords.getTopicPartitionSummaries());
 
                 // Check if a rebalance occurred during poll - if so, break to 
commit what we have
                 if (consumerService.hasRevokedPartitions()) {
@@ -498,6 +505,20 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
             });
     }
 
+    private void reportCurrentLag(final KafkaConsumerService consumerService, final ProcessSession session, final Set<TopicPartitionSummary> topicPartitionSummaries) {
+        // One gauge per polled topic-partition; skipped when the consumer service returns no lag value
+        topicPartitionSummaries.forEach(summary -> {
+            final OptionalLong lag = consumerService.currentLag(summary);
+            if (lag.isPresent()) {
+                session.recordGauge(getCurrentLagGaugeName(summary), lag.getAsLong(), CommitTiming.NOW);
+            }
+        });
+    }
+
+    private String getCurrentLagGaugeName(final TopicPartitionSummary summary) { // topic.<topic>.partition.<partition>.currentLagRecords
+        return String.format(METRIC_CURRENT_LAG_RECORDS, summary.getTopic(), summary.getPartition());
+    }
+
     private void commitOffsets(final KafkaConsumerService consumerService, 
final OffsetTracker offsetTracker, final PollingContext pollingContext, final 
ProcessSession session) {
         try {
             if (commitOffsets) {
@@ -686,4 +707,54 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
 
         return pollingContext;
     }
+
+    /**
+     * Iterator decorator that returns records unchanged while collecting every
+     * distinct topic-partition among the records handed out by {@link #next()}.
+     */
+    static class TopicPartitionScanningIterator implements Iterator<ByteRecord> {
+
+        private final Iterator<ByteRecord> iterator;
+        private final Set<TopicPartitionSummary> topicPartitionSummaries = new HashSet<>();
+        // Topic-partition of the most recently tracked record, or null before the first record
+        private TopicPartitionSummary lastTopicPartition;
+
+        TopicPartitionScanningIterator(final Iterator<ByteRecord> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public ByteRecord next() {
+            final ByteRecord current = iterator.next();
+            // Allocate a summary only when the topic-partition changes from the previous record
+            if (isNewTopicPartition(current)) {
+                lastTopicPartition = new TopicPartitionSummary(current.getTopic(), current.getPartition());
+                topicPartitionSummaries.add(lastTopicPartition);
+            }
+            return current;
+        }
+
+        /** Returns the distinct topic-partitions of all records returned by {@link #next()} so far. */
+        public Set<TopicPartitionSummary> getTopicPartitionSummaries() {
+            return topicPartitionSummaries;
+        }
+
+        /**
+         * Returns true for the first record, or when the record's topic or partition differs from the last one.
+         */
+        private boolean isNewTopicPartition(final ByteRecord record) {
+            final String topic = record.getTopic();
+            final int partition = record.getPartition();
+            if (lastTopicPartition == null) {
+                return true;
+            }
+            return !lastTopicPartition.getTopic().equals(topic)
+                    || lastTopicPartition.getPartition() != partition;
+        }
+    }
 }

Reply via email to