sfc-gh-mpayne commented on code in PR #9971:
URL: https://github.com/apache/nifi/pull/9971#discussion_r2110108100


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -353,73 +358,101 @@ public void onStopped() {
         KafkaConsumerService service;
 
         while ((service = consumerServices.poll()) != null) {
-            try {
-                service.close();
-            } catch (IOException e) {
-                getLogger().warn("Failed to close Kafka Consumer Service", e);
-            }
+            close(service);
         }
     }
 
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         final KafkaConsumerService consumerService = getConsumerService(context);
+        if (consumerService == null) {
+            getLogger().debug("No Kafka Consumer Service available; will yield and return immediately");
+            context.yield();
+            return;
+        }
 
         final long maxUncommittedMillis = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
         final long stopTime = System.currentTimeMillis() + maxUncommittedMillis;
         final OffsetTracker offsetTracker = new OffsetTracker();
+        boolean recordsReceived = false;
 
-        try {
-            while (System.currentTimeMillis() < stopTime) {
-                try {
-                    final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
-                    final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
-                    if (!consumerRecords.hasNext()) {
-                        getLogger().debug("No Kafka Records consumed: {}", pollingContext);
-                        continue;
-                    }
-
-                    processConsumerRecords(context, session, offsetTracker, consumerRecords);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to consume Kafka Records", e);
-                    consumerService.rollback();
-
-                    try {
-                        consumerService.close();
-                    } catch (final IOException ex) {
-                        getLogger().warn("Failed to close Kafka Consumer Service", ex);
-                    }
+        while (System.currentTimeMillis() < stopTime) {
+            try {
+                final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
+                if (maxWaitDuration.toMillis() <= 0) {
                     break;
                 }
+
+                final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
+                if (!consumerRecords.hasNext()) {
+                    getLogger().trace("No Kafka Records consumed: {}", pollingContext);
+                    continue;
+                }
+
+                recordsReceived = true;
+                processConsumerRecords(context, session, offsetTracker, consumerRecords);
+            } catch (final Exception e) {
+                getLogger().error("Failed to consume Kafka Records", e);
+                consumerService.rollback();
+                close(consumerService);
+                context.yield();
+                break;

Review Comment:
   Thanks for reviewing @turcsanyip and great catch. I pushed an update. In the event that we catch an Exception there, we will now roll back the ProcessSession so that any previously created/transferred FlowFiles get rolled back. I also updated `close(KafkaConsumerService)` to check `KafkaConsumerService.isClosed()` before decrementing the count. This should address both concerns.
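
The `isClosed()` guard described above can be sketched in isolation. This is a hedged, self-contained illustration, assuming a hypothetical `ConsumerService` interface and an `activeConsumerCount` field (the real `KafkaConsumerService` and the processor's counter live in the NiFi codebase): the point is that a second `close()` on the same service must not decrement the active count again.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the NiFi KafkaConsumerService contract
interface ConsumerService extends Closeable {
    boolean isClosed();
}

class ConsumerCloser {
    // Tracks how many consumer services are currently open
    private final AtomicInteger activeConsumerCount = new AtomicInteger();

    ConsumerService open() {
        activeConsumerCount.incrementAndGet();
        return new ConsumerService() {
            private boolean closed;

            @Override
            public boolean isClosed() {
                return closed;
            }

            @Override
            public void close() {
                closed = true;
            }
        };
    }

    // Mirrors the guarded close(): skip services that are already closed so
    // that double-closing cannot drive the active count negative.
    void close(final ConsumerService service) {
        if (service.isClosed()) {
            return;
        }
        try {
            service.close();
        } catch (final IOException e) {
            // In the processor this would be getLogger().warn(...)
            System.err.println("Failed to close Kafka Consumer Service: " + e);
        }
        activeConsumerCount.decrementAndGet();
    }

    int getActiveCount() {
        return activeConsumerCount.get();
    }
}
```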



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
