This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5bd4d78223 NIFI-15545 Added current partition lag method to Kafka Consumer Service (#10852)
5bd4d78223 is described below
commit 5bd4d7822346bfd68383aaf4ef64b4021093280c
Author: lkuchars <[email protected]>
AuthorDate: Fri Feb 6 16:32:03 2026 +0100
NIFI-15545 Added current partition lag method to Kafka Consumer Service (#10852)
Signed-off-by: David Handermann <[email protected]>
---
.../service/api/consumer/KafkaConsumerService.java | 10 ++++
.../service/consumer/Kafka3ConsumerService.java | 22 +++++++--
.../service/Kafka3ConnectionServiceBaseIT.java | 53 ++++++++++++++++++----
3 files changed, 73 insertions(+), 12 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
index 3662b4ed10..1a0e36affc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-api/src/main/java/org/apache/nifi/kafka/service/api/consumer/KafkaConsumerService.java
@@ -17,11 +17,13 @@
package org.apache.nifi.kafka.service.api.consumer;
import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.record.ByteRecord;
import java.io.Closeable;
import java.time.Duration;
import java.util.List;
+import java.util.OptionalLong;
/**
* Kafka Consumer Service must be closed to avoid leaking connection resources
@@ -57,4 +59,12 @@ public interface KafkaConsumerService extends Closeable {
* @return List of Partition State information
*/
List<PartitionState> getPartitionStates();
+
+ /**
+ * Get current lag (in records) for the specified topic partition
+ *
+ * @param topicPartitionSummary Topic Partition to query for consumer lag
+ * @return OptionalLong containing the current lag or empty when not available
+ */
+ OptionalLong currentLag(TopicPartitionSummary topicPartitionSummary);
}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
index 59ea23818a..250594c60c 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
@@ -43,9 +43,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
/**
* Kafka 3 Consumer Service implementation with Object Pooling for subscribed Kafka Consumers
@@ -148,8 +150,10 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
final String topic = topics.next();
partitionStates = consumer.partitionsFor(topic)
.stream()
- .map(partitionInfo -> new PartitionState(partitionInfo.topic(), partitionInfo.partition()))
- .collect(Collectors.toList());
+ .map(partitionInfo -> new PartitionState(
+ partitionInfo.topic(),
+ partitionInfo.partition()))
+ .collect(toList());
} else {
partitionStates = Collections.emptyList();
}
@@ -157,6 +161,18 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
return partitionStates;
}
+ @Override
+ public OptionalLong currentLag(final TopicPartitionSummary topicPartitionSummary) {
+ final TopicPartition topicPartition = new TopicPartition(topicPartitionSummary.getTopic(), topicPartitionSummary.getPartition());
+ try {
+ return consumer.currentLag(topicPartition);
+ } catch (final IllegalStateException e) {
+ // this case can be pretty common during rebalancing or before first poll call
+ componentLog.debug("Unable to fetch current lag for partition {}-{}: {}", topicPartitionSummary.getTopic(), topicPartitionSummary.getPartition(), e.getMessage());
+ return OptionalLong.empty();
+ }
+ }
+
@Override
public void close() {
closed = true;
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
index f7080c6885..9df87b5275 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
@@ -25,6 +25,7 @@ import
org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
import org.apache.nifi.kafka.service.api.consumer.PollingContext;
@@ -70,6 +71,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
@@ -78,6 +80,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.security.auth.x500.X500Principal;
+import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -91,10 +94,14 @@ public class Kafka3ConnectionServiceBaseIT {
public static final String IMAGE_NAME = "confluentinc/cp-kafka:7.8.6"; // January 2026
- private static final String DYNAMIC_PROPERTY_KEY_PUBLISH =
"delivery.timeout.ms";
- private static final String DYNAMIC_PROPERTY_VALUE_PUBLISH = "60000";
- private static final String DYNAMIC_PROPERTY_KEY_CONSUME =
"fetch.max.wait.ms";
- private static final String DYNAMIC_PROPERTY_VALUE_CONSUME = "1000";
+ private static final String DELIVERY_TIMEOUT_MS_KEY =
"delivery.timeout.ms";
+ private static final String DELIVERY_TIMEOUT_MS_VALUE = "60000";
+ private static final String FETCH_MAX_WAIT_MS_KEY = "fetch.max.wait.ms";
+ private static final String FETCH_MAX_WAIT_MS_VALUE = "1000";
+ private static final String MAX_POLL_RECORDS_KEY = "max.poll.records";
+ private static final String MAX_POLL_RECORDS_VALUE = "10";
+ private static final Integer MAX_POLL_RECORDS_VALUE_INT =
Integer.valueOf(MAX_POLL_RECORDS_VALUE);
+
private static final String GROUP_ID =
Kafka3ConnectionService.class.getSimpleName();
@@ -205,8 +212,9 @@ public class Kafka3ConnectionServiceBaseIT {
final Map<String, String> properties = new LinkedHashMap<>();
properties.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS.getName(),
kafkaContainer.getBootstrapServers());
properties.put(Kafka3ConnectionService.CLIENT_TIMEOUT.getName(),
CLIENT_TIMEOUT);
- properties.put(DYNAMIC_PROPERTY_KEY_PUBLISH,
DYNAMIC_PROPERTY_VALUE_PUBLISH);
- properties.put(DYNAMIC_PROPERTY_KEY_CONSUME,
DYNAMIC_PROPERTY_VALUE_CONSUME);
+ properties.put(DELIVERY_TIMEOUT_MS_KEY, DELIVERY_TIMEOUT_MS_VALUE);
+ properties.put(FETCH_MAX_WAIT_MS_KEY, FETCH_MAX_WAIT_MS_VALUE);
+ properties.put(MAX_POLL_RECORDS_KEY, MAX_POLL_RECORDS_VALUE);
return properties;
}
@@ -234,7 +242,7 @@ public class Kafka3ConnectionServiceBaseIT {
void testProduceOneNoTransaction() {
final ProducerConfiguration producerConfiguration = new
ProducerConfiguration(false, null, null, null, null, 1_000_000);
final KafkaProducerService producerService =
service.getProducerService(producerConfiguration);
- final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null,
null, RECORD_VALUE, Collections.emptyList());
+ final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null,
null, RECORD_VALUE, emptyList());
final List<KafkaRecord> kafkaRecords =
Collections.singletonList(kafkaRecord);
producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC
+ "-produce", null, null, null));
final RecordSummary summary = producerService.complete();
@@ -245,7 +253,7 @@ public class Kafka3ConnectionServiceBaseIT {
void testProduceOneWithTransaction() {
final ProducerConfiguration producerConfiguration = new
ProducerConfiguration(true, "transaction-", null, null, null, 1_000_000);
final KafkaProducerService producerService =
service.getProducerService(producerConfiguration);
- final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null,
null, RECORD_VALUE, Collections.emptyList());
+ final KafkaRecord kafkaRecord = new KafkaRecord(null, null, null,
null, RECORD_VALUE, emptyList());
final List<KafkaRecord> kafkaRecords =
Collections.singletonList(kafkaRecord);
producerService.send(kafkaRecords.iterator(), new PublishContext(TOPIC
+ "-produce", null, null, null));
final RecordSummary summary = producerService.complete();
@@ -258,7 +266,7 @@ public class Kafka3ConnectionServiceBaseIT {
final KafkaProducerService producerService =
service.getProducerService(producerConfiguration);
final long timestamp = System.currentTimeMillis();
- final KafkaRecord kafkaRecord = new KafkaRecord(null, null, timestamp,
RECORD_KEY, RECORD_VALUE, Collections.emptyList());
+ final KafkaRecord kafkaRecord = new KafkaRecord(null, null, timestamp,
RECORD_KEY, RECORD_VALUE, emptyList());
final List<KafkaRecord> kafkaRecords =
Collections.singletonList(kafkaRecord);
producerService.send(kafkaRecords.iterator(), new
PublishContext(TOPIC, null, null, null));
final RecordSummary summary = producerService.complete();
@@ -285,6 +293,33 @@ public class Kafka3ConnectionServiceBaseIT {
assertFalse(consumerRecords.hasNext());
}
+ @Test
+ void testCurrentLag() {
+ final long timestamp = System.currentTimeMillis();
+ final String groupId = "Group_" + timestamp;
+ final String topic = "Topic_" + timestamp;
+ final int partition = 0;
+ final ProducerConfiguration producerConfiguration = new
ProducerConfiguration(false, null, null, null, null, 1_000_000);
+ final KafkaProducerService producerService =
service.getProducerService(producerConfiguration);
+
+ final KafkaRecord kafkaRecord = new KafkaRecord(null, partition,
timestamp, RECORD_KEY, RECORD_VALUE, emptyList());
+ final List<KafkaRecord> kafkaRecords = List.of(kafkaRecord);
+ final PollingContext pollingContext = new PollingContext(groupId,
Set.of(topic), AutoOffsetReset.EARLIEST);
+ final KafkaConsumerService consumerService =
service.getConsumerService(pollingContext);
+
+ // produce MAX_POLL_RECORDS_VALUE + 1 records, so that on next poll consumer calculates the record lag to be 1
+ for (int i = 0; i < MAX_POLL_RECORDS_VALUE_INT + 1; i++) {
+ producerService.send(kafkaRecords.iterator(), new
PublishContext(topic, null, null, null));
+ }
+ // Consumer stats (including lag) will be available only after the first poll
+ poll(consumerService);
+
+ final TopicPartitionSummary topicPartitionSummary = new
TopicPartitionSummary(topic, partition);
+ final OptionalLong currentLag =
consumerService.currentLag(topicPartitionSummary);
+ assertTrue(currentLag.isPresent());
+ assertEquals(1, currentLag.getAsLong());
+ }
+
@Test
void testVerifySuccessful() {
final String bootstrapServers = kafkaContainer.getBootstrapServers();