This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ffaa98c2ea NIFI-14752 Added Max Uncommitted Size to ConsumeKafka
ffaa98c2ea is described below
commit ffaa98c2ea652d694c5926f444851dcfc399020c
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Jul 14 21:26:25 2025 -0500
NIFI-14752 Added Max Uncommitted Size to ConsumeKafka
- Updated Offset Tracker to calculate Total Record Size based on Kafka
Consumer Record key and value length
Signed-off-by: Pierre Villard <[email protected]>
This closes #10093.
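In short, the change tracks the serialized size of each polled record and breaks out of the consume loop once the accumulated size exceeds the configured threshold, rather than always waiting for Max Uncommitted Time. A minimal standalone sketch of that technique follows; PolledRecord, SizeTracker, and consume are illustrative stand-ins for this email, not the NiFi API:

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

class SizeGateSketch {

    // Stand-in for a polled Kafka record: key is optional, value is required
    record PolledRecord(Optional<byte[]> key, byte[] value) { }

    // Mirrors the OffsetTracker change: accumulate serialized key and value lengths
    static final class SizeTracker {
        private final AtomicLong totalRecordSize = new AtomicLong();

        void update(final PolledRecord polledRecord) {
            polledRecord.key().map(key -> key.length).ifPresent(totalRecordSize::addAndGet);
            totalRecordSize.addAndGet(polledRecord.value().length);
        }

        long getTotalRecordSize() {
            return totalRecordSize.get();
        }
    }

    // Consume batches until Max Uncommitted Time elapses or Max Uncommitted Size
    // is exceeded, whichever comes first
    static long consume(final Iterable<List<PolledRecord>> polls, final long maxUncommittedSize, final long maxUncommittedMillis) {
        final SizeTracker sizeTracker = new SizeTracker();
        final long deadline = System.currentTimeMillis() + maxUncommittedMillis;
        for (final List<PolledRecord> batch : polls) {
            batch.forEach(sizeTracker::update);
            if (sizeTracker.getTotalRecordSize() > maxUncommittedSize) {
                break; // size gate: stop before the time window expires
            }
            if (System.currentTimeMillis() >= deadline) {
                break; // time gate: Max Uncommitted Time reached
            }
        }
        return sizeTracker.getTotalRecordSize();
    }
}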
---
.../nifi/kafka/processors/ConsumeKafkaIT.java | 53 ++++++++++++++++++++++
.../apache/nifi/kafka/processors/ConsumeKafka.java | 39 +++++++++++++++-
.../kafka/processors/consumer/OffsetTracker.java | 12 +++++
3 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index e85eaed9ed..03523a8cf3 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -19,7 +19,9 @@ package org.apache.nifi.kafka.processors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.service.Kafka3ConnectionService;
import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.reporting.InitializationException;
@@ -28,14 +30,18 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
@@ -199,4 +205,51 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
}
+
+ @Timeout(5)
+ @Test
+ void testMaxUncommittedSize() throws InterruptedException, ExecutionException {
+ final String topic = UUID.randomUUID().toString();
+ final String groupId = topic.substring(0, topic.indexOf("-"));
+
+ final int recordCount = 100;
+ final int maxPollRecords = 10;
+ final int flowFilesExpected = recordCount / maxPollRecords;
+
+ final int recordSize = RECORD_VALUE.length();
+ final int maxUncommittedSize = recordSize * recordCount;
+
+ // Adjust Poll Records for this method
+ final ControllerService connectionService = runner.getControllerService(CONNECTION_SERVICE_ID);
+ runner.disableControllerService(connectionService);
+ runner.setProperty(connectionService, Kafka3ConnectionService.MAX_POLL_RECORDS, Integer.toString(maxPollRecords));
+ runner.enableControllerService(connectionService);
+
+ runner.setProperty(ConsumeKafka.GROUP_ID, groupId);
+ runner.setProperty(ConsumeKafka.TOPICS, topic);
+ runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.DEMARCATOR.getValue());
+ runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, System.lineSeparator());
+
+ // Set Uncommitted Size so that processing completes before 5 second test timeout expires
+ runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_SIZE, "%d B".formatted(maxUncommittedSize));
+ runner.setProperty(ConsumeKafka.MAX_UNCOMMITTED_TIME, "5 s");
+
+ final Collection<ProducerRecord<String, String>> records = new ArrayList<>();
+ for (int i = 0; i < recordCount; i++) {
+ records.add(new ProducerRecord<>(topic, RECORD_VALUE));
+ }
+ produce(topic, records);
+
+ runner.run();
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.SUCCESS);
+ assertEquals(flowFilesExpected, flowFiles.size());
+
+ long totalFlowFileSize = 0;
+ for (final MockFlowFile flowFile : flowFiles) {
+ totalFlowFileSize += flowFile.getSize();
+ }
+
+ assertTrue(totalFlowFileSize > maxUncommittedSize, "Total FlowFile Size [%d] less than Max Uncommitted Size [%d]".formatted(totalFlowFileSize, maxUncommittedSize));
+ }
}
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 8d5dcf8175..20ca6a9a1c 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
@@ -49,6 +50,7 @@ import org.apache.nifi.kafka.shared.property.KeyFormat;
import org.apache.nifi.kafka.shared.property.OutputStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -158,10 +160,26 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
.defaultValue("true")
.build();
+ static final PropertyDescriptor MAX_UNCOMMITTED_SIZE = new PropertyDescriptor.Builder()
+ .name("Max Uncommitted Size")
+ .description("""
+ Maximum total size of records to consume from Kafka before transferring FlowFiles to an output
+ relationship. Evaluated when specified based on the size of serialized keys and values from each
+ Kafka record, before reaching the [Max Uncommitted Time].
+ """
+ )
+ .required(false)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
.name("Max Uncommitted Time")
- .description("Specifies the maximum amount of time that the
Processor can consume from Kafka before it must transfer FlowFiles on " +
- "through the flow and commit the offsets to Kafka (if
appropriate). A larger time period can result in longer latency.")
+ .description("""
+ Maximum amount of time to spend consuming records from Kafka before transferring FlowFiles to an
+ output relationship. Longer amounts of time may produce larger FlowFiles and increase processing
+ latency for individual records.
+ """
+ )
.required(true)
.defaultValue("100 millis")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
@@ -280,6 +298,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
TOPICS,
AUTO_OFFSET_RESET,
COMMIT_OFFSETS,
+ MAX_UNCOMMITTED_SIZE,
MAX_UNCOMMITTED_TIME,
HEADER_NAME_PATTERN,
HEADER_ENCODING,
@@ -308,6 +327,8 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
private volatile String brokerUri;
private volatile PollingContext pollingContext;
private volatile int maxConsumerCount;
+ private volatile boolean maxUncommittedSizeConfigured;
+ private volatile long maxUncommittedSize;
private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
private final AtomicInteger activeConsumerCount = new AtomicInteger();
@@ -351,6 +372,12 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
brokerUri = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class).getBrokerUri();
maxConsumerCount = context.getMaxConcurrentTasks();
activeConsumerCount.set(0);
+
+ final PropertyValue maxUncommittedSizeProperty = context.getProperty(MAX_UNCOMMITTED_SIZE);
+ maxUncommittedSizeConfigured = maxUncommittedSizeProperty.isSet();
+ if (maxUncommittedSizeConfigured) {
+ maxUncommittedSize = maxUncommittedSizeProperty.asDataSize(DataUnit.B).longValue();
+ }
}
@OnStopped
@@ -393,6 +420,14 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
recordsReceived = true;
processConsumerRecords(context, session, offsetTracker, consumerRecords);
+
+ if (maxUncommittedSizeConfigured) {
+ // Stop consuming before reaching Max Uncommitted Time when exceeding Max Uncommitted Size
+ final long totalRecordSize = offsetTracker.getTotalRecordSize();
+ if (totalRecordSize > maxUncommittedSize) {
+ break;
+ }
+ }
} catch (final Exception e) {
getLogger().error("Failed to consume Kafka Records", e);
consumerService.rollback();
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
index 03b0b8c29b..e421b9c5b5 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/OffsetTracker.java
@@ -24,10 +24,12 @@ import org.apache.nifi.kafka.service.api.record.ByteRecord;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
public class OffsetTracker {
private final Map<TopicPartitionSummary, OffsetSummary> offsets = new HashMap<>();
private final Map<String, Long> recordCounts = new HashMap<>();
+ private final AtomicLong totalRecordSize = new AtomicLong();
public void update(final ByteRecord consumerRecord) {
final TopicPartitionSummary topicPartitionSummary = new TopicPartitionSummary(consumerRecord.getTopic(), consumerRecord.getPartition());
@@ -35,6 +37,16 @@ public class OffsetTracker {
final OffsetSummary offsetSummary = offsets.computeIfAbsent(topicPartitionSummary, (summary) -> new OffsetSummary(offset));
offsetSummary.setOffset(offset);
recordCounts.merge(consumerRecord.getTopic(), consumerRecord.getBundledCount(), Long::sum);
+
+ // Update Total Record Size with Key and Value length
+ consumerRecord.getKey()
+ .map(key -> key.length)
+ .ifPresent(totalRecordSize::addAndGet);
+ totalRecordSize.addAndGet(consumerRecord.getValue().length);
+ }
+
+ public long getTotalRecordSize() {
+ return totalRecordSize.get();
}
public Map<String, Long> getRecordCounts() {