This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bd4d78223 NIFI-15545 Added current partition lag method to Kafka Consumer Service (#10852)
5bd4d78223 is described below

commit 5bd4d7822346bfd68383aaf4ef64b4021093280c
Author: lkuchars <[email protected]>
AuthorDate: Fri Feb 6 16:32:03 2026 +0100

    NIFI-15545 Added current partition lag method to Kafka Consumer Service (#10852)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../service/api/consumer/KafkaConsumerService.java | 10 ++++
 .../service/consumer/Kafka3ConsumerService.java    | 22 +++++++--
 .../service/Kafka3ConnectionServiceBaseIT.java     | 53 ++++++++++++++++++----
 3 files changed, 73 insertions(+), 12 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
index 3662b4ed10..1a0e36affc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
@@ -17,11 +17,13 @@
 package org.apache.nifi.kafka.service.api.consumer;
 
 import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
 import org.apache.nifi.kafka.service.api.record.ByteRecord;
 
 import java.io.Closeable;
 import java.time.Duration;
 import java.util.List;
+import java.util.OptionalLong;
 
 /**
  * Kafka Consumer Service must be closed to avoid leaking connection resources
@@ -57,4 +59,12 @@ public interface KafkaConsumerService extends Closeable {
      * @return List of Partition State information
      */
     List<PartitionState> getPartitionStates();
+
+    /**
+     * Get current lag (in records) for the specified topic partition
+     *
+     * @param topicPartitionSummary Topic Partition to query for consumer lag
+     * @return OptionalLong containing the current lag or empty when not available
+     */
+    OptionalLong currentLag(TopicPartitionSummary topicPartitionSummary);
 }
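
For reference, the new method pairs naturally with the existing getPartitionStates() call to report consumer lag per assigned partition. The fragment below is an illustrative sketch only, not part of this commit: it assumes a configured KafkaConsumerService named consumerService, a ComponentLog named logger, and that PartitionState exposes getTopic() and getPartition() accessors.

    // Illustrative only: log current lag for each assigned partition,
    // skipping partitions where lag is not yet available
    for (final PartitionState partitionState : consumerService.getPartitionStates()) {
        final TopicPartitionSummary summary = new TopicPartitionSummary(
                partitionState.getTopic(), partitionState.getPartition());
        consumerService.currentLag(summary).ifPresent(lag ->
                logger.info("Topic [{}] Partition [{}] current lag [{}]",
                        partitionState.getTopic(), partitionState.getPartition(), lag));
    }
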
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
index 59ea23818a..250594c60c 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
@@ -43,9 +43,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
 
 /**
 * Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers
@@ -148,8 +150,10 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
             final String topic = topics.next();
             partitionStates = consumer.partitionsFor(topic)
                 .stream()
-                .map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition()))
-                .collect(Collectors.toList());
+                .map(partitionInfo -> new PartitionState(
+                        partitionInfo.topic(),
+                        partitionInfo.partition()))
+                .collect(toList());
         } else {
             partitionStates = Collections.emptyList();
         }
@@ -157,6 +161,18 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
         return partitionStates;
     }
 
+    @Override
+    public OptionalLong currentLag(final TopicPartitionSummary topicPartitionSummary) {
+        final TopicPartition topicPartition = new TopicPartition(topicPartitionSummary.getTopic(), topicPartitionSummary.getPartition());
+        try {
+            return consumer.currentLag(topicPartition);
+        } catch (final IllegalStateException e) {
+            // this case can be pretty common during rebalancing or before first poll call
+            componentLog.debug("Unable to fetch current lag for partition {}-{}: {}", topicPartitionSummary.getTopic(), topicPartitionSummary.getPartition(), e.getMessage());
+            return OptionalLong.empty();
+        }
+    }
+
     @Override
     public void close() {
         closed = true;
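
The try/catch above mirrors the behavior of the underlying Kafka 3 client: Consumer#currentLag(TopicPartition) returns an OptionalLong that stays empty until lag metadata has arrived with a fetch response, and it throws IllegalStateException when the partition is not currently assigned, for example during a rebalance or before the first poll. A minimal standalone sketch of that client behavior is shown below; the broker address, group id, topic name, and class name are assumptions for illustration and are not part of this commit.

    import java.time.Duration;
    import java.util.List;
    import java.util.Map;
    import java.util.OptionalLong;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Hypothetical demo class, not part of the NiFi codebase
    public class CurrentLagSketch {
        public static void main(final String[] args) {
            final Map<String, Object> config = Map.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ConsumerConfig.GROUP_ID_CONFIG, "lag-sketch",
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
                consumer.subscribe(List.of("demo-topic"));
                // Before the first poll() no partitions are assigned, so calling
                // currentLag() here would throw IllegalStateException
                consumer.poll(Duration.ofSeconds(1));
                for (final TopicPartition topicPartition : consumer.assignment()) {
                    // empty OptionalLong until lag metadata arrives with a fetch response
                    final OptionalLong lag = consumer.currentLag(topicPartition);
                    lag.ifPresent(value -> System.out.printf("%s lag=%d%n", topicPartition, value));
                }
            }
        }
    }
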
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
index f7080c6885..9df87b5275 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
 import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
 import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
 import org.apache.nifi.kafka.service.api.consumer.PollingContext;
@@ -70,6 +71,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
@@ -78,6 +80,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.security.auth.x500.X500Principal;
 
+import static java.util.Collections.emptyList;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -91,10 +94,14 @@ public class Kafka3ConnectionServiceBaseIT {
 
     public static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.6"; // January 2026
 
-    private static final String DYNAMIC_PROPERTY_KEY_PUBLISH = "delivery.timeout.ms";
-    private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
-    private static final String DYNAMIC_PROPERTY_KEY_CONSUME = "fetch.max.wait.ms";
-    private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+    private static final String DELIVERY_TIMEOUT_MS_KEY = "delivery.timeout.ms";
+    private static final String DELIVERY_TIMEOUT_MS_VALUE = "60000";
+    private static final String FETCH_MAX_WAIT_MS_KEY = "fetch.max.wait.ms";
+    private static final String FETCH_MAX_WAIT_MS_VALUE = "1000";
+    private static final String MAX_POLL_RECORDS_KEY = "max.poll.records";
+    private static final String MAX_POLL_RECORDS_VALUE = "10";
+    private static final Integer MAX_POLL_RECORDS_VALUE_INT = Integer.valueOf(MAX_POLL_RECORDS_VALUE);
+
 
     private static final String GROUP_ID = Kafka3ConnectionService.class.getSimpleName();
 
@@ -205,8 +212,9 @@ public class Kafka3ConnectionServiceBaseIT {
         final Map<String, String> properties = new LinkedHashMap<>();
         properties.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS.getName(), kafkaContainer.getBootstrapServers());
         properties.put(Kafka3ConnectionService.CLIENT_TIMEOUT.getName(), CLIENT_TIMEOUT);
-        properties.put(DYNAMIC_PROPERTY_KEY_PUBLISH, DYNAMIC_PROPERTY_VALUE_PUBLISH);
-        properties.put(DYNAMIC_PROPERTY_KEY_CONSUME, DYNAMIC_PROPERTY_VALUE_CONSUME);
+        properties.put(DELIVERY_TIMEOUT_MS_KEY, DELIVERY_TIMEOUT_MS_VALUE);
+        properties.put(FETCH_MAX_WAIT_MS_KEY, FETCH_MAX_WAIT_MS_VALUE);
+        properties.put(MAX_POLL_RECORDS_KEY, MAX_POLL_RECORDS_VALUE);
         return properties;
     }
 
@@ -234,7 +242,7 @@ public class Kafka3ConnectionServiceBaseIT {
     void testProduceOneNoTransaction() {
         final ProducerConfiguration producerConfiguration = new ProducerConfiguration(false, null, null, null, null, 1_000_000);
         final KafkaProducerService producerService = service.getProducerService(producerConfiguration);
-        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, Collections.emptyList());
+        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, emptyList());
         final List<KafkaRecord> kafkaRecords = Collections.singletonList(kafkaRecord);
         producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC + "-produce", null, null, null));
         final RecordSummary summary = producerService.complete();
@@ -245,7 +253,7 @@ public class Kafka3ConnectionServiceBaseIT {
     void testProduceOneWithTransaction() {
         final ProducerConfiguration producerConfiguration = new ProducerConfiguration(true, "transaction-", null, null, null, 1_000_000);
         final KafkaProducerService producerService = service.getProducerService(producerConfiguration);
-        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, Collections.emptyList());
+        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null, null, RECORD_VALUE, emptyList());
         final List<KafkaRecord> kafkaRecords = Collections.singletonList(kafkaRecord);
         producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC + "-produce", null, null, null));
         final RecordSummary summary = producerService.complete();
@@ -258,7 +266,7 @@ public class Kafka3ConnectionServiceBaseIT {
         final KafkaProducerService producerService = service.getProducerService(producerConfiguration);
 
         final long timestamp = System.currentTimeMillis();
-        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, timestamp, RECORD_KEY, RECORD_VALUE, Collections.emptyList());
+        final KafkaRecord kafkaRecord = new KafkaRecord(null, null, timestamp, RECORD_KEY, RECORD_VALUE, emptyList());
         final List<KafkaRecord> kafkaRecords = Collections.singletonList(kafkaRecord);
         producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC, null, null, null));
         final RecordSummary summary = producerService.complete();
@@ -285,6 +293,33 @@ public class Kafka3ConnectionServiceBaseIT {
         assertFalse(consumerRecords.hasNext());
     }
 
+    @Test
+    void testCurrentLag() {
+        final long timestamp = System.currentTimeMillis();
+        final String groupId = "Group_" + timestamp;
+        final String topic = "Topic_" + timestamp;
+        final int partition = 0;
+        final ProducerConfiguration producerConfiguration = new ProducerConfiguration(false, null, null, null, null, 1_000_000);
+        final KafkaProducerService producerService = service.getProducerService(producerConfiguration);
+
+        final KafkaRecord kafkaRecord = new KafkaRecord(null, partition, timestamp, RECORD_KEY, RECORD_VALUE, emptyList());
+        final List<KafkaRecord> kafkaRecords = List.of(kafkaRecord);
+        final PollingContext pollingContext = new PollingContext(groupId, Set.of(topic), AutoOffsetReset.EARLIEST);
+        final KafkaConsumerService consumerService = service.getConsumerService(pollingContext);
+
+        // produce MAX_POLL_RECORDS_VALUE + 1 records, so that on next poll consumer calculates the record lag to be 1
+        for (int i = 0; i < MAX_POLL_RECORDS_VALUE_INT + 1; i++) {
+            producerService.send(kafkaRecords.iterator(), new PublishContext(topic, null, null, null));
+        }
+        // Consumer stats (including lag) will be available only after the first poll
+        poll(consumerService);
+
+        final TopicPartitionSummary topicPartitionSummary = new TopicPartitionSummary(topic, partition);
+        final OptionalLong currentLag = consumerService.currentLag(topicPartitionSummary);
+        assertTrue(currentLag.isPresent());
+        assertEquals(1, currentLag.getAsLong());
+    }
+
     @Test
     void testVerifySuccessful() {
         final String bootstrapServers = kafkaContainer.getBootstrapServers();
