This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2595f6c328 [GOBBLIN-2177] Avoid shutting down QueueProcessor on non-InterruptedException (#4080)
2595f6c328 is described below

commit 2595f6c328df95599634e0f95d9bf6ddf8150fb2
Author: abhishekmjain <[email protected]>
AuthorDate: Fri Dec 13 13:30:17 2024 +0530

    [GOBBLIN-2177] Avoid shutting down QueueProcessor on non-InterruptedException (#4080)
---
 .../gobblin/runtime/HighLevelConsumerTest.java     | 29 ++++++++++++++++++++++
 .../gobblin/runtime/kafka/HighLevelConsumer.java   | 23 ++++++++++++-----
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index bb0b96f903..1ec5bbbd88 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.KafkaTestBase;
 import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
 import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
 import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
@@ -176,6 +177,34 @@ public class HighLevelConsumerTest extends KafkaTestBase {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
 
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncounteredAutoCommitEnabled() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      int callCount = 0;
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Override the method to throw a custom exception
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // Assert all NUM_MSGS messages were processed
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
+    consumer.shutDown();
+  }
+
   private List<byte[]> createByteArrayMessages() {
     List<byte[]> records = Lists.newArrayList();
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index ea996b5fb6..64bf9b8102 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -340,18 +340,29 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
         while (true) {
           record = queue.take();
           messagesRead.inc();
-          HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
-          recordsProcessed.incrementAndGet();
+          try {
+            HighLevelConsumer.this.processMessage((DecodeableKafkaRecord) record);
+            recordsProcessed.incrementAndGet();
+          }
+          catch (Exception e) {
+            // Rethrow exception in case auto commit is disabled
+            if (!HighLevelConsumer.this.enableAutoCommit) {
+              throw e;
+            }
+            // Continue with processing next records in case auto commit is enabled
+            log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
+          }
 
-          if(!HighLevelConsumer.this.enableAutoCommit) {
-            KafkaPartition partition = new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+          if (!HighLevelConsumer.this.enableAutoCommit) {
+            KafkaPartition partition =
+                new KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic)
+                    .build();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
         }
-      } catch (InterruptedException e) {
+      } catch(InterruptedException e){
         log.warn("Thread interrupted while processing queue ", e);
-        // TODO: evaluate whether we should interrupt the thread or continue processing
         Thread.currentThread().interrupt();
       } catch (Exception e) {
         log.error("Encountered exception while processing record so stopping queue processing. Record: {} Exception: {}", record, e);

Reply via email to