FrankChen021 opened a new issue #11658: URL: https://github.com/apache/druid/issues/11658
### Affected Version

Since 0.16

### Description

There's a configuration `resetOffsetAutomatically` in `KafkaIndexTaskTuningConfig` that allows the Kafka offset to be reset automatically once it goes out of range. The out-of-range error typically occurs when messages in Kafka expire before the Druid ingestion task reads them.

However, the current automatic resetting implementation resets to the wrong offset. The reset therefore does not take effect and causes another out-of-range error, which triggers automatic resetting again. The ingestion task falls into an infinite loop.

### Problem Analysis

https://github.com/apache/druid/blob/59d257816b85dbeeca336b8e25d341d67bbc5697/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java#L134-L155

From the code above (lines 148 and 154), we can see that the variable `nextOffset` is used for automatic resetting. But this variable holds the offset we are currently reading from Kafka, which is exactly the offset that caused the out-of-range exception (line 134). In other words, automatic resetting resets to the very offset that is out of range. Of course, this reset doesn't help, and the next round of polling messages from Kafka throws another out-of-range exception.

### How to fix

To fix this problem, the reset should use the `leastAvailableOffset` variable instead. Since there's a check (line 152) that guarantees `leastAvailableOffset` is greater than the current reading offset, the reset only moves the offset forward past messages that have already expired, so automatic resetting won't cause data duplication either. The fix looks as follows:

```java
// Only reset when the earliest offset still retained by Kafka is ahead of the offset we tried to read
if (leastAvailableOffset > nextOffset) {
  doReset = true;
  // Reset to the earliest available offset, not to nextOffset (which is the offset that is out of range)
  resetPartitions.put(topicPartition, leastAvailableOffset);
  recordSupplier.seek(streamPartition, leastAvailableOffset);
}
```

I will open a PR to fix this.
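The loop described in this issue can be reproduced with a small self-contained model. This is a hypothetical sketch, not Druid or Kafka code: `OffsetResetSketch`, `LEAST_AVAILABLE_OFFSET`, `isOutOfRange` and `resetsUntilSuccess` are illustrative names, and the model assumes a single partition whose retained messages start at a fixed offset. It compares resetting to `nextOffset` (current behavior) with resetting to `leastAvailableOffset` (proposed fix).

```java
/**
 * Hypothetical, simplified model of one Kafka partition whose oldest messages
 * have expired. Not Druid's or Kafka's API; it only illustrates why resetting
 * to the failing offset never recovers.
 */
public class OffsetResetSketch {

  /** Earliest offset still retained (messages below it have expired). */
  static final long LEAST_AVAILABLE_OFFSET = 100L;

  /** Returns true if reading from {@code offset} would raise an out-of-range error. */
  static boolean isOutOfRange(long offset) {
    return offset < LEAST_AVAILABLE_OFFSET;
  }

  /**
   * Polls from {@code startOffset}, applying an automatic reset after every
   * out-of-range error. Returns the number of resets needed before a poll
   * succeeds, or -1 if still out of range after {@code maxAttempts} resets.
   */
  static int resetsUntilSuccess(long startOffset, boolean useLeastAvailable, int maxAttempts) {
    long nextOffset = startOffset;
    for (int resets = 0; resets < maxAttempts; resets++) {
      if (!isOutOfRange(nextOffset)) {
        return resets; // poll succeeded
      }
      long leastAvailableOffset = LEAST_AVAILABLE_OFFSET;
      if (leastAvailableOffset > nextOffset) {
        // Current behavior resets to nextOffset, the offset that just failed;
        // the proposed fix resets to the earliest offset still available.
        nextOffset = useLeastAvailable ? leastAvailableOffset : nextOffset;
      }
    }
    return -1; // the reset never took effect
  }

  public static void main(String[] args) {
    System.out.println("buggy reset: " + resetsUntilSuccess(42L, false, 10));
    System.out.println("fixed reset: " + resetsUntilSuccess(42L, true, 10));
  }
}
```

Running `main` prints `buggy reset: -1` (ten resets, still out of range) and `fixed reset: 1` (one reset to offset 100, then the poll succeeds). When the starting offset is already at or beyond the earliest available offset, both variants return 0, since no reset is performed.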
