abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1867572067
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
           // Committed offset should always be the offset of the next record to be read (hence +1)
           partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
         }
+      } catch (InterruptedException e) {
+        // Stop queue processor and return when encountered InterruptedException
+        log.warn("Thread interrupted while processing queue ", e);
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Exception e) {
+        // Log the error and let the queue processor continue processing
+        log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
       }

Review Comment:
   Actually, there is no retry happening here during `processMessage`. Are you suggesting we implement a retry mechanism here?
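If a retry is what is being suggested, a bounded retry around the per-record call could look roughly like the sketch below. This is only an illustration under stated assumptions: `RetrySketch`, `RecordProcessor`, `processWithRetry`, `maxAttempts`, and `backoffMillis` are hypothetical names, not part of `HighLevelConsumer` or any Gobblin API, and the fixed backoff is an arbitrary choice. As in the diff above, `InterruptedException` is not retried, so the caller can still restore the interrupt flag and stop.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

  /** Hypothetical stand-in for the per-record processing call; not the Gobblin API. */
  @FunctionalInterface
  public interface RecordProcessor<T> {
    void process(T record) throws Exception;
  }

  /**
   * Tries to process a record up to maxAttempts times.
   * Returns true on success and false once every attempt has failed.
   * InterruptedException is never retried; it propagates so the caller can stop.
   */
  public static <T> boolean processWithRetry(T record, RecordProcessor<T> processor,
      int maxAttempts, long backoffMillis) throws InterruptedException {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        processor.process(record);
        return true;
      } catch (InterruptedException e) {
        // Do not swallow or retry interrupts; let the caller handle shutdown.
        throw e;
      } catch (Exception e) {
        System.err.println("Attempt " + attempt + " of " + maxAttempts
            + " failed for record " + record + ": " + e);
        if (attempt < maxAttempts) {
          // Fixed backoff between attempts (an assumption; could be exponential).
          Thread.sleep(backoffMillis);
        }
      }
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger calls = new AtomicInteger();
    // Simulated processor that fails twice, then succeeds on the third attempt.
    boolean ok = processWithRetry("record-1", r -> {
      if (calls.incrementAndGet() < 3) {
        throw new IllegalStateException("transient failure");
      }
    }, 3, 10L);
    System.out.println("processed=" + ok + " attempts=" + calls.get());
  }
}
```

With a helper of this shape, the queue processor would log and move on to the next record only after the helper returns `false`, which keeps the "continue processing" behavior of the new `catch (Exception e)` branch.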