FrankChen021 opened a new pull request, #18226:
URL: https://github.com/apache/druid/pull/18226

   Fixes #14344 
   
   ### Description
   
   Previously, when an `OffsetOutOfRangeException` was raised, the task kept 
waiting until it completed. This is a problem for operators, because it can 
take a long time to notice that the exception was raised.
   
   Now, if auto reset is not enabled, the exception is handled as follows:
   1. All messages ingested before the expired offset are persisted and 
published.
   2. The task is marked as FAILED so that operators/users can take action 
against problematic topics as early as possible.
   
   #### Release note
   Tasks that encounter an `OffsetOutOfRangeException` will fail 
immediately.
   
   
   ####
   Follow these steps to test:
   1. Set up a topic in Kafka.
   2. Set the topic's retention time and retention size to very small values:
       ```bash
       ./kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics \
         --entity-name a --alter \
         --add-config retention.ms=10000,retention.bytes=16384
       ```
   3. Set up a Kafka ingestion task in Druid.
   4. Use `kafka-producer-perf-test.sh` to produce events to the test topic at 
a very high throughput:
   
   ```bash
   sh kafka-producer-perf-test.sh --topic a --print-metrics --payload-file payload.txt \
     --producer-props bootstrap.servers=localhost:9092 \
     --num-records 1000000000 --throughput 10000000
   ```
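
To check that the retention overrides took effect and to watch the broker discard old offsets, something like the following may help. This is only a sketch, not part of the PR: it assumes the same broker (`localhost:9092`) and topic (`a`) as the steps above, `KAFKA_BIN` is a hypothetical variable pointing at the Kafka `bin` directory, and `kafka-get-offsets.sh` is assumed to be present in the Kafka distribution in use.

```bash
# Hypothetical helper variable: directory that contains the Kafka CLI scripts.
KAFKA_BIN="${KAFKA_BIN:-.}"

# Print the topic-level config overrides (retention.ms, retention.bytes, ...).
# $1 = topic name, $2 = bootstrap server (assumed default: localhost:9092)
describe_topic_config() {
  "$KAFKA_BIN/kafka-configs.sh" --bootstrap-server "${2:-localhost:9092}" \
    --entity-type topics --entity-name "$1" --describe
}

# Print the earliest available offset of each partition (--time -2 = earliest).
# $1 = topic name, $2 = bootstrap server (assumed default: localhost:9092)
earliest_offsets() {
  "$KAFKA_BIN/kafka-get-offsets.sh" --bootstrap-server "${2:-localhost:9092}" \
    --topic "$1" --time -2
}
```

Running `describe_topic_config a` once and `earliest_offsets a` every few seconds should show the earliest offset moving forward as the broker removes old log segments. Retention is applied to closed log segments on a periodic check, so deletion is not instantaneous even with a very small `retention.ms`.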
   
   After a while, once the Kafka server deletes the expired offsets, the task 
log shows that the task pushed its segments and then failed:
   
   ```
   2025-07-09T07:53:14,267 INFO [task-runner-0-priority-0] 
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Pushing 
[1] segments in background
   2025-07-09T07:53:14,267 INFO [task-runner-0-priority-0] 
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - Pushing 
segments: 
[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
   2025-07-09T07:53:14,267 INFO [task-runner-0-priority-0] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted 
rows[0] and (estimated) bytes[0]
   2025-07-09T07:53:14,267 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Flushed 
in-memory data with commit metadata 
[AppenderatorDriverMetadata{segments={index_kafka_a_efbb0759e7aa99a_1=[SegmentWithState{segmentIdentifier=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7,
 state=APPENDING}]}, 
lastSegmentIds={index_kafka_a_efbb0759e7aa99a_1=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7},
 callerMetadata={nextPartitions=SeekableStreamStartSequenceNumbers{stream='a', 
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', 
multiTopicPartition=false}=3009454933}, exclusivePartitions=[]}, 
publishPartitions=SeekableStreamEndSequenceNumbers{stream='a', 
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', 
multiTopicPartition=false}=3009454933}}}}] for segments: 
   2025-07-09T07:53:14,267 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Persisted 
stats: processed rows: [9442198], persisted rows[0], sinks: [2], total 
fireHydrants (across sinks): [64], persisted fireHydrants (across sinks): [0]
   2025-07-09T07:53:14,267 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-merge] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Preparing 
to push (stats): processed rows: [9442198], sinks: [1], fireHydrants (across 
sinks): [30]
   2025-07-09T07:53:14,426 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-merge] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - 
Segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
 of 305,093 bytes built from 30 incremental persist(s) in 155ms; pushed to deep 
storage in 2ms. Load spec is: 
{"type":"local","path":"/Users/frank/source/open/druid/distribution/apache-druid-33.0.0/var/druid/segments/a/2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z/2025-07-08T06:55:43.564Z/7/e5a7be8f-02b8-4b51-a967-a0a8ee3708b4/index/"}
   2025-07-09T07:53:14,426 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-merge] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Push 
complete...
   2025-07-09T07:53:14,438 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish] 
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - 
Published [1] segments with commit 
metadata[{nextPartitions=SeekableStreamStartSequenceNumbers{stream='a', 
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', 
multiTopicPartition=false}=3009454933}, exclusivePartitions=[]}, 
publishPartitions=SeekableStreamEndSequenceNumbers{stream='a', 
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', 
multiTopicPartition=false}=3009454933}}}].
   2025-07-09T07:53:14,438 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish] 
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - 
Published segments: 
[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
   2025-07-09T07:53:14,438 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish] 
org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver - 
Published segment schemas[SegmentSchemaMapping{segmentIdToMetadataMap={}, 
schemaFingerprintToPayloadMap={}, version='1'}].
   2025-07-09T07:53:14,438 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Published 1 segments for sequence [index_kafka_a_efbb0759e7aa99a_1] with 
metadata 
[AppenderatorDriverMetadata{segments={index_kafka_a_efbb0759e7aa99a_1=[SegmentWithState{segmentIdentifier=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7,
 state=APPENDING}]}, 
lastSegmentIds={index_kafka_a_efbb0759e7aa99a_1=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7},
 callerMetadata={nextPartitions=SeekableStreamStartSequenceNumbers{stream='a', 
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', 
multiTopicPartition=false}=3009454933}, exclusivePartitions=[]}, 
publishPartitions=SeekableStreamEndSequenceNumbers{stream='a', 
partitionSequenceNumberMap={KafkaTopicPartition{partition=0, topic='null', 
multiTopicPartition=false}=3009454933}}}}].
   2025-07-09T07:53:14,438 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Published segments: 
[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
   2025-07-09T07:53:14,439 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-publish] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Saved 
sequence metadata to disk: []
   2025-07-09T07:53:29,504 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing 
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7]
 at 
path[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:52:28.251Z_85b89ba69fc64644816411d28bf6ee161]
   2025-07-09T07:53:29,505 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Unannounced 
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7],
 scheduling drop in [30000] millisecs
   2025-07-09T07:53:29,505 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver - 
Successfully handed off [1] segments.
   2025-07-09T07:53:29,510 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.server.coordination.BatchDataSegmentAnnouncer - Unannouncing 
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_6]
 at 
path[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:52:28.251Z_85b89ba69fc64644816411d28bf6ee161]
   2025-07-09T07:53:29,510 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.curator.announcement.Announcer - Unannouncing 
[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:52:28.251Z_85b89ba69fc64644816411d28bf6ee161]
   2025-07-09T07:53:29,523 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderator - Unannounced 
segment[a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_6],
 scheduling drop in [30000] millisecs
   2025-07-09T07:53:29,523 INFO 
[[index_kafka_a_efbb0759e7aa99a_oinmidfh]-appenderator-persist] 
org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver - 
Successfully handed off [1] segments.
   2025-07-09T07:53:29,527 INFO [task-runner-0-priority-0] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Handoff complete for segments: DataSegment{binaryVersion=9, 
id=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_6,
 loadSpec={type=>local, 
path=>/Users/frank/source/open/druid/distribution/apache-druid-33.0.0/var/druid/segments/a/2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z/2025-07-08T06:55:43.564Z/6/40c3824e-0308-4385-9595-c69287926478/index/},
 dimensions=[value], metrics=[], shardSpec=NumberedShardSpec{partitionNum=6, 
partitions=0}, lastCompactionState=null, size=343313}
   2025-07-09T07:53:29,527 INFO [task-runner-0-priority-0] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Handoff complete for segments: DataSegment{binaryVersion=9, 
id=a_2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z_2025-07-08T06:55:43.564Z_7,
 loadSpec={type=>local, 
path=>/Users/frank/source/open/druid/distribution/apache-druid-33.0.0/var/druid/segments/a/2025-05-01T00:00:00.000Z_2025-05-01T01:00:00.000Z/2025-07-08T06:55:43.564Z/7/e5a7be8f-02b8-4b51-a967-a0a8ee3708b4/index/},
 dimensions=[value], metrics=[], shardSpec=NumberedShardSpec{partitionNum=7, 
partitions=0}, lastCompactionState=null, size=305093}
   2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
   2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.metrics.Metrics - Closing reporter 
org.apache.kafka.common.metrics.JmxReporter
   2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.metrics.Metrics - Closing reporter 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter
   2025-07-09T07:53:29,531 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
   2025-07-09T07:53:29,534 INFO [task-runner-0-priority-0] 
org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for 
consumer-kafka-supervisor-ofmlhdlj-1 unregistered
   2025-07-09T07:53:29,534 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.announcement.Announcer - Unannouncing 
[/druid/internal-discovery/PEON/localhost:8100]
   2025-07-09T07:53:29,540 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.discovery.CuratorDruidNodeAnnouncer - Unannounced self 
[{"druidNode":{"service":"druid/middleManager","host":"localhost","bindOnHost":false,"plaintextPort":8100,"port":-1,"tlsPort":-1,"enablePlaintextPort":true,"enableTlsPort":false},"nodeType":"peon","services":{"dataNodeService":{"type":"dataNodeService","tier":"_default_tier","maxSize":0,"type":"indexer-executor","serverType":"indexer-executor","priority":0},"lookupNodeService":{"type":"lookupNodeService","lookupTier":"__default"}},"startTime":"2025-07-09T07:51:29.426Z"}].
   2025-07-09T07:53:29,541 INFO [task-runner-0-priority-0] 
org.apache.druid.curator.announcement.Announcer - Unannouncing 
[/druid/announcements/localhost:8100]
   2025-07-09T07:53:29,549 INFO [task-runner-0-priority-0] 
org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Task completed 
with status: {
     "id" : "index_kafka_a_efbb0759e7aa99a_oinmidfh",
     "status" : "FAILED",
     "duration" : 120144,
     "errorMsg" : "Fetch position FetchPosition{offset=3009454933, 
offsetEpoch=Optional[0], 
currentLeader=LeaderAndEpoch{leader=Optional[10.22.64.229:9092 (id: 0 rack: 
null)], epoch=0}} is out of range for partition a-0\nThis may happen when given 
offsets have been deleted at the Kafka server due to the retention 
configuration. \nYou can use supervisor's reset API to set the offset to a 
valid position.",
     "location" : {
       "host" : null,
       "port" : -1,
       "tlsPort" : -1
     }
   }
   2025-07-09T07:53:29,551 INFO [main] 
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle 
[module] stage [ANNOUNCEMENTS]
   2025-07-09T07:53:29,554 INFO [main] 
org.apache.druid.curator.announcement.Announcer - Unannouncing 
[/druid/segments/localhost:8100/localhost:8100_indexer-executor__default_tier_2025-07-09T07:51:29.441Z_4a9f27198f324d99a3832da84e8fcc8f0]
   2025-07-09T07:53:29,560 INFO [main] 
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle 
[module] stage [SERVER]
   2025-07-09T07:53:29,564 INFO [main] 
org.eclipse.jetty.server.AbstractConnector - Stopped 
ServerConnector@118f2486{HTTP/1.1, (http/1.1)}{0.0.0.0:8100}
   2025-07-09T07:53:29,565 INFO [main] org.eclipse.jetty.server.session - node0 
Stopped scavenging
   2025-07-09T07:53:29,566 INFO [main] 
org.eclipse.jetty.server.handler.ContextHandler - Stopped 
o.e.j.s.ServletContextHandler@61ec2cb5{/,null,STOPPED}
   2025-07-09T07:53:29,567 INFO [main] 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting 
graceful shutdown of task[index_kafka_a_efbb0759e7aa99a_oinmidfh].
   2025-07-09T07:53:29,567 INFO [main] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Stopping forcefully (status: [PUBLISHING])
   2025-07-09T07:53:29,567 INFO [main] 
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle 
[module] stage [NORMAL]
   2025-07-09T07:53:29,567 INFO [main] 
org.apache.druid.server.coordination.SegmentBootstrapper - Stopping...
   2025-07-09T07:53:29,567 INFO [main] 
org.apache.druid.server.coordination.SegmentBootstrapper - Stopped.
   2025-07-09T07:53:29,567 INFO [main] 
org.apache.druid.server.coordination.ZkCoordinator - Stopping ZkCoordinator for 
[DruidServerMetadata{name='localhost:8100', hostAndPort='localhost:8100', 
hostAndTlsPort='null', maxSize=0, tier='_default_tier', type=indexer-executor, 
priority=0}]
   2025-07-09T07:53:29,571 INFO 
[LookupExtractorFactoryContainerProvider-MainThread] 
org.apache.druid.query.lookup.LookupReferencesManager - Lookup Management loop 
exited. Lookup notices are not handled anymore.
   2025-07-09T07:53:29,572 INFO [Curator-Framework-0] 
org.apache.curator.framework.imps.CuratorFrameworkImpl - 
backgroundOperationsLoop exiting
   2025-07-09T07:53:29,680 INFO [main] org.apache.zookeeper.ZooKeeper - 
Session: 0x100120c32ee0040 closed
   2025-07-09T07:53:29,680 INFO [main-EventThread] 
org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 
0x100120c32ee0040
   2025-07-09T07:53:29,683 INFO [main] 
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle 
[module] stage [INIT]
   ```
   
   In the task view, its state is displayed as FAILED:
   
![image](https://github.com/user-attachments/assets/b3e5efb4-c0c2-4cce-9136-f984ae9bbddb)
   
   The task status records the out-of-range exception:
   
   
![image](https://github.com/user-attachments/assets/9c32b478-59a9-443c-82c3-916cce5477a2)
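
The error message points operators at the supervisor's reset API. A minimal sketch of calling it with `curl` is shown here, under stated assumptions: `DRUID_URL` is a hypothetical variable holding the Router or Overlord base URL (the default below is only a guess for a local setup), and the supervisor ID is assumed to be the datasource name `a`. The helper names are illustrative and not part of Druid.

```bash
# Hypothetical variable: base URL of the Druid Router or Overlord.
DRUID_URL="${DRUID_URL:-http://localhost:8888}"

# Build the reset endpoint URL for a supervisor ID ($1).
supervisor_reset_url() {
  printf '%s/druid/indexer/v1/supervisor/%s/reset\n' "${DRUID_URL%/}" "$1"
}

# POST to the reset endpoint, which clears the supervisor's stored offsets.
reset_supervisor() {
  curl -sS -X POST "$(supervisor_reset_url "$1")"
}
```

For the test setup above, this would be invoked as `reset_supervisor a`. Because a reset discards the stored offsets, ingestion resumes from a position chosen by the supervisor configuration, so some messages may be skipped or read twice; it is worth deciding whether that is acceptable before running it.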
   
   
   
   
   This PR has:
   
   - [X] been self-reviewed.
      - [ ] using the [concurrency 
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
 (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [ ] added integration tests.
   - [X] been tested in a test Druid cluster.
   

