[ https://issues.apache.org/jira/browse/GOBBLIN-2177?focusedWorklogId=946534&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946534 ]
ASF GitHub Bot logged work on GOBBLIN-2177:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Dec/24 14:06
            Start Date: 03/Dec/24 14:06
    Worklog Time Spent: 10m
      Work Description: abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1867572067


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
+        } catch (InterruptedException e) {
+          // Stop queue processor and return when encountered InterruptedException
+          log.warn("Thread interrupted while processing queue ", e);
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log the error and let the queue processor continue processing
+          log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
         }

Review Comment:
   Actually there is no retry happening here during `processMessage`, are you suggesting we implement a retry mechanism here?


Issue Time Tracking
-------------------

    Worklog Id:     (was: 946534)
    Time Spent: 1.5h  (was: 1h 20m)

> Avoid stopping Kafka HighLevelConsumer - QueueProcessor on
> non-InterruptedExceptions
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2177
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2177
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Abhishek Jain
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The QueueProcessor within HighLevelConsumer contains an infinite while loop
> that is enclosed in a try-catch block. When any exception is encountered,
> this loop breaks, which halts the processing of any consumed messages until
> the service is restarted.
> We should not break this infinite loop on all exceptions; rather, we should
> break it only on InterruptedException, which truly means the QueueProcessor
> should stop processing.


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
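
For reference, the behavior requested in the issue description can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the code from PR #4080: `QueueProcessorSketch`, `Processor`, and `demo` are hypothetical names, the queue holds plain strings instead of Kafka records, and errors go to `System.err` instead of a logger. The point it shows is that the try-catch sits inside the loop, so a failing record is skipped and only an InterruptedException ends processing.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class QueueProcessorSketch {

  /** Hypothetical stand-in for a queue processor; not Gobblin's actual class. */
  static final class Processor implements Runnable {
    private final BlockingQueue<String> queue;
    private final Consumer<String> handler;

    Processor(BlockingQueue<String> queue, Consumer<String> handler) {
      this.queue = queue;
      this.handler = handler;
    }

    @Override
    public void run() {
      while (true) {
        String record = null;
        try {
          record = queue.take();   // blocks; throws InterruptedException on interrupt
          handler.accept(record);  // may throw any RuntimeException
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop: the only exit from the loop.
          Thread.currentThread().interrupt();
          return;
        } catch (Exception e) {
          // Report the failure and continue with the next record.
          System.err.println("Failed to process record " + record + ": " + e);
        }
      }
    }
  }

  /** Feeds three records, the second of which fails, and returns those processed. */
  static List<String> demo() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "bad", "c"));
    List<String> processed = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(2);

    Thread worker = new Thread(new Processor(queue, record -> {
      if (record.equals("bad")) {
        throw new IllegalStateException("simulated processing failure");
      }
      processed.add(record);
      done.countDown();
    }));
    worker.start();

    try {
      done.await(5, TimeUnit.SECONDS); // wait until "a" and "c" are handled
      worker.interrupt();              // the interrupt path ends the loop
      worker.join(5000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [a, c]
  }
}
```

The record "bad" raises an exception, yet "c" is still processed afterwards. With the try-catch wrapped around the whole loop, as the issue describes for the original code, processing would have stopped at "bad".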