abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1867036534


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########
@@ -176,6 +177,33 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
 
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncountered() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Override the method to throw a custom exception
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // assert all NUM_MSGS messages were processed.
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

Review Comment:
   I ran this test without the change in this PR, and it failed with a timeout 
while waiting for 10 messages.
   
   
![image](https://github.com/user-attachments/assets/6d04e8c2-26f6-4fc6-8015-0a5c20c70bbb)
   
   The reason is that we want to keep processing messages even if an exception 
is encountered. In the old code, `processMessage` gets called only once, 
because the loop breaks as soon as an exception is thrown.
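
To illustrate the difference, here is a minimal, self-contained sketch. It is not the actual `HighLevelConsumer` queue processor code; the class `QueueProcessorSketch` and both method names are hypothetical and exist only for this example. The first method lets an exception from the handler end the loop, the second catches it per message and keeps going.

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical illustration only; not Gobblin's HighLevelConsumer implementation.
class QueueProcessorSketch {

  // Old-style loop: the try/catch wraps the whole loop, so the first
  // exception ends processing. Returns how many times the handler was called.
  static int drainStoppingOnError(BlockingQueue<String> queue, Consumer<String> handler) {
    int attempted = 0;
    try {
      String msg;
      while ((msg = queue.poll()) != null) {
        attempted++;
        handler.accept(msg);
      }
    } catch (RuntimeException e) {
      // The loop has already exited; remaining messages stay in the queue.
    }
    return attempted;
  }

  // New-style loop: the try/catch wraps a single message, so a failure
  // is recorded (here just swallowed) and the next message is still handled.
  static int drainContinuingOnError(BlockingQueue<String> queue, Consumer<String> handler) {
    int attempted = 0;
    String msg;
    while ((msg = queue.poll()) != null) {
      attempted++;
      try {
        handler.accept(msg);
      } catch (RuntimeException e) {
        // A real processor would log the error here before moving on.
      }
    }
    return attempted;
  }

  public static void main(String[] args) {
    // Handler that always throws, like the overridden processMessage in the test.
    Consumer<String> alwaysFails = m -> {
      throw new RuntimeException("Simulated exception in processMessage");
    };
    BlockingQueue<String> q1 = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
    BlockingQueue<String> q2 = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
    System.out.println(drainStoppingOnError(q1, alwaysFails));   // prints 1
    System.out.println(drainContinuingOnError(q2, alwaysFails)); // prints 3
  }
}
```

With the first shape, a test that waits for every message to reach the handler times out, which matches the failure in the screenshot. With the second shape, every message reaches the handler even though each call throws.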



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
