FrankChen021 opened a new pull request, #18226:
URL: https://github.com/apache/druid/pull/18226
Fixes #14344
### Description
Previously, when an `OffsetOutOfRangeException` was raised, the task kept running until it completed. This was a problem for operators, since it could take a long time to notice that the exception had occurred.
This PR improves the exception handling when auto reset is not enabled:
1. all messages ingested before the expired offset are persisted and
published
2. the task is marked as FAILED so that operators/users can take action
on problematic topics as early as possible
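The two steps above can be sketched as follows. This is an illustrative model only, using hypothetical names rather than the actual Druid classes:

```python
# Illustrative sketch only -- hypothetical names, not actual Druid code.
class OffsetOutOfRangeError(Exception):
    pass

def run_task(poll, persist_and_publish):
    """Consume until an out-of-range offset is hit, then publish what
    was already ingested and fail fast instead of running to completion."""
    buffered = []
    while True:
        try:
            record = poll()
            if record is None:              # stream exhausted
                persist_and_publish(buffered)
                return "SUCCESS"
            buffered.append(record)
        except OffsetOutOfRangeError:
            # 1. persist and publish everything ingested before the expired offset
            persist_and_publish(buffered)
            # 2. fail immediately so operators notice the problematic topic early
            return "FAILED"

# Tiny demo: two good records, then the offset disappears.
events = iter([1, 2, OffsetOutOfRangeError()])

def poll():
    e = next(events, None)
    if isinstance(e, Exception):
        raise e
    return e

published = []
status = run_task(poll, published.extend)
print(status, published)  # FAILED [1, 2]
```

The key point is that the already-buffered records are still published before the task reports failure, so no successfully ingested data is lost.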
#### Release note
Tasks that encounter an `OffsetOutOfRangeException` now fail
immediately.
#### Testing
Follow these steps to test:
1. set up a topic in Kafka
2. configure the topic's retention time/retention size to very small values
```bash
./kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
  --entity-name a --alter --add-config retention.ms=10000,retention.bytes=16384
```
3. set up a Kafka ingestion task in Druid
4. use `kafka-producer-perf-test.sh` to generate events into the test topic at
a very high throughput
```bash
sh kafka-producer-perf-test.sh --topic a --print-metrics \
  --payload-file payload.txt --producer-props bootstrap.servers=localhost:9092 \
  --num-records 1000000000 --throughput 10000000
```
After a while, once the Kafka server deletes the expired offsets, the task log
shows that the task pushed its segments and then failed:
```
2025-07-09T07:53:14,267 INFO [task-runner-0-priority-0]
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Pushing
[1] segments in background
2025-07-09T07:53:14,267 INFO [task-runner-0-priority-0]
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Pushing
segments:
[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
2025-07-09T07:53:14,267 INFO [task-runner-0-priority-0]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted
rows[0] and (estimated) bytes[0]
2025-07-09T07:53:14,267 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Flushed
in-memory data with commit metadata
[AppenderatorDriverMetadata{segments={index_kafka_a_efbb0759e7aa99a_1=[SegmentWithState{segmentIdentifier=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7,
state=APPENDING}]},
lastSegmentIds={index_kafka_a_efbb0759e7aa99a_1=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7},
callerMetadata={nextPartitions=SeekableStreamStartSequenceNumbers{stream='a',
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null',
multiTopicPartition=false}=3009454933}, exclusivePartitions=[]},
publishPartitions=SeekableStreamEndSequenceNumbers{stream='a',
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null',
multiTopicPartition=false}=3009454933}}}}] for segments:
2025-07-09T07:53:14,267 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted
stats: processed rows: [9442198], persisted rows[0], sinks: [2], total
fireHydrants (across sinks): [64], persisted fireHydrants (across sinks): [0]
2025-07-09T07:53:14,267 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-merge]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Preparing
to push (stats): processed rows: [9442198], sinks: [1], fireHydrants (across
sinks): [30]
2025-07-09T07:53:14,426 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-merge]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator -
Segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
of 305,093 bytes built from 30 incremental persist(s) in 155ms; pushed to deep
storage in 2ms. Load spec is:
{"type":"local","path":"/Users/frank/source/open/druid/distribution/apache-druid-33.0.0/var/druid/segments/a/2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z/2025-07-08T06:55:43.564Z/7/e5a7be8f-02b8-4b51-a967-a0a8ee3708b4/index/"}
2025-07-09T07:53:14,426 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-merge]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Push
complete...
2025-07-09T07:53:14,438 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish]
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver -
Published [1] segments with commit
metadata[{nextPartitions=SeekableStreamStartSequenceNumbers{stream='a',
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null',
multiTopicPartition=false}=3009454933}, exclusivePartitions=[]},
publishPartitions=SeekableStreamEndSequenceNumbers{stream='a',
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null',
multiTopicPartition=false}=3009454933}}}].
2025-07-09T07:53:14,438 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish]
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver -
Published segments:
[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
2025-07-09T07:53:14,438 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish]
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver -
Published segment schemas[SegmentSchemaMapping{segmentIdToMetadataMap={},
schemaFingerprintToPayloadMap={}, version='1'}].
2025-07-09T07:53:14,438 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Published 1 segments for sequence [index_kafka_a_efbb0759e7aa99a_1] with
metadata
[AppenderatorDriverMetadata{segments={index_kafka_a_efbb0759e7aa99a_1=[SegmentWithState{segmentIdentifier=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7,
state=APPENDING}]},
lastSegmentIds={index_kafka_a_efbb0759e7aa99a_1=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7},
callerMetadata={nextPartitions=SeekableStreamStartSequenceNumbers{stream='a',
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null',
multiTopicPartition=false}=3009454933}, exclusivePartitions=[]},
publishPartitions=SeekableStreamEndSequenceNumbers{stream='a',
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null',
multiTopicPartition=false}=3009454933}}}}].
2025-07-09T07:53:14,438 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Published segments:
[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
2025-07-09T07:53:14,439 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Saved
sequence metadata to disk: []
2025-07-09T07:53:29,504 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
at
path[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:52:28.251Z_85b89ba69fc64644816411d28bf6ee161]
2025-07-09T07:53:29,505 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Unannounced
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7],
scheduling drop in [30000] millisecs
2025-07-09T07:53:29,505 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver -
Successfully handed off [1] segments.
2025-07-09T07:53:29,510 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_6]
at
path[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:52:28.251Z_85b89ba69fc64644816411d28bf6ee161]
2025-07-09T07:53:29,510 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.curator.announcement.Announcer - Unannouncing
[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:52:28.251Z_85b89ba69fc64644816411d28bf6ee161]
2025-07-09T07:53:29,523 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Unannounced
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_6],
scheduling drop in [30000] millisecs
2025-07-09T07:53:29,523 INFO
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist]
org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver -
Successfully handed off [1] segments.
2025-07-09T07:53:29,527 INFO [task-runner-0-priority-0]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Handoff complete for segments: DataSegment{binaryVersion=9,
id=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_6,
loadSpec={type=>local,
path=>/Users/frank/source/open/druid/distribution/apache-druid-33.0.0/var/druid/segments/a/2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z/2025-07-08T06:55:43.564Z/6/40c3824e-0308-4385-9595-c69287926478/index/},
dimensions=[value], metrics=[], shardSpec=NumberedShardSpec{partitionNum=6,
partitions=0}, lastCompactionState=null, size=343313}
2025-07-09T07:53:29,527 INFO [task-runner-0-priority-0]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Handoff complete for segments: DataSegment{binaryVersion=9,
id=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7,
loadSpec={type=>local,
path=>/Users/frank/source/open/druid/distribution/apache-druid-33.0.0/var/druid/segments/a/2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z/2025-07-08T06:55:43.564Z/7/e5a7be8f-02b8-4b51-a967-a0a8ee3708b4/index/},
dimensions=[value], metrics=[], shardSpec=NumberedShardSpec{partitionNum=7,
partitions=0}, lastCompactionState=null, size=305093}
2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0]
org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0]
org.apache.kafka.common.metrics.Metrics - Closing reporter
org.apache.kafka.common.metrics.JmxReporter
2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0]
org.apache.kafka.common.metrics.Metrics - Closing reporter
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0]
org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
2025-07-09T07:53:29,534 INFO [task-runner-0-priority-0]
org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for
consumer-kafka-supervisor-ofmlhdlj-1 unregistered
2025-07-09T07:53:29,534 INFO [task-runner-0-priority-0]
org.apache.druid.curator.announcement.Announcer - Unannouncing
[/druid/internal-discovery/PEON/localhost:8100]
2025-07-09T07:53:29,540 INFO [task-runner-0-priority-0]
org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced self
[{"druidNode":{"service":"druid/middleManager","host":"localhost","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","serverType":"indexer-executor","priority":0},"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__default"}},"startTime":"2025-07-09T07:51:29.426Z"}].
2025-07-09T07:53:29,541 INFO [task-runner-0-priority-0]
org.apache.druid.curator.announcement.Announcer - Unannouncing
[/druid/announcements/localhost:8100]
2025-07-09T07:53:29,549 INFO [task-runner-0-priority-0]
org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed
with status: {
"id" : "index_kafka_a_efbb0759e7aa99a_oinmidfh",
"status" : "FAILED",
"duration" : 120144,
"errorMsg" : "Fetch position FetchPosition{offset=3009454933,
offsetEpoch=Optional[0],
currentLeader=LeaderAndEpoch{leader=Optional[10.22.64.229:9092 (id: 0 rack:
null)], epoch=0}} is out of range for partition a-0\nThis may happen when given
offsets have been deleted at the Kafka server due to the retention
configuration. \nYou can use supervisor's reset API to set the offset to a
valid position.",
"location" : {
"host" : null,
"port" : -1,
"tlsPort" : -1
}
}
2025-07-09T07:53:29,551 INFO [main]
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle
[module] stage [ANNOUNCEMENTS]
2025-07-09T07:53:29,554 INFO [main]
org.apache.druid.curator.announcement.Announcer - Unannouncing
[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:51:29.441Z_4a9f27198f324d99a3832da84e8fcc8f0]
2025-07-09T07:53:29,560 INFO [main]
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle
[module] stage [SERVER]
2025-07-09T07:53:29,564 INFO [main]
org.eclipse.jetty.server.AbstractConnector - Stopped
ServerConnector@118f2486{HTTP/1.1, (http/1.1)}{0.0.0.0:8100}
2025-07-09T07:53:29,565 INFO [main] org.eclipse.jetty.server.session - node0
Stopped scavenging
2025-07-09T07:53:29,566 INFO [main]
org.eclipse.jetty.server.handler.ContextHandler - Stopped
o.e.j.s.ServletContextHandler@61ec2cb5{/,null,STOPPED}
2025-07-09T07:53:29,567 INFO [main]
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting
graceful shutdown of task[index_kafka_a_efbb0759e7aa99a_oinmidfh].
2025-07-09T07:53:29,567 INFO [main]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Stopping forcefully (status: [PUBLISHING])
2025-07-09T07:53:29,567 INFO [main]
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle
[module] stage [NORMAL]
2025-07-09T07:53:29,567 INFO [main]
org.apache.druid.server.coordination.SegmentBootstrapper - Stopping...
2025-07-09T07:53:29,567 INFO [main]
org.apache.druid.server.coordination.SegmentBootstrapper - Stopped.
2025-07-09T07:53:29,567 INFO [main]
org.apache.druid.server.coordination.ZkCoordinator - Stopping ZkCoordinator for
[DruidServerMetadata{name='localhost:8100', hostAndPort='localhost:8100',
hostAndTlsPort='null', maxSize=0, tier='_default_tier', type=indexer-executor,
priority=0}]
2025-07-09T07:53:29,571 INFO
[LookupExtractorFactoryContainerProvider-MainThread]
org.apache.druid.query.lookup.LookupReferencesManager - Lookup Management loop
exited. Lookup notices are not handled anymore.
2025-07-09T07:53:29,572 INFO [Curator-Framework-0]
org.apache.curator.framework.imps.CuratorFrameworkImpl -
backgroundOperationsLoop exiting
2025-07-09T07:53:29,680 INFO [main] org.apache.zookeeper.ZooKeeper -
Session: 0x100120c32ee0040 closed
2025-07-09T07:53:29,680 INFO [main-EventThread]
org.apache.zookeeper.ClientCnxn - EventThread shut down for session:
0x100120c32ee0040
2025-07-09T07:53:29,683 INFO [main]
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle
[module] stage [INIT]
```
On the task view, its state is displayed as FAILED.

The task status records the out-of-range exception.
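For reference, the failure is visible directly in the task status payload. A small check like the following (parsing a trimmed payload shaped like the one in the task log above) confirms both the status and the error message:

```python
import json

# A trimmed task-status payload shaped like the one in the task log above.
status_json = '''
{
  "id": "index_kafka_a_efbb0759e7aa99a_oinmidfh",
  "status": "FAILED",
  "duration": 120144,
  "errorMsg": "Fetch position ... is out of range for partition a-0"
}
'''

status = json.loads(status_json)
assert status["status"] == "FAILED"
assert "out of range" in status["errorMsg"]
print(status["id"], status["status"])
```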

This PR has:
- [X] been self-reviewed.
- [ ] using the [concurrency
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
(Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [ ] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [ ] added integration tests.
- [X] been tested in a test Druid cluster.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]