GWphua commented on code in PR #18226:
URL: https://github.com/apache/druid/pull/18226#discussion_r2810380561
##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskRunner.java:
##########
@@ -93,12 +94,17 @@ protected List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecord
return recordSupplier.poll(task.getIOConfig().getPollTimeout());
}
catch (OffsetOutOfRangeException e) {
- //
- // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
- // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
- // that has not been written yet (which is totally legitimate). So let's wait for it to show up
- //
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
+
+ if (!task.getTuningConfig().isResetOffsetAutomatically()) {
+ throw TaskStatusException.fail(
+ e.getMessage()
+ + "\nThis may happen when given offsets have been deleted at the Kafka server due to the retention configuration. "
+ + "\nYou can use supervisor's reset API to set the offset to a valid position."
+ );
+ }
Review Comment:
Since this block only fails when `isResetOffsetAutomatically()` returns false, I think the message
could also tell users that they can turn on a config to reset offsets automatically, instead of
needing to reset manually every time.
Something like:
... use supervisor's reset API to set the offset to a valid position, or turn on `resetOffsetAutomatically`
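A minimal standalone sketch of the suggested message, to show how it would read. The class `OffsetOutOfRangeHandling` and method `failureMessage` are hypothetical: the boolean parameter stands in for `task.getTuningConfig().isResetOffsetAutomatically()`, and in the PR the returned string would be passed to `TaskStatusException.fail(...)` rather than returned. Druid classes are left out so the snippet compiles on its own.

```java
import java.util.Optional;

// Hypothetical sketch of the failure message suggested in this review.
// resetOffsetAutomatically stands in for task.getTuningConfig().isResetOffsetAutomatically();
// in the PR, the message would be passed to TaskStatusException.fail(...).
final class OffsetOutOfRangeHandling {

  private OffsetOutOfRangeHandling() {}

  /**
   * Returns the message to fail the task with, or empty when automatic offset reset
   * is enabled (the diff does not throw in that case).
   */
  static Optional<String> failureMessage(String kafkaMessage, boolean resetOffsetAutomatically) {
    if (resetOffsetAutomatically) {
      return Optional.empty();
    }
    return Optional.of(
        kafkaMessage
        + "\nThis may happen when given offsets have been deleted at the Kafka server"
        + " due to the retention configuration. "
        + "\nYou can use supervisor's reset API to set the offset to a valid position,"
        + " or turn on `resetOffsetAutomatically`."
    );
  }

  public static void main(String[] args) {
    // Placeholder text standing in for OffsetOutOfRangeException.getMessage().
    failureMessage("example Kafka error message", false).ifPresent(System.out::println);
  }
}
```

Keeping the hint in the same thrown message means users see both remedies (a one-off reset via the API, or the config switch) at the point where the task fails.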
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]