phet commented on code in PR #4080:
URL: https://github.com/apache/gobblin/pull/4080#discussion_r1867178165


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -336,8 +336,8 @@ public QueueProcessor(BlockingQueue queue) {
     public void run() {
       log.info("Starting queue processing.. " + Thread.currentThread().getName());
       KafkaConsumerRecord record = null;
-      try {
-        while (true) {
+      while (true) {
+        try {
           record = queue.take();

Review Comment:
   now an exception would cause us to skip processing the message (`record`), which doesn't seem like what we'd want, especially when using this class for kafka ETL.
   
   shall we enclose only `processMessage` in a `try/catch`?
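
   a minimal, self-contained sketch of that narrowed-catch shape (hypothetical names and test scaffolding; not the actual `QueueProcessor` — the `POISON` sentinel and the two lists exist only to make the sketch runnable):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: guard ONLY the processMessage() call, so a processing failure
// neither exits the consume loop nor silently skips the record.
// InterruptedException from take() is handled separately.
public class NarrowCatchSketch {
  static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  static final List<String> processed = new ArrayList<>();
  static final List<String> failed = new ArrayList<>();

  static void processMessage(String record) {
    if (record.startsWith("bad")) {
      throw new RuntimeException("simulated failure for " + record);
    }
    processed.add(record);
  }

  static void runLoop() {
    while (true) {
      String record;
      try {
        record = queue.take();            // only take() can throw InterruptedException
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      if (record.equals("POISON")) {      // test-only shutdown sentinel
        return;
      }
      try {
        processMessage(record);           // only this call is guarded
      } catch (Exception e) {
        failed.add(record);               // explicit decision point: retry, dead-letter, or skip
      }
      // offset bookkeeping would continue here for both outcomes
    }
  }

  public static void main(String[] args) throws Exception {
    queue.add("ok-1");
    queue.add("bad-2");
    queue.add("ok-3");
    queue.add("POISON");
    runLoop();
    System.out.println(processed + " / " + failed);  // prints "[ok-1, ok-3] / [bad-2]"
  }
}
```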



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java:
##########
@@ -176,6 +177,33 @@ public void testCalculateProduceToConsumeLag() {
     Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
   }
 
+  @Test
+  public void testQueueProcessorRuntimeExceptionEncountered() throws Exception {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    // Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, "true");
+
+    // Create an instance of MockedHighLevelConsumer using an anonymous class
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+      @Override
+      public void processMessage(DecodeableKafkaRecord<byte[], byte[]> message) {
+        super.processMessage(message);
+        // Override the method to throw a custom exception
+        throw new RuntimeException("Simulated exception in processMessage");
+      }
+    };
+    consumer.startAsync().awaitRunning();
+
+    // assert all NUM_MSGS messages were processed.
+    consumer.awaitExactlyNMessages(NUM_MSGS, 10000);

Review Comment:
   as mentioned in the other comment, this verifies that `NUM_MSGS` messages are seen, but each is seen only once (when it fails) and is never retried.  that is, it's skipped.
   
   e.g. if you were to have the mocked `processMessage` throw an exception every other time, I believe you'd count `NUM_MSGS` calls to `processMessage` and the committed offsets would increase by `NUM_MSGS`, but to demonstrate NO SKIPPING, you'd want `processMessage` to be called either `(2 * NUM_MSGS)` or `((2 * NUM_MSGS) - 1)` times (depending on whether failure occurs on odd or even invocations)
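
   the invocation arithmetic can be illustrated with a toy counter (hypothetical sketch; a single immediate retry stands in for whatever retry mechanism the consumer would actually use):

```java
// Sketch of the reviewer's arithmetic: if processMessage throws on every
// other invocation and a failed record is retried once, delivering numMsgs
// records yields (2 * numMsgs) or ((2 * numMsgs) - 1) total invocations,
// depending on whether odd- or even-numbered invocations fail.
public class RetryCountSketch {
  static int calls = 0;

  // failOnOdd: true -> invocations 1,3,5,... throw; false -> 2,4,6,... throw
  static void processMessage(boolean failOnOdd) {
    calls++;
    boolean odd = (calls % 2 == 1);
    if (odd == failOnOdd) {
      throw new RuntimeException("simulated alternating failure");
    }
  }

  static int deliverWithOneRetry(int numMsgs, boolean failOnOdd) {
    calls = 0;
    for (int i = 0; i < numMsgs; i++) {
      try {
        processMessage(failOnOdd);
      } catch (RuntimeException e) {
        processMessage(failOnOdd);   // retry once; alternation guarantees success
      }
    }
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(deliverWithOneRetry(5, true));   // 2 * 5 = 10
    System.out.println(deliverWithOneRetry(5, false));  // 2 * 5 - 1 = 9
  }
}
```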



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java:
##########
@@ -348,13 +348,15 @@ record = queue.take();
            // Committed offset should always be the offset of the next record to be read (hence +1)
            partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
          }
+        } catch (InterruptedException e) {
+          // Stop queue processor and return when encountering an InterruptedException
+          log.warn("Thread interrupted while processing queue ", e);
+          Thread.currentThread().interrupt();
+          return;
+        } catch (Exception e) {
+          // Log the error and let the queue processor continue processing
+          log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
        }

Review Comment:
   I agree that previously exiting `Runnable.run` upon `Exception` was surprising and subtle and generally not helpful, but this alternative could just put us into a continuous loop where we might encounter the same error "forever".  should we cap the number of retries using something like a `Retryer`, as [here](https://github.com/apache/gobblin/blob/45ad13e07ebe212002b279132e1fa4d121a9294f/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java#L123)?
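
   the linked `KafkaJobStatusMonitor` uses a `Retryer` for this; a dependency-free sketch of the same capped-retry idea (hypothetical names, `MAX_ATTEMPTS` chosen arbitrarily for illustration):

```java
// Sketch: bound the retries so a persistently failing record is eventually
// given up on (logged / dead-lettered) instead of spinning forever.
public class CappedRetrySketch {
  static final int MAX_ATTEMPTS = 3;
  static int attempts = 0;   // instrumentation for the sketch only

  static void processMessage(String record) {
    attempts++;
    throw new RuntimeException("persistent failure for " + record);  // always fails here
  }

  static boolean processWithCap(String record) {
    for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      try {
        processMessage(record);
        return true;                   // success: stop retrying
      } catch (RuntimeException e) {
        if (attempt == MAX_ATTEMPTS) {
          // exhausted: move on to the next record rather than loop on this one
          return false;
        }
      }
    }
    return false;   // unreachable; satisfies the compiler
  }

  public static void main(String[] args) {
    boolean ok = processWithCap("record-1");
    System.out.println(ok + " after " + attempts + " attempts");  // prints "false after 3 attempts"
  }
}
```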
  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
