abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1873755775


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
             // Committed offset should always be the offset of the next record 
to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
+        } catch (InterruptedException e) {
+          // Stop queue processor and return when encountered 
InterruptedException
+          log.warn("Thread interrupted while processing queue ", e);
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log the error and let the queue processor continue processing
+          log.error("Encountered exception while processing record. Record: {} 
Exception: {}", record, e);
         }

Review Comment:
   Also, adding a retry may cause some hidden effects like delay in message 
processing instead of an early failure. It may also introduce delays in 
catching a true failure.
   
   Since this piece is getting used at a lot of places, I would like to keep 
the change minimal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to