This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 99c5a76e2 [GOBBLIN-2044] Catch and log exceptions in HighLevelConsumer queue consumption (#3923)
99c5a76e2 is described below

commit 99c5a76e2b6193a8c78053b4e618ededfccbd3aa
Author: umustafi <[email protected]>
AuthorDate: Tue Apr 16 15:16:31 2024 -0700

    [GOBBLIN-2044] Catch and log exceptions in HighLevelConsumer queue consumption (#3923)
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 82a45ceaf..ea996b5fb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -335,9 +335,10 @@ public abstract class HighLevelConsumer<K,V> extends 
AbstractIdleService {
     @Override
     public void run() {
       log.info("Starting queue processing.. " + 
Thread.currentThread().getName());
+      KafkaConsumerRecord record = null;
       try {
         while (true) {
-          KafkaConsumerRecord record = queue.take();
+          record = queue.take();
           messagesRead.inc();
           HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
           recordsProcessed.incrementAndGet();
@@ -349,9 +350,11 @@ public abstract class HighLevelConsumer<K,V> extends 
AbstractIdleService {
           }
         }
       } catch (InterruptedException e) {
-        log.warn("Encountered exception while processing queue ", e);
+        log.warn("Thread interrupted while processing queue ", e);
         // TODO: evaluate whether we should interrupt the thread or continue 
processing
         Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        log.error("Encountered exception while processing record so stopping 
queue processing. Record: {} Exception: {}", record, e);
       }
     }
   }

Reply via email to