This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eaca86ab6f NIFI-14598: Do not make Kafka consumer available to other 
threads until done using it, by ensuring that it is not added back to the pool 
until session commit completes (successfully or not). And do not make more than 
Max Concurrent Tasks active consumers.
eaca86ab6f is described below

commit eaca86ab6fb4b8eb96c941e5e45571720973252b
Author: Mark Payne <[email protected]>
AuthorDate: Fri May 23 16:09:52 2025 -0400

    NIFI-14598: Do not make Kafka consumer available to other threads until 
done using it, by ensuring that it is not added back to the pool until session 
commit completes (successfully or not). And do not make more than Max 
Concurrent Tasks active consumers.
    
    NIFI-14598: If ConsumeKafka encounters an Exception while consuming, ensure 
that we rollback ProcessSession so that already-created FlowFiles are not 
transferred before we revert the offsets. Also ensure that we cannot decrement 
the Active Consumer counter more than once for the same consumer
    
    NIFI-14598: Fixed bug in which ConsumeKafka duplicated the last record upon 
restart
    
    This closes #9971.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi/kafka/processors/ConsumeKafkaIT.java      |  40 ++++++
 .../kafka/processors/ConsumeKafkaOffsetsIT.java    |   2 +-
 .../apache/nifi/kafka/processors/ConsumeKafka.java | 145 ++++++++++++++-------
 .../kafka/service/Kafka3ConnectionService.java     |   1 +
 .../service/consumer/Kafka3ConsumerService.java    |   7 +-
 5 files changed, 145 insertions(+), 50 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
index 1b805ee806..e85eaed9ed 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaIT.java
@@ -159,4 +159,44 @@ class ConsumeKafkaIT extends AbstractConsumeKafkaIT {
         runner.run(1, true, false);
         runner.assertTransferCount("success", 1);
     }
+
+    @Test
+    public void testConsumesAllRecordsWithoutDuplicates() throws 
ExecutionException, InterruptedException {
+        final String topic = "testConsumesAllRecordsWithoutDuplicates";
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, 
"testConsumesAllRecordsWithoutDuplicates");
+        runner.setProperty(ConsumeKafka.TOPICS, topic);
+        runner.setProperty(ConsumeKafka.PROCESSING_STRATEGY, 
ProcessingStrategy.FLOW_FILE.getValue());
+
+        produceOne(topic, 0, null, "1", null);
+
+        // Initialize processor
+        runner.run(1, false, true);
+        while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+            runner.run(1, false, false);
+        }
+
+        // Ensure that we have exactly 1 FlowFile output
+        runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
+        runner.clearTransferState();
+
+        // Add another record and ensure that we get exactly 1 more
+        produceOne(topic, 0, null, "1", null);
+        while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+            runner.run(1, false, false);
+        }
+
+        runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
+        runner.clearTransferState();
+
+        // Stop processor, add another, and then ensure that we consume 
exactly 1 more.
+        runner.stop();
+        produceOne(topic, 0, null, "1", null);
+        runner.run(1, false, true);
+        while (runner.getFlowFilesForRelationship("success").isEmpty()) {
+            runner.run(1, false, false);
+        }
+
+        runner.assertAllFlowFilesTransferred(ConsumeKafka.SUCCESS, 1);
+    }
 }
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java
index cb5c55781c..b653d5c8bc 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaOffsetsIT.java
@@ -85,7 +85,7 @@ class ConsumeKafkaOffsetsIT extends AbstractConsumeKafkaIT {
             assertEquals(1, committedOffsets.entrySet().size());
             Map.Entry<TopicPartition, OffsetAndMetadata> entry = 
committedOffsets.entrySet().iterator().next();
             assertEquals(topic, entry.getKey().topic());
-            assertEquals(values.length - 1, entry.getValue().offset());
+            assertEquals(values.length, entry.getValue().offset());
         }
     }
 
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
index 44d8f72ff0..4643a02733 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java
@@ -72,6 +72,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
@@ -305,8 +306,10 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
     private volatile boolean useReader;
     private volatile String brokerUri;
     private volatile PollingContext pollingContext;
+    private volatile int maxConsumerCount;
 
     private final Queue<KafkaConsumerService> consumerServices = new 
LinkedBlockingQueue<>();
+    private final AtomicInteger activeConsumerCount = new AtomicInteger();
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -345,6 +348,8 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
                 ? 
context.getProperty(KEY_FORMAT).asAllowableValue(KeyFormat.class)
                 : KeyFormat.BYTE_ARRAY;
         brokerUri = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class).getBrokerUri();
+        maxConsumerCount = context.getMaxConcurrentTasks();
+        activeConsumerCount.set(0);
     }
 
     @OnStopped
@@ -353,11 +358,7 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
         KafkaConsumerService service;
 
         while ((service = consumerServices.poll()) != null) {
-            try {
-                service.close();
-            } catch (IOException e) {
-                getLogger().warn("Failed to close Kafka Consumer Service", e);
-            }
+            close(service);
         }
     }
 
@@ -365,61 +366,101 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         final KafkaConsumerService consumerService = 
getConsumerService(context);
+        if (consumerService == null) {
+            getLogger().debug("No Kafka Consumer Service available; will yield 
and return immediately");
+            context.yield();
+            return;
+        }
 
         final long maxUncommittedMillis = 
context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
         final long stopTime = System.currentTimeMillis() + 
maxUncommittedMillis;
         final OffsetTracker offsetTracker = new OffsetTracker();
+        boolean recordsReceived = false;
 
-        try {
-            while (System.currentTimeMillis() < stopTime) {
-                try {
-                    final Duration maxWaitDuration = 
Duration.ofMillis(stopTime - System.currentTimeMillis());
-                    final Iterator<ByteRecord> consumerRecords = 
consumerService.poll(maxWaitDuration).iterator();
-                    if (!consumerRecords.hasNext()) {
-                        getLogger().debug("No Kafka Records consumed: {}", 
pollingContext);
-                        continue;
-                    }
-
-                    processConsumerRecords(context, session, offsetTracker, 
consumerRecords);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to consume Kafka Records", e);
-                    consumerService.rollback();
-
-                    try {
-                        consumerService.close();
-                    } catch (final IOException ex) {
-                        getLogger().warn("Failed to close Kafka Consumer 
Service", ex);
-                    }
+        while (System.currentTimeMillis() < stopTime) {
+            try {
+                final Duration maxWaitDuration = Duration.ofMillis(stopTime - 
System.currentTimeMillis());
+                if (maxWaitDuration.toMillis() <= 0) {
                     break;
                 }
+
+                final Iterator<ByteRecord> consumerRecords = 
consumerService.poll(maxWaitDuration).iterator();
+                if (!consumerRecords.hasNext()) {
+                    getLogger().trace("No Kafka Records consumed: {}", 
pollingContext);
+                    continue;
+                }
+
+                recordsReceived = true;
+                processConsumerRecords(context, session, offsetTracker, 
consumerRecords);
+            } catch (final Exception e) {
+                getLogger().error("Failed to consume Kafka Records", e);
+                consumerService.rollback();
+                close(consumerService);
+                context.yield();
+                // If there are any FlowFiles already created and transferred, 
roll them back because we're rolling back offsets and
+                // because we will consume the data again, we don't want to 
transfer out the FlowFiles.
+                session.rollback();
+                return;
             }
+        }
+
+        if (!recordsReceived) {
+            getLogger().trace("No Kafka Records consumed, re-queuing 
consumer");
+            consumerServices.offer(consumerService);
+            return;
+        }
+
+        session.commitAsync(
+            () -> commitOffsets(consumerService, offsetTracker, 
pollingContext),
+            throwable -> {
+                getLogger().error("Failed to commit session; will roll back 
any uncommitted records", throwable);
+                rollback(consumerService);
+                context.yield();
+            });
+    }
+
+    private void commitOffsets(final KafkaConsumerService consumerService, 
final OffsetTracker offsetTracker, final PollingContext pollingContext) {
+        if (!commitOffsets) {
+            return;
+        }
 
-            session.commitAsync(
-                    () -> {
-                        if (commitOffsets) {
-                            
consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
-                        }
-                    },
-                    throwable -> {
-                        getLogger().error("Failed to commit session; will roll 
back any uncommitted records", throwable);
-
-                        if (!consumerService.isClosed()) {
-                            consumerService.rollback();
-
-                            try {
-                                consumerService.close();
-                            } catch (final IOException e) {
-                                getLogger().warn("Failed to close Kafka 
Consumer Service", e);
-                            }
-                        }
-                    });
-        } finally {
-            if (!consumerService.isClosed()) {
+        try {
+            
consumerService.commit(offsetTracker.getPollingSummary(pollingContext));
+            consumerServices.offer(consumerService);
+            getLogger().debug("Committed offsets for Kafka Consumer Service");
+        } catch (final Exception e) {
+            close(consumerService);
+            getLogger().error("Failed to commit offsets for Kafka Consumer 
Service", e);
+        }
+    }
+
+    private void rollback(final KafkaConsumerService consumerService) {
+        if (!consumerService.isClosed()) {
+            try {
+                consumerService.rollback();
                 consumerServices.offer(consumerService);
+                getLogger().debug("Rolled back offsets for Kafka Consumer 
Service");
+            } catch (final Exception e) {
+                getLogger().warn("Failed to rollback offsets for Kafka 
Consumer", e);
+                close(consumerService);
             }
         }
     }
 
+    private void close(final KafkaConsumerService consumerService) {
+        if (consumerService.isClosed()) {
+            getLogger().debug("Asked to close Kafka Consumer Service but 
consumer already closed");
+            return;
+        }
+
+        try {
+            consumerService.close();
+            activeConsumerCount.decrementAndGet();
+        } catch (final IOException ioe) {
+            getLogger().warn("Failed to close Kafka Consumer Service", ioe);
+        }
+    }
+
     @Override
     public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
         final List<ConfigVerificationResult> verificationResults = new 
ArrayList<>();
@@ -453,10 +494,22 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
             return consumerService;
         }
 
+        final int activeCount = activeConsumerCount.incrementAndGet();
+        if (activeCount > getMaxConsumerCount()) {
+            getLogger().trace("No Kafka Consumer Service available; have 
already reached max count of {} so will not create a new one", 
getMaxConsumerCount());
+            activeConsumerCount.decrementAndGet();
+            return null;
+        }
+
+        getLogger().debug("No Kafka Consumer Service available; creating a new 
one");
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
         return connectionService.getConsumerService(pollingContext);
     }
 
+    private int getMaxConsumerCount() {
+        return maxConsumerCount;
+    }
+
     private void processConsumerRecords(final ProcessContext context, final 
ProcessSession session, final OffsetTracker offsetTracker,
             final Iterator<ByteRecord> consumerRecords) {
         switch (processingStrategy) {
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index bed6590ac3..7059fe7fc8 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -307,6 +307,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         properties.putAll(consumerProperties);
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
subscription.getGroupId());
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
subscription.getAutoOffsetReset().getValue());
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
         final Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(properties, deserializer, deserializer);
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
index fdd426dc70..59ea23818a 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java
@@ -36,9 +36,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -164,7 +164,7 @@ public class Kafka3ConsumerService implements 
KafkaConsumerService, Closeable, C
     }
 
     private Map<TopicPartition, OffsetAndMetadata> getOffsets(final 
PollingSummary pollingSummary) {
-        final Map<TopicPartition, OffsetAndMetadata> offsets = new 
LinkedHashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
 
         final Map<TopicPartitionSummary, OffsetSummary> summaryOffsets = 
pollingSummary.getOffsets();
         for (final Map.Entry<TopicPartitionSummary, OffsetSummary> offsetEntry 
: summaryOffsets.entrySet()) {
@@ -172,7 +172,8 @@ public class Kafka3ConsumerService implements 
KafkaConsumerService, Closeable, C
             final TopicPartition topicPartition = new 
TopicPartition(topicPartitionSummary.getTopic(), 
topicPartitionSummary.getPartition());
 
             final OffsetSummary offsetSummary = offsetEntry.getValue();
-            final OffsetAndMetadata offsetAndMetadata = new 
OffsetAndMetadata(offsetSummary.getOffset());
+            // Offset should indicate the offset that we want to consume from 
next. This will be 1 more than the most recently obtained offset.
+            final OffsetAndMetadata offsetAndMetadata = new 
OffsetAndMetadata(offsetSummary.getOffset() + 1);
             offsets.put(topicPartition, offsetAndMetadata);
         }
 

Reply via email to