This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2595f6c328 [GOBBLIN-2177] Avoid shutting down QueueProcessor on
non-InterruptedException (#4080)
2595f6c328 is described below
commit 2595f6c328df95599634e0f95d9bf6ddf8150fb2
Author: abhishekmjain <[email protected]>
AuthorDate: Fri Dec 13 13:30:17 2024 +0530
[GOBBLIN-2177] Avoid shutting down QueueProcessor on
non-InterruptedException (#4080)
---
.../gobblin/runtime/HighLevelConsumerTest.java | 29 ++++++++++++++++++++++
.../gobblin/runtime/kafka/HighLevelConsumer.java | 23 ++++++++++++-----
2 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index bb0b96f903..1ec5bbbd88 100644
--- a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.AbstractBaseKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.kafka.writer.Kafka09DataWriter;
import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
@@ -176,6 +177,34 @@ public class HighLevelConsumerTest extends KafkaTestBase {
Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
}
+ @Test
+ public void testQueueProcessorRuntimeExceptionEncounteredAutoCommitEnabled()
throws Exception {
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+
consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+ //Generate a brand new consumer group id to ensure there are no previously committed offsets for this group id
+ String consumerGroupId = Joiner.on("-").join(TOPIC, "auto",
System.currentTimeMillis());
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+ consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY,
"true");
+
+ // Create an instance of MockedHighLevelConsumer using an anonymous class
+ MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC,
ConfigUtils.propertiesToConfig(consumerProps), NUM_PARTITIONS) {
+ int callCount = 0;
+ @Override
+ public void processMessage(DecodeableKafkaRecord<byte[], byte[]>
message) {
+ super.processMessage(message);
+ // Override the method to throw a custom exception
+ throw new RuntimeException("Simulated exception in processMessage");
+ }
+ };
+ consumer.startAsync().awaitRunning();
+
+ // Assert all NUM_MSGS messages were processed
+ consumer.awaitExactlyNMessages(NUM_MSGS, 10000);
+ consumer.shutDown();
+ }
+
private List<byte[]> createByteArrayMessages() {
List<byte[]> records = Lists.newArrayList();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index ea996b5fb6..64bf9b8102 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -340,18 +340,29 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
while (true) {
record = queue.take();
messagesRead.inc();
- HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)record);
- recordsProcessed.incrementAndGet();
+ try {
+ HighLevelConsumer.this.processMessage((DecodeableKafkaRecord)
record);
+ recordsProcessed.incrementAndGet();
+ }
+ catch (Exception e) {
+ // Rethrow exception in case auto commit is disabled
+ if (!HighLevelConsumer.this.enableAutoCommit) {
+ throw e;
+ }
+ // Continue with processing next records in case auto commit is enabled
+ log.error("Encountered exception while processing record. Record: {} Exception: {}", record, e);
+ }
- if(!HighLevelConsumer.this.enableAutoCommit) {
- KafkaPartition partition = new
KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic).build();
+ if (!HighLevelConsumer.this.enableAutoCommit) {
+ KafkaPartition partition =
+ new
KafkaPartition.Builder().withId(record.getPartition()).withTopicName(HighLevelConsumer.this.topic)
+ .build();
// Committed offset should always be the offset of the next record to be read (hence +1)
partitionOffsetsToCommit.put(partition, record.getOffset() + 1);
}
}
- } catch (InterruptedException e) {
+ } catch(InterruptedException e){
log.warn("Thread interrupted while processing queue ", e);
- // TODO: evaluate whether we should interrupt the thread or continue processing
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Encountered exception while processing record so stopping queue processing. Record: {} Exception: {}", record, e);