abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1867036534
##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########
@@ -176,6 +177,33 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncountered() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Override the method to throw a custom exception
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // assert all NUM_MSGS messages were processed.
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

Review Comment:
   I ran this test without the change in this PR, and it failed with a timeout while waiting for 10 messages. The reason is that we want to keep processing messages even if an exception is encountered.
   In the old code, `processMessage` gets called only once, because the loop breaks as soon as an exception is encountered.
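
   To illustrate the difference, here is a minimal standalone sketch. It is not the actual `HighLevelConsumer` queue-processor code; the class `QueueProcessorSketch` and its methods are hypothetical names made up for this example. It only assumes what is described above: in the old shape an exception ends the loop, while in the new shape each message is handled inside its own try/catch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical illustration only; not Gobblin code.
public class QueueProcessorSketch {

  // Old shape (assumed): the try/catch wraps the whole loop,
  // so the first exception ends processing for the queue.
  static int drainStopOnError(BlockingQueue<String> queue, Consumer<String> handler) {
    int attempted = 0;
    try {
      String msg;
      while ((msg = queue.poll()) != null) {
        attempted++;
        handler.accept(msg);
      }
    } catch (RuntimeException e) {
      // The loop has already exited; remaining messages never reach the handler.
    }
    return attempted;
  }

  // New shape (assumed): the try/catch sits inside the loop,
  // so a failing message does not stop the ones that follow.
  static int drainContinueOnError(BlockingQueue<String> queue, Consumer<String> handler) {
    int attempted = 0;
    String msg;
    while ((msg = queue.poll()) != null) {
      attempted++;
      try {
        handler.accept(msg);
      } catch (RuntimeException e) {
        // A real consumer would log here; the loop keeps going.
      }
    }
    return attempted;
  }

  // Builds a queue holding ten placeholder messages.
  static BlockingQueue<String> tenMessages() {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    for (int i = 0; i < 10; i++) {
      queue.add("msg-" + i);
    }
    return queue;
  }

  public static void main(String[] args) {
    // Mirrors the test's anonymous subclass: every call throws.
    Consumer<String> alwaysThrows = m -> {
      throw new RuntimeException("Simulated exception in processMessage");
    };
    System.out.println(drainStopOnError(tenMessages(), alwaysThrows));     // prints 1
    System.out.println(drainContinueOnError(tenMessages(), alwaysThrows)); // prints 10
  }
}
```

   With a handler that always throws, the first variant attempts one message and the second attempts all ten, which matches the timeout I saw when waiting for 10 messages without the fix.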