This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 99c5a76e2 [GOBBLIN-2044] Catch and log exceptions in HighLevelConsumer
queue consumption (#3923)
99c5a76e2 is described below
commit 99c5a76e2b6193a8c78053b4e618ededfccbd3aa
Author: umustafi <[email protected]>
AuthorDate: Tue Apr 16 15:16:31 2024 -0700
[GOBBLIN-2044] Catch and log exceptions in HighLevelConsumer queue
consumption (#3923)
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 82a45ceaf..ea996b5fb 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -335,9 +335,10 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
@Override
public void run() {
log.info("Starting queue processing.. " +
Thread.currentThread().getName());
+ KafkaConsumerRecord record = null;
try {
while (true) {
- KafkaConsumerRecord record = queue.take();
+ record = queue.take();
messagesRead.inc();
HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
recordsProcessed.incrementAndGet();
@@ -349,9 +350,11 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
}
}
} catch (InterruptedException e) {
- log.warn("Encountered exception while processing queue ", e);
+ log.warn("Thread interrupted while processing queue ", e);
// TODO: evaluate whether we should interrupt the thread or continue
processing
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ log.error("Encountered exception while processing record so stopping
queue processing. Record: {} Exception: {}", record, e);
}
}
}