[ https://issues.apache.org/jira/browse/GOBBLIN-2177?focusedWorklogId=946924&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946924 ]
ASF GitHub Bot logged work on GOBBLIN-2177:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/24 18:36
            Start Date: 05/Dec/24 18:36
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1871905332


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
           // Committed offset should always be the offset of the next record to be read (hence +1)
           partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
         }
+      } catch (InterruptedException e) {
+        // Stop queue processor and return when encountered InterruptedException
+        log.warn("Thread interrupted while processing queue ", e);
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Exception e) {
+        // Log the error and let the queue processor continue processing
+        log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
       }

Review Comment:
   yes, exactly. Right now it's an infinite loop with no recovery (so obviously no retries). This PR would change that infinite loop to do infinite recovery, although it would NEVER retry a given message (any message hitting an error gets skipped).
   
   I'm suggesting changing the infinite loop to do possibly infinite recovery with bounded retries on any given message: recovery continues only as long as each message's retries resolve within a bounded time. I'd let retries continue for a configurable time, probably up to 15 mins or so in our case of gobblin cluster. At that point, fail loudly via a metrics condition to alert on. (I'd set the threshold very low there - like just one event.)

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -336,8 +336,8 @@ public QueueProcessor(BlockingQueue queue) {
     public void run() {
       log.info("Starting queue processing.. " + Thread.currentThread().getName());
       KafkaConsumerRecord record = null;
-      try {
-        while (true) {
+      while (true) {
+        try {
           record = queue.take();

Review Comment:
   Right now the `QueueProcessor` thread is dead, so we no longer run:
   ```
   partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
   ```
   which means that, whether or not we continue consuming, after a system restart we'll rewind offsets and have another go at that first problematic message.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 946924)
    Time Spent: 1h 40m  (was: 1.5h)

> Avoid stopping Kafka HighLevelConsumer - QueueProcessor on
> non-InterruptedExceptions
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2177
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2177
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Abhishek Jain
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The QueueProcessor within HighLevelConsumer contains an infinite while loop
> enclosed in a try-catch block. When any exception is encountered, this loop
> breaks, which halts the processing of any consumed messages until the
> service is restarted.
> We should not break this infinite loop on all exceptions; rather, we should
> break it only on InterruptedException, which genuinely means the
> QueueProcessor should stop processing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)