khandelwal-prateek commented on code in PR #4080: URL: https://github.com/apache/gobblin/pull/4080#discussion_r1866230182
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
             // Committed offset should always be the offset of the next record to be read (hence +1)
             partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
           }
+        } catch (InterruptedException e) {
+          // Stop queue processor and return when encountered InterruptedException
+          log.warn("Thread interrupted while processing queue ", e);
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log the error and let the queue processor continue processing
+          log.error("Encountered exception while processing record so stopping queue processing. Record: {} Exception: {}", record, e);

Review Comment:
   Update the `log.error(.. so stopping queue processing..)` message, since the thread now continues processing after this change.

##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########
@@ -176,6 +177,33 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncountered() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Override the method to throw a custom exception
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // assert all NUM_MSGS messages were processed.
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

Review Comment:
   I am not sure this test exercises the change. IIUC, it would have passed without the change as well, because `processMessage` adds the message to the queue before throwing the exception, and `awaitExactlyNMessages` only checks the queue size.

   Also, add a test with `enable.auto.commit: false` and verify that an exception doesn't cause processing to stop and that the same message is retried, to validate that the message is not skipped.

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -336,8 +336,8 @@ public QueueProcessor(BlockingQueue queue) {
     public void run() {
       log.info("Starting queue processing.. " + Thread.currentThread().getName());
       KafkaConsumerRecord record = null;
-      try {
-        while (true) {
+      while (!shutdownRequested) {

Review Comment:
   The `shutdownRequested` flag is set to true before calling `ExecutorsUtils.shutdownExecutorService(.., 5000, TimeUnit.MILLISECONDS);` on shutdown. Currently, the executor waits for 5 seconds on shutdown to process any pending messages; with this change, message processing would stop immediately.
   If we want to continue/retry processing on all exceptions, this can be changed to `while (!Thread.currentThread().isInterrupted()) {`
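
To illustrate the `while (!Thread.currentThread().isInterrupted())` suggestion for `QueueProcessor.run()`, here is a minimal standalone sketch. It is not Gobblin code: `QueueProcessorSketch`, its `String` records, and its `processMessage` are hypothetical stand-ins for the real `QueueProcessor`, `KafkaConsumerRecord`, and `HighLevelConsumer.processMessage`, and offset commits are left out. It only shows the loop shape: keep going after an ordinary exception, and exit when the thread is interrupted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for HighLevelConsumer.QueueProcessor (assumption, not Gobblin code).
public class QueueProcessorSketch implements Runnable {
  private final BlockingQueue<String> queue;
  // Records that were processed without an exception.
  final List<String> processed = new CopyOnWriteArrayList<>();

  public QueueProcessorSketch(BlockingQueue<String> queue) {
    this.queue = queue;
  }

  // Hypothetical processMessage: fails for any record that starts with "bad".
  void processMessage(String record) {
    if (record.startsWith("bad")) {
      throw new RuntimeException("Simulated failure for " + record);
    }
    processed.add(record);
  }

  @Override
  public void run() {
    // Loop on the interrupt status instead of a separate shutdown flag.
    while (!Thread.currentThread().isInterrupted()) {
      String record = null;
      try {
        record = queue.take();
        processMessage(record);
      } catch (InterruptedException e) {
        // Restore the interrupt status and stop the processor.
        Thread.currentThread().interrupt();
        return;
      } catch (Exception e) {
        // Log and keep processing the remaining records.
        System.err.println("Error processing record " + record + ", continuing: " + e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "bad-1", "b"));
    QueueProcessorSketch processor = new QueueProcessorSketch(queue);
    Thread worker = new Thread(processor);
    worker.start();

    // Wait (bounded) until both good records are processed, then interrupt the worker.
    long deadline = System.currentTimeMillis() + 5000;
    while (processor.processed.size() < 2 && System.currentTimeMillis() < deadline) {
      Thread.sleep(10);
    }
    worker.interrupt();
    worker.join(2000);

    System.out.println("processed=" + processor.processed + " workerAlive=" + worker.isAlive());
  }
}
```

In this sketch the failing record is logged and skipped. Retrying the same message, as requested for the `enable.auto.commit: false` test, would need additional handling that is not shown here.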