This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 6be2570b2eb NIFI-15687 Reverted Current Lag gauge recording in ConsumeKafka (#10984)
6be2570b2eb is described below

commit 6be2570b2ebeec60a946e50343d6de779f8576db
Author: David Handermann <[email protected]>
AuthorDate: Mon Mar 9 12:27:43 2026 -0500

    NIFI-15687 Reverted Current Lag gauge recording in ConsumeKafka (#10984)
    
    This reverts commit b43be1d0e462ead0c593d65c1a15107fb70e3ae0
---
 .../nifi/kafka/processors/ConsumeKafkaIT.java      | 34 ----------
 .../apache/nifi/kafka/processors/ConsumeKafka.java | 73 +---------------------
 2 files changed, 1 insertion(+), 106 deletions(-)

diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index 8bed57a9a95..e4e12898e38 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -40,7 +40,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -53,8 +52,6 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
 
     private static final long FIRST_OFFSET = 0;
 
-    private static final String METRIC_CURRENT_LAG_RECORDS = 
"topic.%s.partition.%d.currentLagRecords";
-
     private TestRunner runner;
 
     @BeforeEach
@@ -257,37 +254,6 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
         runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
     }
 
-    @Test
-    @Timeout(30)
-    public void testLagReporting() throws ExecutionException, 
InterruptedException {
-        final String topic = "testLagReporting";
-        final int partition = 0;
-        final int maxPollRecords = 2;
-
-        runner.setProperty(ConsumeKafka.GROUP_ID, "testLagReportingGroup");
-        runner.setProperty(ConsumeKafka.TOPICS, topic);
-        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, 
ProcessingStrategy.FLOW_FILE.getValue());
-        runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_TIME, "1000 millis");
-
-        final ControllerService connectionService = 
runner.getControllerService(CONNECTION_SERVICE_ID);
-        runner.disableControllerService(connectionService);
-        runner.setProperty(connectionService, 
Kafka3ConnectionService.MAX_POLL_RECORDS, Integer.toString(maxPollRecords));
-        runner.enableControllerService(connectionService);
-
-        final List<ProducerRecord<String, String>> records = 
IntStream.range(0, 1000)
-                .mapToObj(i -> new ProducerRecord<>(topic, partition, "key" + 
i, "val" + i))
-                .toList();
-        produce(topic, records);
-
-        final String gaugeName = METRIC_CURRENT_LAG_RECORDS.formatted(topic, 
partition);
-        runner.run(1, false, true);
-
-        // consume until at least one gauge with lag gets recorded
-        while (runner.getGaugeValues(gaugeName).isEmpty()) {
-            runner.run(1, false, false);
-        }
-    }
-
     @Timeout(5)
     @Test
     void testMaxUncommittedSize() throws InterruptedException, 
ExecutionException {
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index f56cb2d0899..b92bd862a36 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -41,7 +41,6 @@ import 
org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessag
 import 
org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
 import org.apache.nifi.kafka.service.api.KafkaConnectionService;
 import org.apache.nifi.kafka.service.api.common.PartitionState;
-import org.apache.nifi.kafka.service.api.common.TopicPartitionSummary;
 import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
 import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
 import org.apache.nifi.kafka.service.api.consumer.PollingContext;
@@ -58,7 +57,6 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.metrics.CommitTiming;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -70,11 +68,9 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -334,7 +330,6 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
 
     private static final Set<Relationship> SUCCESS_RELATIONSHIP = 
Set.of(SUCCESS);
     private static final Set<Relationship> SUCCESS_FAILURE_RELATIONSHIPS = 
Set.of(SUCCESS, PARSE_FAILURE);
-    private static final String METRIC_CURRENT_LAG_RECORDS = 
"topic.%s.partition.%d.currentLagRecords";
 
     private volatile Charset headerEncoding;
     private volatile Pattern headerNamePattern;
@@ -437,8 +432,7 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
                     break;
                 }
 
-                final TopicPartitionScanningIterator consumerRecords = new 
TopicPartitionScanningIterator(consumerService.poll(maxWaitDuration).iterator());
-
+                final Iterator<ByteRecord> consumerRecords = 
consumerService.poll(maxWaitDuration).iterator();
                 if (!consumerRecords.hasNext()) {
                     getLogger().trace("No Kafka Records consumed: {}", 
pollingContext);
                     // Check if a rebalance occurred during poll - if so, 
break to commit what we have
@@ -451,7 +445,6 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
 
                 recordsReceived = true;
                 processConsumerRecords(context, session, offsetTracker, 
consumerRecords);
-                reportCurrentLag(consumerService, session, 
consumerRecords.getTopicPartitionSummaries());
 
                 // Check if a rebalance occurred during poll - if so, break to 
commit what we have
                 if (consumerService.hasRevokedPartitions()) {
@@ -505,20 +498,6 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
             });
     }
 
-    private void reportCurrentLag(final KafkaConsumerService consumerService, 
final ProcessSession session, final Set<TopicPartitionSummary> 
topicPartitionSummaries) {
-        for (final TopicPartitionSummary topicPartitionSummary : 
topicPartitionSummaries) {
-            final OptionalLong currentLag = 
consumerService.currentLag(topicPartitionSummary);
-            if (currentLag.isPresent()) {
-                final String gaugeName = 
getCurrentLagGaugeName(topicPartitionSummary);
-                session.recordGauge(gaugeName, currentLag.getAsLong(), 
CommitTiming.NOW);
-            }
-        }
-    }
-
-    private String getCurrentLagGaugeName(final TopicPartitionSummary 
topicPartitionSummary) {
-        return 
METRIC_CURRENT_LAG_RECORDS.formatted(topicPartitionSummary.getTopic(), 
topicPartitionSummary.getPartition());
-    }
-
     private void commitOffsets(final KafkaConsumerService consumerService, 
final OffsetTracker offsetTracker, final PollingContext pollingContext, final 
ProcessSession session) {
         try {
             if (commitOffsets) {
@@ -707,54 +686,4 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
 
         return pollingContext;
     }
-
-    static class TopicPartitionScanningIterator implements 
Iterator<ByteRecord> {
-
-        private final Iterator<ByteRecord> iterator;
-        private final Set<TopicPartitionSummary> topicPartitionSummaries = new 
HashSet<>();
-        private TopicPartitionSummary lastTopicPartition;
-
-        TopicPartitionScanningIterator(final Iterator<ByteRecord> iterator) {
-            this.iterator = iterator;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public ByteRecord next() {
-            final ByteRecord record = iterator.next();
-            // Avoid unnecessary TopicPartitionSummary object creation
-            if (isNewTopicPartition(record)) {
-                final TopicPartitionSummary summary = new 
TopicPartitionSummary(record.getTopic(), record.getPartition());
-                topicPartitionSummaries.add(summary);
-                lastTopicPartition = summary;
-            }
-            return record;
-        }
-
-        public Set<TopicPartitionSummary> getTopicPartitionSummaries() {
-            return topicPartitionSummaries;
-        }
-
-        private boolean isNewTopicPartition(final ByteRecord record) {
-            final String topic = record.getTopic();
-            final int partition = record.getPartition();
-
-            final boolean newTopicPartition;
-            if (lastTopicPartition == null) {
-                newTopicPartition = true;
-            } else {
-                if (lastTopicPartition.getTopic().equals(topic)) {
-                    newTopicPartition = lastTopicPartition.getPartition() != 
partition;
-                } else {
-                    newTopicPartition = true;
-                }
-            }
-
-            return newTopicPartition;
-        }
-    }
 }

Reply via email to