phet commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1867178165
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -336,8 +336,8 @@ public QueueProcessor(BlockingQueue queue) {
     public void run() {
       log.info("Starting queue processing.. " + Thread.currentThread().getName());
       KafkaConsumerRecord record = null;
-      try {
-        while (true) {
+      while (true) {
+        try {
           record = queue.take();

Review Comment:
   now an exception would cause us to skip processing the message (`record`), which doesn't seem like what we'd want, especially when using this class for Kafka ETL. shall we enclose only `processMessage` in a `try/catch`?


##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########
@@ -176,6 +177,33 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncountered() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Override the method to throw a custom exception
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // assert all NUM_MSGS messages were processed.
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

Review Comment:
   as mentioned in the other comment, this verifies that `NUM_MSGS` messages are seen, but each is seen only once (when it fails) and is never retried. that is, it's skipped. e.g. if the mocked `processMessage` were altered to throw an exception every other time, I believe you'd still count only `NUM_MSGS` calls to `processMessage` and the commit offsets would still increase by `NUM_MSGS`; but to demonstrate NO SKIPPING, you'd want `processMessage` to be called either `(2 * NUM_MSGS)` or `((2 * NUM_MSGS) - 1)` times (depending on whether the failures fall on odd or even invocations)
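A rough sketch of that alternating-failure variant, reusing the setup from the test above (`consumerProps`, `TOPIC`, `NUM_PARTITIONS`, `NUM_MSGS`, `MockedHighLevelConsumer`, and `awaitExactlyNMessages` all come from the existing test); the invocation counter and the final assertion are illustrative additions, and the `2 * NUM_MSGS` expectation only holds once failed records are actually retried instead of skipped:

```java
// Fragment of a hypothetical test body; assumes the same consumerProps setup as above
// and java.util.concurrent.atomic.AtomicInteger.
final AtomicInteger invocations = new AtomicInteger();
MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(
    TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
  @Override
  public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
    // fail on every odd-numbered invocation, succeed on the even-numbered ones
    if (invocations.incrementAndGet() % 2 == 1) {
      throw new RuntimeException("Simulated transient failure in processMessage");
    }
    super.processMessage(message);
  }
};
consumer.startAsync().awaitRunning();

// every message should still be processed successfully (i.e. none skipped)
consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

// With failed records retried and failures on odd invocations, each record is
// attempted twice: 2 * NUM_MSGS calls in total. Under the current skip-on-failure
// behavior, this count would only reach NUM_MSGS.
Assert.assertEquals(invocations.get(), 2 * NUM_MSGS);
```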
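And to make the first comment above concrete (enclosing only `processMessage` in a `try/catch` inside `QueueProcessor.run`), a minimal sketch; `queue`, `processMessage`, and the offset bookkeeping are borrowed from the surrounding class, and this illustrates the suggested scoping rather than the actual implementation:

```java
// Sketch only: scope the try/catch to processMessage itself, so a processing failure
// becomes an explicit, per-record decision instead of skipping the rest of the loop body.
@Override
public void run() {
  while (true) {
    KafkaConsumerRecord record;
    try {
      record = queue.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;  // shut the processor down on interrupt
    }
    try {
      processMessage((DecodeableKafkaRecord) record);
    } catch (Exception e) {
      // explicit per-record handling: retry, dead-letter, or (knowingly) skip
      log.error("Failed to process record {}", record, e);
      continue;  // hypothetical: reached only once any retry policy is exhausted
    }
    // offset bookkeeping (partitionOffsetsToCommit.put(partition, record.getOffset() + 1))
    // would follow here, as in the diff below
  }
}
```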
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
           // Committed offset should always be the offset of the next record to be read (hence +1)
           partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
         }
+      } catch (InterruptedException e) {
+        // Stop queue processor and return when encountered InterruptedException
+        log.warn("Thread interrupted while processing queue ", e);
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Exception e) {
+        // Log the error and let the queue processor continue processing
+        log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
       }

Review Comment:
   I agree that previously exiting `Runnable.run` upon `Exception` was surprising and subtle and generally not helpful, but this alternative could just put us into a continuous loop where we might encounter the same error "forever". should we cap the number of retries using something like a `Retryer`, as [here](https://github.com/apache/gobblin/blob/45ad13e07ebe212002b279132e1fa4d121a9294f/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java#L123)?
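A minimal sketch of what a capped retry around `processMessage` could look like, using the guava-retrying `Retryer` directly; the linked `KafkaJobStatusMonitor` builds its retryer through Gobblin's own retry utilities and config keys, which are elided here, and the attempt and wait values below are placeholders:

```java
// Built once, e.g. as a QueueProcessor field (imports from com.github.rholder.retry
// plus java.util.concurrent.ExecutionException and java.util.concurrent.TimeUnit).
Retryer<Void> processingRetryer = RetryerBuilder.<Void>newBuilder()
    .retryIfException()
    .withWaitStrategy(WaitStrategies.exponentialWait(1000, 30, TimeUnit.SECONDS))
    .withStopStrategy(StopStrategies.stopAfterAttempt(5))
    .build();

// Inside the queue-processing loop, wrapping only the processMessage call:
final KafkaConsumerRecord currentRecord = record;  // effectively-final copy for the lambda
try {
  processingRetryer.call(() -> {
    processMessage((DecodeableKafkaRecord) currentRecord);
    return null;
  });
} catch (ExecutionException | RetryException e) {
  // retries exhausted (or a non-retryable failure): log loudly and move on deliberately,
  // rather than looping on the same record "forever"
  log.error("Giving up on record {} after bounded retries", currentRecord, e);
}
```

Once the retries are exhausted, the record is skipped knowingly rather than silently, which keeps the queue moving while still bounding the time spent on a poison record.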