[ https://issues.apache.org/jira/browse/GOBBLIN-2177?focusedWorklogId=947093&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-947093 ]
ASF GitHub Bot logged work on GOBBLIN-2177:
-------------------------------------------

            Author: ASF GitHub Bot
        Created on: 06/Dec/24 18:05
        Start Date: 06/Dec/24 18:05
    Worklog Time Spent: 10m
      Work Description: abhishekmjain commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1873798459


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########
@@ -176,6 +177,33 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
 
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncountered() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    // Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Throw a runtime exception after processing to simulate a failure in processMessage
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // Assert that all NUM_MSGS messages were processed.
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

Review Comment:
   Updated the PR to fix it only when `enable.auto.commit` is `true`.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 947093)
    Time Spent:     2h 20m  (was: 2h 10m)

> Avoid stopping Kafka HighLevelConsumer - QueueProcessor on non-InterruptedExceptions
> -------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-2177
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2177
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Abhishek Jain
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The QueueProcessor within HighLevelConsumer contains an infinite while loop
> that is enclosed in a try-catch block. When any exception is encountered,
> this loop breaks, which halts the processing of any consumed messages until
> the service is restarted.
>
> We should not break this infinite loop on every exception; rather, we should
> break it only on InterruptedException, which genuinely signals that the
> QueueProcessor should stop processing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
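
For reference, below is a minimal, hypothetical sketch (not the actual Gobblin source) of the loop structure the issue description calls for: the processing loop logs and swallows ordinary RuntimeExceptions and exits only on InterruptedException. The class and member names (QueueProcessorSketch, queue, processMessage) are invented for illustration and do not correspond to the real HighLevelConsumer internals.

    import java.util.concurrent.BlockingQueue;

    // Hedged sketch only: a QueueProcessor-style loop that keeps consuming on
    // ordinary RuntimeExceptions and stops only when interrupted.
    public class QueueProcessorSketch implements Runnable {
      private final BlockingQueue<Object> queue; // hypothetical queue of consumed records

      public QueueProcessorSketch(BlockingQueue<Object> queue) {
        this.queue = queue;
      }

      @Override
      public void run() {
        try {
          while (true) {
            Object record = queue.take(); // blocks; throws InterruptedException on shutdown
            try {
              processMessage(record); // hypothetical per-record processing hook
            } catch (RuntimeException e) {
              // Log and keep consuming rather than breaking the loop,
              // so one bad record does not halt the whole consumer.
              System.err.println("Failed to process record, continuing: " + e.getMessage());
            }
          }
        } catch (InterruptedException e) {
          // Only an interrupt genuinely signals that the processor should stop.
          Thread.currentThread().interrupt();
        }
      }

      private void processMessage(Object record) {
        // Placeholder for real per-message processing logic.
      }
    }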