[ https://issues.apache.org/jira/browse/GOBBLIN-2177?focusedWorklogId=946445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946445 ]
ASF GitHub Bot logged work on GOBBLIN-2177:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Dec/24 04:34
            Start Date: 03/Dec/24 04:34
    Worklog Time Spent: 10m
      Work Description: abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1867036534


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########

@@ -176,6 +177,33 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
 
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncountered() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    // Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Override the method to throw a custom exception
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // assert all NUM_MSGS messages were processed.
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

Review Comment:
   I tried running this test without the change in this PR, and it failed with a timeout while waiting for 10 messages. The reason is that we want messages to keep being processed even when an exception is encountered. In the old code, `processMessage` gets called only once: the loop breaks as soon as an exception is thrown.

Issue Time Tracking
-------------------

    Worklog Id: (was: 946445)
    Time Spent: 0.5h  (was: 20m)

> Avoid stopping Kafka HighLevelConsumer - QueueProcessor on
> non-InterruptedExceptions
> ------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2177
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2177
>             Project: Apache Gobblin
>          Issue Type: Bug
>              Reporter: Abhishek Jain
>              Priority: Major
>            Time Spent: 0.5h
>    Remaining Estimate: 0h
>
> The QueueProcessor within HighLevelConsumer contains an infinite while loop
> enclosed in a try-catch block. When any exception is encountered, this loop
> breaks, which halts the processing of any consumed messages until the
> service is restarted.
> We should not break this infinite loop on all exceptions; rather, we should
> break it only on InterruptedException, which truly means the QueueProcessor
> should stop processing.
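The fix described in the issue can be sketched as follows. This is a minimal standalone illustration, not Gobblin's actual QueueProcessor: the class and method names are hypothetical, and the blocking `take()` of a real processor is replaced by a timed `poll()` so the demo terminates. The point it shows is the catch placement: `RuntimeException` is caught per iteration so the loop keeps consuming, and only `InterruptedException` breaks out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the pattern from GOBBLIN-2177 (names are
// illustrative, not the real Gobblin API).
public class QueueProcessorSketch {

    // Drains the queue, counting every message pulled off it. A message whose
    // handling throws a RuntimeException is still counted and the loop
    // continues; only InterruptedException stops the processor.
    static int drain(BlockingQueue<String> queue) {
        int processed = 0;
        while (true) {
            try {
                // Demo-only exit condition: a real processor would block on take().
                String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break; // queue exhausted in this demo
                }
                processed++;
                if (msg.startsWith("bad")) {
                    // Simulates a failure inside processMessage
                    throw new RuntimeException("Simulated exception in processMessage");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // genuine shutdown signal: stop the QueueProcessor
            } catch (RuntimeException e) {
                // Old behavior: the loop broke here too, halting all consumption.
                // New behavior: log the failure and keep consuming.
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add(i % 2 == 0 ? "bad-" + i : "ok-" + i);
        }
        // All 5 messages are processed even though 3 of them throw.
        System.out.println("processed=" + drain(queue));
    }
}
```

With the old catch-outside-the-loop structure, `drain` would have returned 1 here, which is exactly the timeout failure the review comment observed in the test.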