pvillard31 commented on code in PR #9807:
URL: https://github.com/apache/nifi/pull/9807#discussion_r1999452948


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -365,23 +363,53 @@ public void onStopped() {
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
         final KafkaConsumerService consumerService = getConsumerService(context);
 
+        final long maxUncommittedMillis = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long stopTime = System.currentTimeMillis() + maxUncommittedMillis;
+        final OffsetTracker offsetTracker = new OffsetTracker();
+
         try {
-            final Iterator<ByteRecord> consumerRecords = consumerService.poll().iterator();
-            if (!consumerRecords.hasNext()) {
-                getLogger().debug("No Kafka Records consumed: {}", pollingContext);
-                return;
+            while (System.currentTimeMillis() < stopTime) {
+                try {
+                    final Duration maxWaitDuration = Duration.ofMillis(stopTime - System.currentTimeMillis());
+                    final Iterator<ByteRecord> consumerRecords = consumerService.poll(maxWaitDuration).iterator();
+                    if (!consumerRecords.hasNext()) {
+                        getLogger().debug("No Kafka Records consumed: {}", pollingContext);
+                        continue;
+                    }
+
+                    processConsumerRecords(context, session, offsetTracker, consumerRecords);
+                } catch (final Exception e) {
+                    getLogger().error("Failed to consume Kafka Records", e);
+                    consumerService.rollback();
+
+                    try {
+                        consumerService.close();
+                    } catch (final IOException ex) {
+                        getLogger().warn("Failed to close Kafka Consumer Service", ex);
+                    }

Review Comment:
   I feel that explicitly calling `close()` here makes the code easier to read
than an empty try block. I can make the change if there is a consensus that the
empty block is better.
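
For comparison, here is a minimal sketch of the two styles being weighed, assuming the "empty try block" means a try-with-resources statement with an empty body. `CloseStyles`, `FakeService`, and the `log` list are hypothetical stand-ins invented for this illustration; they are not NiFi classes, and the real `KafkaConsumerService` may behave differently.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; none of these names come from the NiFi code base.
class CloseStyles {

    // Stand-in for a closeable service that can be told to fail on close().
    static final class FakeService implements Closeable {
        private final boolean failOnClose;
        boolean closed;

        FakeService(final boolean failOnClose) {
            this.failOnClose = failOnClose;
        }

        @Override
        public void close() throws IOException {
            closed = true;
            if (failOnClose) {
                throw new IOException("close failed");
            }
        }
    }

    // Style 1: explicit close() call, as in the diff above.
    static void closeExplicitly(final Closeable service, final List<String> log) {
        try {
            service.close();
        } catch (final IOException ex) {
            log.add("warn: " + ex.getMessage());
        }
    }

    // Style 2: try-with-resources with an empty body; close() is invoked
    // implicitly when the (empty) block exits.
    static void closeWithEmptyTry(final Closeable service, final List<String> log) {
        try (Closeable ignored = service) {
            // intentionally empty
        } catch (final IOException ex) {
            log.add("warn: " + ex.getMessage());
        }
    }

    public static void main(final String[] args) {
        final List<String> log = new ArrayList<>();
        closeExplicitly(new FakeService(true), log);
        closeWithEmptyTry(new FakeService(true), log);
        System.out.println(log);
    }
}
```

In this sketch both methods close the service and route an `IOException` from `close()` to the same handler. The difference is only in how visible the `close()` call is to the reader.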


