FrankChen021 opened a new issue #11658:
URL: https://github.com/apache/druid/issues/11658


   ### Affected Version
   
   Since 0.16
   
   ### Description
   
   There's a configuration option, `resetOffsetAutomatically`, in 
`KafkaIndexTaskTuningConfig` that allows the Kafka offset to be reset 
automatically once it goes out of range. An out-of-range error typically occurs 
when messages in Kafka expire before the Druid ingestion task reads them.
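   To make the failure condition concrete, here is a minimal sketch that models a 
single partition's retained offset range. `PartitionRetention` and its methods are 
hypothetical names for illustration, not Druid or Kafka client APIs. The sketch 
assumes that a read position is out of range when it falls below the earliest 
retained offset or past the end of the log.

   ```java
   // Hypothetical model of one Kafka partition's retained offset range.
   // Names are illustrative; this is not Druid or Kafka client code.
   final class PartitionRetention {
       private long earliestOffset; // oldest offset the broker still stores
       private final long latestOffset; // end of the log

       PartitionRetention(long earliestOffset, long latestOffset) {
           this.earliestOffset = earliestOffset;
           this.latestOffset = latestOffset;
       }

       // Retention deletes old data, so the earliest offset only moves forward
       // (clamped so it never passes the end of the log).
       void expireUpTo(long newEarliest) {
           earliestOffset = Math.max(earliestOffset, Math.min(newEarliest, latestOffset));
       }

       long earliestOffset() {
           return earliestOffset;
       }

       // A read position is out of range if its records have already expired
       // or if it lies beyond the end of the log.
       boolean isOutOfRange(long position) {
           return position < earliestOffset || position > latestOffset;
       }
   }
   ```

   For example, a reader positioned at offset 100 is fine while offsets 0 to 300 are 
retained, but once retention expires everything below 150, `isOutOfRange(100)` 
returns true: the messages expired before the task read them.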
   
   However, the current automatic reset implementation resets to the wrong 
offset. The reset therefore has no effect: the next poll raises another 
out-of-range error, automatic resetting is triggered again, and the ingestion 
task falls into an infinite loop.
   
   ### Problem Analysis
   
   
https://github.com/apache/druid/blob/59d257816b85dbeeca336b8e25d341d67bbc5697/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java#L134-L155
   
   As the code above shows (lines 148 and 154), the variable `nextOffset` is used 
for automatic resetting. But this variable holds the offset we are currently 
reading from Kafka, which is exactly the offset that caused the out-of-range 
exception (line 134).
   
   In other words, automatic resetting seeks back to the very offset that was out 
of range. This cannot help, so the next round of polling raises another 
out-of-range exception.
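   The loop can be reproduced with a small, self-contained model. `BuggyResetLoop` 
is a hypothetical illustration, not the actual 
`IncrementalPublishingKafkaIndexTaskRunner` logic: it assumes that any poll below 
`leastAvailableOffset` fails and that each failure resets to `nextOffset`, as 
lines 148 and 154 do. The `maxAttempts` cap exists only so the demo terminates.

   ```java
   // Hypothetical model of the buggy reset; not Druid's runner code.
   final class BuggyResetLoop {

       // Counts polls that hit an out-of-range error, giving up after maxAttempts.
       // The reader starts at nextOffset; records below leastAvailableOffset expired.
       static int outOfRangePolls(long nextOffset, long leastAvailableOffset, int maxAttempts) {
           long position = nextOffset;
           int failures = 0;
           while (failures < maxAttempts && position < leastAvailableOffset) {
               failures++;            // poll at `position` throws an out-of-range error
               position = nextOffset; // buggy reset: seek to the offset that just failed
           }
           return failures;
       }
   }
   ```

   With `nextOffset = 100` and `leastAvailableOffset = 150`, every attempt fails until 
the cap is reached. The real task has no such cap, which is why it never recovers.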
   
   ### How to fix
   
   To fix this problem, the reset should use the `leastAvailableOffset` variable 
instead. Because a check (line 152) guarantees that `leastAvailableOffset` is 
greater than the current reading offset, the reset only moves forward, so 
automatic resetting also won't cause data duplication. The fix looks as follows:
   
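   ```java
   // Reset to the earliest offset Kafka still retains, not to nextOffset
   // (the offset that just failed). The guard ensures the reset only moves forward.
   if (leastAvailableOffset > nextOffset) {
     doReset = true;
     resetPartitions.put(topicPartition, leastAvailableOffset);
     recordSupplier.seek(streamPartition, leastAvailableOffset);
   }
   ```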
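   The corrected branch can be exercised in isolation with the sketch below. 
`FixedReset`, `RecordingSeeker` and the `String` partition key are hypothetical 
stand-ins for the runner's local state and `recordSupplier`; only the comparison 
and the choice of `leastAvailableOffset` as the reset target come from the fix.

   ```java
   import java.util.Map;

   // Hypothetical, standalone version of the corrected reset branch.
   final class FixedReset {

       // Stand-in for recordSupplier: remembers the last offset it was asked to seek to.
       static final class RecordingSeeker {
           Long lastSeek;

           void seek(String partition, long offset) {
               lastSeek = offset;
           }
       }

       // Resets only when the earliest retained offset is ahead of the read position,
       // and resets *to* that earliest offset. Returns true if a reset happened.
       static boolean maybeReset(String partition, long nextOffset, long leastAvailableOffset,
                                 Map<String, Long> resetPartitions, RecordingSeeker seeker) {
           if (leastAvailableOffset > nextOffset) {
               resetPartitions.put(partition, leastAvailableOffset);
               seeker.seek(partition, leastAvailableOffset);
               return true;
           }
           return false;
       }
   }
   ```

   Resetting from 100 to 150 skips offsets 100 to 149, which had already expired, so 
no record is read twice. When `nextOffset` is already at or beyond 
`leastAvailableOffset`, nothing is reset.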
   
   I will open a PR to fix this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


