[ 
https://issues.apache.org/jira/browse/GOBBLIN-2177?focusedWorklogId=946924&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946924
 ]

ASF GitHub Bot logged work on GOBBLIN-2177:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/24 18:36
            Start Date: 05/Dec/24 18:36
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1871905332


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
+        } catch (InterruptedException e) {
+          // Stop queue processor and return when encountered InterruptedException
+          log.warn("Thread interrupted while processing queue ", e);
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log the error and let the queue processor continue processing
+          log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
         }

Review Comment:
   Yes, exactly. Right now it's an infinite loop with no recovery (so obviously no retries).
   
   This PR would change that infinite loop to do infinite recovery, but NEVER retry a given message (any message hitting an error gets skipped).
   
   I'm suggesting we change the infinite loop to do possibly infinite recovery with bounded retries on any given message: recovery continues only as long as each message's retries resolve within a bounded time.
   
   I'd let retries continue for a configurable time, probably up to 15 mins or so in our case of Gobblin cluster. At that point, fail loudly via a metrics condition we can alert on. (I'd set the threshold very low there - like just one event.)
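
For what it's worth, here is a minimal standalone sketch of that bounded-retry shape. The class and method names, the fixed 5-second backoff, and the plain counter are hypothetical stand-ins for whatever config keys and alerting metric Gobblin would actually use; this is not the real HighLevelConsumer code:

```
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;

public class BoundedRetryQueueProcessor<T> implements Runnable {
  private final BlockingQueue<T> queue;
  private final Duration maxRetryTime;         // e.g. Duration.ofMinutes(15), made configurable
  private long recordsFailedPastDeadline = 0;  // stand-in for a real metric to alert on

  public BoundedRetryQueueProcessor(BlockingQueue<T> queue, Duration maxRetryTime) {
    this.queue = queue;
    this.maxRetryTime = maxRetryTime;
  }

  @Override
  public void run() {
    while (true) {
      T record;
      try {
        record = queue.take();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();    // only interruption stops the processor
        return;
      }
      Instant deadline = Instant.now().plus(maxRetryTime);
      while (true) {
        try {
          process(record);                     // commit the record's offset on success
          break;                               // move on to the next record
        } catch (Exception e) {
          if (Instant.now().isAfter(deadline)) {
            recordsFailedPastDeadline++;       // alert threshold would be ~1 event
            break;                             // skip only after bounded retries are exhausted
          }
          sleepQuietly(Duration.ofSeconds(5)); // simple fixed backoff between attempts
        }
      }
    }
  }

  private void process(T record) {
    // handle the record here; throwing triggers another retry in run()
  }

  private static void sleepQuietly(Duration d) {
    try {
      Thread.sleep(d.toMillis());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

The key property is that only InterruptedException ends the processor; any other exception is retried until the per-record deadline passes, after which the record is skipped and the failure is surfaced for alerting.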



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -336,8 +336,8 @@ public QueueProcessor(BlockingQueue queue) {
     public void run() {
       log.info("Starting queue processing.. " + 
Thread.currentThread().getName());
       KafkaConsumerRecord record = null;
-      try {
-        while (true) {
+      while (true) {
+        try {
           record = queue.take();

Review Comment:
   Right now the `QueueProcessor` thread is dead, so we no longer run:
   ```
   partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
   ```
   which means that, whether or not we continue consuming, after a system restart we'll rewind offsets and have another go at that first problematic message.
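
To make that rewind concrete, here is a hedged sketch using the plain kafka-clients API rather than Gobblin's consumer abstraction (the class and method names are illustrative): the committed offset is always the next record to read, so a record whose `+ 1` commit never happens is re-delivered after restart.

```
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class OffsetCommitSketch {
  // Sketch only: if the processor dies before this runs for a record, the group's
  // committed offset stays behind it, and a restarted consumer re-reads that record.
  static void commitProcessed(KafkaConsumer<?, ?> consumer, TopicPartition partition, long processedOffset) {
    consumer.commitSync(Collections.singletonMap(
        partition, new OffsetAndMetadata(processedOffset + 1)));  // next offset to read
  }
}
```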





Issue Time Tracking
-------------------

    Worklog Id:     (was: 946924)
    Time Spent: 1h 40m  (was: 1.5h)

> Avoid stopping Kafka HighLevelConsumer - QueueProcessor on non-InterruptedExceptions
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2177
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2177
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Abhishek Jain
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The QueueProcessor within HighLevelConsumer contains an infinite while loop 
> that is enclosed in a try-catch block. When any exception is encountered, 
> this loop breaks, which halts the processing of any consumed messages until 
> the service is restarted.
> We should not break this infinite loop on all exceptions; rather, we should 
> break it only on InterruptedException, which truly means the QueueProcessor 
> should stop processing.
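
A condensed sketch of that restructuring (illustrative only; the class and `processRecord` are placeholders, not the actual HighLevelConsumer members): the try/catch moves inside the loop, and only InterruptedException exits.

```
import java.util.concurrent.BlockingQueue;

class LoopSketch<T> {
  void runLoop(BlockingQueue<T> queue) {
    while (true) {
      try {
        T record = queue.take();
        processRecord(record);                // placeholder for the real per-record work
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();   // the one case where the loop should end
        return;
      } catch (Exception e) {
        // log and keep the loop alive; the processor no longer dies on a bad record
      }
    }
  }

  void processRecord(T record) { /* placeholder */ }
}
```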



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
