This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ffaa98c2ea NIFI-14752 Added Max Uncommitted Size to ConsumeKafka
ffaa98c2ea is described below

commit ffaa98c2ea652d694c5926f444851dcfc399020c
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Jul 14 21:26:25 2025 -0500

    NIFI-14752 Added Max Uncommitted Size to ConsumeKafka
    
    - Updated Offset Tracker to calculate Total Record Size based on Kafka Consumer Record key and value length
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10093.
---
 .../nifi/kafka/processors/ConsumeKafkaIT.java      | 53 ++++++++++++++++++++++
 .../apache/nifi/kafka/processors/ConsumeKafka.java | 39 +++++++++++++++-
 .../kafka/processors/consumer/OffsetTracker.java   | 12 +++++
 3 files changed, 102 insertions(+), 2 deletions(-)

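Before the diff, a brief standalone sketch of the accounting this commit introduces (illustrative names only, not the committed NiFi classes): the tracker sums the serialized key and value lengths of every record polled since the last commit, and the processor compares that running total against the configured byte threshold.

    // Hypothetical sketch of the size accounting; SizeTracker is an illustrative
    // stand-in, not the OffsetTracker class changed in this commit.
    import java.nio.charset.StandardCharsets;

    final class SizeTrackerSketch {

        static final class SizeTracker {
            private long totalRecordSize;

            // Add the serialized key and value lengths of one consumed record; the key may be absent.
            void update(final byte[] key, final byte[] value) {
                if (key != null) {
                    totalRecordSize += key.length;
                }
                totalRecordSize += value.length;
            }

            long getTotalRecordSize() {
                return totalRecordSize;
            }
        }

        public static void main(final String[] args) {
            final SizeTracker tracker = new SizeTracker();
            tracker.update("key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
            tracker.update(null, "value-without-key".getBytes(StandardCharsets.UTF_8));
            System.out.println(tracker.getTotalRecordSize()); // 3 + 5 + 17 = 25 bytes
        }
    }
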
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index e85eaed9ed..03523a8cf3 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -19,7 +19,9 @@ package org.apache.nifi.kafka.processors;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.service.Kafka3ConnectionService;
 import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
 import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
 import org.apache.nifi.reporting.InitializationException;
@@ -28,14 +30,18 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
@@ -199,4 +205,51 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
 
         runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
     }
+
+    @Timeout(5)
+    @Test
+    void testMaxUncommittedSize() throws InterruptedException, ExecutionException {
+        final String topic = UUID.randomUUID().toString();
+        final String groupId = topic.substring(0, topic.indexOf("-"));
+
+        final int recordCount = 100;
+        final int maxPollRecords = 10;
+        final int flowFilesExpected = recordCount / maxPollRecords;
+
+        final int recordSize = RECORD_VALUE.length();
+        final int maxUncommittedSize = recordSize * recordCount;
+
+        // Adjust Poll Records for this method
+        final ControllerService connectionService = runner.getControllerService(CONNECTION_SERVICE_ID);
+        runner.disableControllerService(connectionService);
+        runner.setProperty(connectionService, Kafka3ConnectionService.MAX_POLL_RECORDS, Integer.toString(maxPollRecords));
+        runner.enableControllerService(connectionService);
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
+        runner.setProperty(ConsumeKafka.TOPICS, topic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR.getValue());
+        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, System.lineSeparator());
+
+        // Set Uncommitted Size so that processing completes before 5 second test timeout expires
+        runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_SIZE, "%d B".formatted(maxUncommittedSize));
+        runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_TIME, "5 s");
+
+        final Collection<ProducerRecord<String, String>> records = new ArrayList<>();
+        for (int i = 0; i < recordCount; i++) {
+            records.add(new ProducerRecord<>(topic, RECORD_VALUE));
+        }
+        produce(topic, records);
+
+        runner.run();
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS);
+        assertEquals(flowFilesExpected, flowFiles.size());
+
+        long totalFlowFileSize = 0;
+        for (final MockFlowFile flowFile : flowFiles) {
+            totalFlowFileSize += flowFile.getSize();
+        }
+
+        assertTrue(totalFlowFileSize > maxUncommittedSize, "Total FlowFile Size [%d] less than Max Uncommitted Size [%d]".formatted(totalFlowFileSize, maxUncommittedSize));
+    }
 }
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 8d5dcf8175..20ca6a9a1c 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.kafka.processors.common.KafkaUtils;
@@ -49,6 +50,7 @@ import org.apache.nifi.kafka.shared.property.KeyFormat;
 import org.apache.nifi.kafka.shared.property.OutputStrategy;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -158,10 +160,26 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
             .defaultValue("true")
             .build();
 
+    static final PropertyDescriptor MAX_UNCOMMITTED_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Uncommitted Size")
+            .description("""
+                    Maximum total size of records to consume from Kafka before transferring FlowFiles to an output
+                    relationship. Evaluated when specified based on the size of serialized keys and values from each
+                    Kafka record, before reaching the [Max Uncommitted Time].
+                    """
+            )
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
     static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
             .name("Max Uncommitted Time")
-            .description("Specifies the maximum amount of time that the 
Processor can consume from Kafka before it must transfer FlowFiles on " +
-                    "through the flow and commit the offsets to Kafka (if 
appropriate). A larger time period can result in longer latency.")
+            .description("""
+                    Maximum amount of time to spend consuming records from Kafka before transferring FlowFiles to an
+                    output relationship. Longer amounts of time may produce larger FlowFiles and increase processing
+                    latency for individual records.
+                    """
+            )
             .required(true)
             .defaultValue("100 millis")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@@ -280,6 +298,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
             TOPICS,
             AUTO_OFFSET_RESET,
             COMMIT_OFFSETS,
+            MAX_UNCOMMITTED_SIZE,
             MAX_UNCOMMITTED_TIME,
             HEADER_NAME_PATTERN,
             HEADER_ENCODING,
@@ -308,6 +327,8 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
     private volatile String brokerUri;
     private volatile PollingContext pollingContext;
     private volatile int maxConsumerCount;
+    private volatile boolean maxUncommittedSizeConfigured;
+    private volatile long maxUncommittedSize;
 
     private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
     private final AtomicInteger activeConsumerCount = new AtomicInteger();
@@ -351,6 +372,12 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
         brokerUri = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class).getBrokerUri();
         maxConsumerCount = context.getMaxConcurrentTasks();
         activeConsumerCount.set(0);
+
+        final PropertyValue maxUncommittedSizeProperty = context.getProperty(MAX_UNCOMMITTED_SIZE);
+        maxUncommittedSizeConfigured = maxUncommittedSizeProperty.isSet();
+        if (maxUncommittedSizeConfigured) {
+            maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue();
+        }
     }
 
     @OnStopped
@@ -393,6 +420,14 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
 
                 recordsReceived = true;
                 processConsumerRecords(context, session, offsetTracker, consumerRecords);
+
+                if (maxUncommittedSizeConfigured) {
+                    // Stop consuming before reaching Max Uncommitted Time when exceeding Max Uncommitted Size
+                    final long totalRecordSize = offsetTracker.getTotalRecordSize();
+                    if (totalRecordSize > maxUncommittedSize) {
+                        break;
+                    }
+                }
             } catch (final Exception e) {
                 getLogger().error("Failed to consume Kafka Records", e);
                 consumerService.rollback();
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
index 03b0b8c29b..e421b9c5b5 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
@@ -24,10 +24,12 @@ import org.apache.nifi.kafka.service.api.record.ByteRecord;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class OffsetTracker {
     private final Map<TopicPartitionSummary, OffsetSummary> offsets = new HashMap<>();
     private final Map<String, Long> recordCounts = new HashMap<>();
+    private final AtomicLong totalRecordSize = new AtomicLong();
 
     public void update(final ByteRecord consumerRecord) {
         final TopicPartitionSummary topicPartitionSummary = new TopicPartitionSummary(consumerRecord.getTopic(), consumerRecord.getPartition());
@@ -35,6 +37,16 @@ public class OffsetTracker {
         final OffsetSummary offsetSummary = offsets.computeIfAbsent(topicPartitionSummary, (summary) -> new OffsetSummary(offset));
         offsetSummary.setOffset(offset);
         recordCounts.merge(consumerRecord.getTopic(), consumerRecord.getBundledCount(), Long::sum);
+
+        // Update Total Record Size with Key and Value length
+        consumerRecord.getKey()
+                .map(key -> key.length)
+                .ifPresent(totalRecordSize::addAndGet);
+        totalRecordSize.addAndGet(consumerRecord.getValue().length);
+    }
+
+    public long getTotalRecordSize() {
+        return totalRecordSize.get();
     }
 
     public Map<String, Long> getRecordCounts() {

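As a rough outline of how the two properties interact (a hedged sketch with made-up names, not the ConsumeKafka implementation): the poll loop keeps consuming until Max Uncommitted Time elapses, but when Max Uncommitted Size is configured it also breaks out as soon as the tracked record size exceeds that threshold, so whichever limit is reached first ends the batch.

    // Illustrative poll loop; pollRecordSizes() and the constants are hypothetical stand-ins.
    import java.util.List;
    import java.util.Random;

    final class PollLoopSketch {
        private static final Random RANDOM = new Random();

        // Stand-in for a Kafka poll returning the serialized sizes of the records received.
        private static List<Integer> pollRecordSizes() {
            return List.of(RANDOM.nextInt(512), RANDOM.nextInt(512));
        }

        public static void main(final String[] args) {
            final long maxUncommittedTimeMillis = 100;   // Max Uncommitted Time, default "100 millis"
            final long maxUncommittedSize = 4096;        // Max Uncommitted Size in bytes, optional
            final boolean maxUncommittedSizeConfigured = true;

            long totalRecordSize = 0;
            final long endTime = System.currentTimeMillis() + maxUncommittedTimeMillis;

            while (System.currentTimeMillis() < endTime) {
                for (final int recordSize : pollRecordSizes()) {
                    totalRecordSize += recordSize;
                }
                // Stop consuming before Max Uncommitted Time elapses once the size threshold is exceeded
                if (maxUncommittedSizeConfigured && totalRecordSize > maxUncommittedSize) {
                    break;
                }
            }
            System.out.println("Uncommitted bytes in this batch: " + totalRecordSize);
        }
    }

Note that the commit keeps Max Uncommitted Time required with its 100 millis default, so the new size limit can only shorten a batch, never extend it.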