This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new eaca86ab6f NIFI-14598: Do not make Kafka consumer available to other
threads until done using it, by ensuring that it is not added back to the pool
until session commit completes (successfully or not). And do not make more than
Max Concurrent Tasks active consumers.
eaca86ab6f is described below
commit eaca86ab6fb4b8eb96c941e5e45571720973252b
Author: Mark Payne <[email protected]>
AuthorDate: Fri May 23 16:09:52 2025 -0400
NIFI-14598: Do not make Kafka consumer available to other threads until
done using it, by ensuring that it is not added back to the pool until session
commit completes (successfully or not). And do not make more than Max
Concurrent Tasks active consumers.
NIFI-14598: If ConsumeKafka encounters an Exception while consuming, ensure
that we rollback ProcessSession so that already-created FlowFiles are not
transferred before we revert the offsets. Also ensure that we cannot decrement
the Active Consumer counter more than once for the same consumer.
NIFI-14598: Fixed bug in which ConsumeKafka duplicated the last record upon
restart.
This closes #9971.
Signed-off-by: Peter Turcsanyi <[email protected]>
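
The heart of the change is a bounded checkout pattern around the consumer pool:
reuse an idle consumer when one exists, and create a new one only while the
active count stays within Max Concurrent Tasks. A minimal, generic sketch of
that pattern (BoundedConsumerPool and its members are hypothetical names for
illustration, not the actual NiFi classes):

    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Supplier;

    // Illustrative sketch, not the NiFi code itself.
    class BoundedConsumerPool<T> {
        private final Queue<T> idle = new LinkedBlockingQueue<>();
        private final AtomicInteger active = new AtomicInteger();

        // Returns a pooled instance, a newly created one, or null when the cap is reached.
        T borrow(final int maxActive, final Supplier<T> factory) {
            final T pooled = idle.poll();
            if (pooled != null) {
                return pooled; // reuse: active count is unchanged
            }
            if (active.incrementAndGet() > maxActive) {
                active.decrementAndGet(); // undo the optimistic increment
                return null;              // caller should yield
            }
            return factory.get();
        }

        // Called only after the session commit (or rollback) completes.
        void release(final T instance) {
            idle.offer(instance);
        }
    }

A consumer borrowed this way is offered back to the queue only after the
session commit or rollback finishes, which is what keeps it invisible to
other threads in the meantime.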
---
.../nifi/kafka/processors/ConsumeKafkaIT.java | 40 ++++++
.../kafka/processors/ConsumeKafkaOffsetsIT.java | 2 +-
.../apache/nifi/kafka/processors/ConsumeKafka.java | 145 ++++++++++++++-------
.../kafka/service/Kafka3ConnectionService.java | 1 +
.../service/consumer/Kafka3ConsumerService.java | 7 +-
5 files changed, 145 insertions(+), 50 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index 1b805ee806..e85eaed9ed 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -159,4 +159,44 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
runner.run(1, true, false);
runner.assertTransferCount("success", 1);
}
+
+ @Test
+ public void testConsumesAllRecordsWithoutDuplicates() throws ExecutionException, InterruptedException {
+ final String topic = "testConsumesAllRecordsWithoutDuplicates";
+
+ runner.setProperty(ConsumeKafka.GROUP_ID, "testConsumesAllRecordsWithoutDuplicates");
+ runner.setProperty(ConsumeKafka.TOPICS, topic);
+ runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, ProcessingStrategy.FLOW_FILE.getValue());
+
+ produceOne(topic, 0, null, "1", null);
+
+ // Initialize processor
+ runner.run(1, false, true);
+ while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+ runner.run(1, false, false);
+ }
+
+ // Ensure that we have exactly 1 FlowFile output
+ runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
+ runner.clearTransferState();
+
+ // Add another record and ensure that we get exactly 1 more
+ produceOne(topic, 0, null, "1", null);
+ while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+ runner.run(1, false, false);
+ }
+
+ runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
+ runner.clearTransferState();
+
+ // Stop processor, add another record, and then ensure that we consume exactly 1 more.
+ runner.stop();
+ produceOne(topic, 0, null, "1", null);
+ runner.run(1, false, true);
+ while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+ runner.run(1, false, false);
+ }
+
+ runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
+ }
}
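
A note on the TestRunner idiom used above, per the nifi-mock API
run(iterations, stopOnFinish, initialize):

    runner.run(1, false, true);  // first trigger: invoke @OnScheduled, leave the processor running
    runner.run(1, false, false); // subsequent triggers: poll again without re-initializing
    runner.stop();               // fire @OnStopped, closing any pooled consumers

Keeping the processor scheduled between triggers is what exercises the
consumer pool; the final stop/restart block reproduces the condition that
previously duplicated the last record.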
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java
index cb5c55781c..b653d5c8bc 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java
@@ -85,7 +85,7 @@ class ConsumeKafkaOffsetsIT extends AbstractConsumeKafkaIT {
assertEquals(1, committedOffsets.entrySet().size());
Map.Entry<TopicPartition, OffsetAndMetadata> entry = committedOffsets.entrySet().iterator().next();
assertEquals(topic, entry.getKey().topic());
- assertEquals(values.length - 1, entry.getValue().offset());
+ assertEquals(values.length, entry.getValue().offset());
}
}
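
The corrected expectation matches Kafka's definition of a committed offset: it
records the next offset to consume, not the last offset consumed. For example,
after consuming three records at offsets 0, 1 and 2, the group should commit
offset 3 (values.length); committing values.length - 1 would re-deliver the
final record after a restart, which is the duplication this commit fixes.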
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 44d8f72ff0..4643a02733 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -72,6 +72,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
@@ -305,8 +306,10 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
private volatile boolean useReader;
private volatile String brokerUri;
private volatile PollingContext pollingContext;
+ private volatile int maxConsumerCount;
private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
+ private final AtomicInteger activeConsumerCount = new AtomicInteger();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -345,6 +348,8 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
? context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class)
: KeyFormat.BYTE_ARRAY;
brokerUri = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class).getBrokerUri();
+ maxConsumerCount = context.getMaxConcurrentTasks();
+ activeConsumerCount.set(0);
}
@OnStopped
@@ -353,11 +358,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
KafkaConsumerService service;
while ((service = consumerServices.poll()) != null) {
- try {
- service.close();
- } catch (IOException e) {
- getLogger().warn("Failed to close Kafka Consumer Service", e);
- }
+ close(service);
}
}
@@ -365,61 +366,101 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final KafkaConsumerService consumerService = getConsumerService(context);
+ if (consumerService == null) {
+ getLogger().debug("No Kafka Consumer Service available; will yield and return immediately");
+ context.yield();
+ return;
+ }
final long maxUncommittedMillis = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
final long stopTime = System.currentTimeMillis() + maxUncommittedMillis;
final OffsetTracker offsetTracker = new OffsetTracker();
+ boolean recordsReceived = false;
- try {
- while (System.currentTimeMillis() < stopTime) {
- try {
- final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
- final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
- if (!consumerRecords.hasNext()) {
- getLogger().debug("No Kafka Records consumed: {}", pollingContext);
- continue;
- }
-
- processConsumerRecords(context, session, offsetTracker, consumerRecords);
- } catch (final Exception e) {
- getLogger().error("Failed to consume Kafka Records", e);
- consumerService.rollback();
-
- try {
- consumerService.close();
- } catch (final IOException ex) {
- getLogger().warn("Failed to close Kafka Consumer Service", ex);
- }
+ while (System.currentTimeMillis() < stopTime) {
+ try {
+ final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
+ if (maxWaitDuration.toMillis() <= 0) {
break;
}
+
+ final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
+ if (!consumerRecords.hasNext()) {
+ getLogger().trace("No Kafka Records consumed: {}", pollingContext);
+ continue;
+ }
+
+ recordsReceived = true;
+ processConsumerRecords(context, session, offsetTracker, consumerRecords);
+ } catch (final Exception e) {
+ getLogger().error("Failed to consume Kafka Records", e);
+ consumerService.rollback();
+ close(consumerService);
+ context.yield();
+ // If any FlowFiles have already been created and transferred, roll them back: we are rolling back the offsets
+ // and will consume the data again, so we do not want to transfer out the FlowFiles.
+ session.rollback();
+ return;
}
+ }
+
+ if (!recordsReceived) {
+ getLogger().trace("No Kafka Records consumed, re-queuing consumer");
+ consumerServices.offer(consumerService);
+ return;
+ }
+
+ session.commitAsync(
+ () -> commitOffsets(consumerService, offsetTracker, pollingContext),
+ throwable -> {
+ getLogger().error("Failed to commit session; will roll back any uncommitted records", throwable);
+ rollback(consumerService);
+ context.yield();
+ });
+ }
+
+ private void commitOffsets(final KafkaConsumerService consumerService, final OffsetTracker offsetTracker, final PollingContext pollingContext) {
+ if (!commitOffsets) {
+ return;
+ }
- session.commitAsync(
- () -> {
- if (commitOffsets) {
- consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
- }
- },
- throwable -> {
- getLogger().error("Failed to commit session; will roll back any uncommitted records", throwable);
-
- if (!consumerService.isClosed()) {
- consumerService.rollback();
-
- try {
- consumerService.close();
- } catch (final IOException e) {
- getLogger().warn("Failed to close Kafka Consumer Service", e);
- }
- }
- });
- } finally {
- if (!consumerService.isClosed()) {
+ try {
+ consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
+ consumerServices.offer(consumerService);
+ getLogger().debug("Committed offsets for Kafka Consumer Service");
+ } catch (final Exception e) {
+ close(consumerService);
+ getLogger().error("Failed to commit offsets for Kafka Consumer Service", e);
+ }
+ }
+
+ private void rollback(final KafkaConsumerService consumerService) {
+ if (!consumerService.isClosed()) {
+ try {
+ consumerService.rollback();
consumerServices.offer(consumerService);
+ getLogger().debug("Rolled back offsets for Kafka Consumer Service");
+ } catch (final Exception e) {
+ getLogger().warn("Failed to rollback offsets for Kafka Consumer", e);
+ close(consumerService);
}
}
}
+ private void close(final KafkaConsumerService consumerService) {
+ if (consumerService.isClosed()) {
+ getLogger().debug("Asked to close Kafka Consumer Service but consumer already closed");
+ return;
+ }
+
+ try {
+ consumerService.close();
+ activeConsumerCount.decrementAndGet();
+ } catch (final IOException ioe) {
+ getLogger().warn("Failed to close Kafka Consumer Service", ioe);
+ }
+ }
+
@Override
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
@@ -453,10 +494,22 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
return consumerService;
}
+ final int activeCount = activeConsumerCount.incrementAndGet();
+ if (activeCount > getMaxConsumerCount()) {
+ getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount());
+ activeConsumerCount.decrementAndGet();
+ return null;
+ }
+
+ getLogger().debug("No Kafka Consumer Service available; creating a new one");
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
return connectionService.getConsumerService(pollingContext);
}
+ private int getMaxConsumerCount() {
+ return maxConsumerCount;
+ }
+
private void processConsumerRecords(final ProcessContext context, final ProcessSession session, final OffsetTracker offsetTracker,
final Iterator<ByteRecord> consumerRecords) {
switch (processingStrategy) {
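
The key ordering in the rewritten onTrigger is that the consumer returns to
the pool only from inside the commitAsync callbacks, never from a finally
block. Condensed from the hunks above (same NiFi calls, details elided):

    session.commitAsync(
        () -> {
            // Session content is now durable; commit Kafka offsets, then allow reuse.
            consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
            consumerServices.offer(consumerService);
        },
        throwable -> {
            // Session commit failed; revert offsets so the records are polled again.
            consumerService.rollback();
            consumerServices.offer(consumerService);
        });

Until one of the callbacks runs, the consumer is referenced only by the
triggering thread, so no other task can poll it mid-commit.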
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index bed6590ac3..7059fe7fc8 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -307,6 +307,7 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
properties.putAll(consumerProperties);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, subscription.getGroupId());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, subscription.getAutoOffsetReset().getValue());
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, deserializer, deserializer);
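
Disabling auto-commit is what makes the rollback logic above effective: the
Kafka client's default is enable.auto.commit=true, under which polled offsets
are committed in the background on a timer, regardless of whether the NiFi
session ever commits. With the property pinned to false, offsets advance only
through the explicit commit path:

    // Offsets now move only via the explicit commit performed after a successful session commit.
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");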
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
index fdd426dc70..59ea23818a 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
@@ -36,9 +36,9 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -164,7 +164,7 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
}
private Map<TopicPartition, OffsetAndMetadata> getOffsets(final PollingSummary pollingSummary) {
- final Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
+ final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
final Map<TopicPartitionSummary, OffsetSummary> summaryOffsets = pollingSummary.getOffsets();
for (final Map.Entry<TopicPartitionSummary, OffsetSummary> offsetEntry : summaryOffsets.entrySet()) {
@@ -172,7 +172,8 @@ public class Kafka3ConsumerService implements KafkaConsumerService, Closeable, C
final TopicPartition topicPartition = new TopicPartition(topicPartitionSummary.getTopic(), topicPartitionSummary.getPartition());
final OffsetSummary offsetSummary = offsetEntry.getValue();
- final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetSummary.getOffset());
+ // Offset should indicate the offset that we want to consume from next. This will be 1 more than the most recently obtained offset.
+ final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offsetSummary.getOffset() + 1);
offsets.put(topicPartition, offsetAndMetadata);
}
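
The + 1 follows the contract of KafkaConsumer.commitSync(Map<TopicPartition,
OffsetAndMetadata>): the committed offset must be the position the application
will read next, i.e. lastProcessedOffset + 1. A minimal illustration
(lastProcessedOffset is a hypothetical local variable):

    // Having fully processed the record at lastProcessedOffset, commit the next position to read.
    final Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    toCommit.put(topicPartition, new OffsetAndMetadata(lastProcessedOffset + 1));
    consumer.commitSync(toCommit);

Committing the raw offset, as the removed line did, makes the next start
re-read the final record, which is exactly the duplication called out in the
commit message.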