GWphua opened a new pull request, #18312:
URL: https://github.com/apache/druid/pull/18312
### Description
<!-- Describe the goal of this PR, what problem are you fixing. If there is
a corresponding issue (referenced above), it's not necessary to repeat the
description here, however, you may choose to keep one summary sentence. -->
<!-- Describe your patch: what did you change in code? How did you fix the
problem? -->
<!-- If there are several relatively logically separate changes in this PR,
create a mini-section for each of them. For example: -->
#### Reduce log verbosity in SeekableStreamIndexTaskRunner
The stack trace in `SeekableStreamIndexTaskRunner` is not useful, and too
verbose when an InterruptException is thrown. I feel it is enough to be aware
that the read from Kafka is interrupted.
These are stack trace examples of the exception that I am trying to suppress.
```
2024-12-26T12:24:43,109 INFO [Curator-Framework-0]
org.apache.curator.framework.imps.CuratorFrameworkImpl -
backgroundOperationsLoop exiting
2024-12-26T12:24:43,097 ERROR [task-runner-0-priority-0]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Encountered exception in run() before persisting.
org.apache.kafka.common.errors.InterruptException:
java.lang.InterruptedException
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520)
~[?:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
~[?:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
~[?:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
~[?:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
~[?:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
~[?:?]
at
org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:128)
~[?:?]
at
org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:95)
~[?:?]
at
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:599)
[druid-indexing-service-0.22.1.jar:0.22.1]
at
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263)
[druid-indexing-service-0.22.1.jar:0.22.1]
at
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146)
[druid-indexing-service-0.22.1.jar:0.22.1]
at
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471)
[druid-indexing-service-0.22.1.jar:0.22.1]
at
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443)
[druid-indexing-service-0.22.1.jar:0.22.1]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_275]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: java.lang.InterruptedException
... 17 more
```
```
2025-07-17T08:29:04,525 WARN [task-runner-0-priority-0]
org.apache.druid.indexing.kafka.KafkaIndexTaskRunner -
OffsetOutOfRangeException with message [Fetch position
FetchPosition{offset=115551711, offsetEpoch=Optional[0],
currentLeader=LeaderAndEpoch{leader=Optional[10.2.174.31:9092 (id: 0 rack:
null)], epoch=0}} is out of range for partition a-0]
2025-07-17T08:29:04,541 ERROR [task-runner-0-priority-0]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Offsets were reset automatically, potential data duplication or loss:
{class=org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner,
task=index_kafka_a_3a6fadba6bf8bb5_lcippohd, dataSource=a,
partitions=[KafkaTopicPartition{partition=0, topic='null',
multiTopicPartition=false}]}
2025-07-17T08:29:04,541 INFO [task-runner-0-priority-0]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Received pause command, pausing ingestion until resumed.
2025-07-17T08:29:04,548 INFO [parent-monitor-0]
org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Triggering JVM
shutdown. Check overlord logs to see why the task is being shut down.
2025-07-17T08:29:04,549 INFO [Thread-17] org.apache.druid.cli.CliPeon -
Running shutdown hook
2025-07-17T08:29:04,549 INFO [Thread-17]
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle
[module] stage [ANNOUNCEMENTS]
2025-07-17T08:29:04,559 INFO [Thread-17]
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing
[/druid/announcements/localhost:8101]
2025-07-17T08:29:04,571 INFO [Thread-17]
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing
[/druid/segments/localhost:8101/localhost:8101_indexer-executor__default_tier_2025-07-17T08:25:02.137Z_8604b625600045f394fc47c0a77edbcb1]
2025-07-17T08:29:04,576 INFO [Thread-17]
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing
[/druid/segments/localhost:8101/localhost:8101_indexer-executor__default_tier_2025-07-17T08:23:07.472Z_9f43e98c47774de48e2aa2449c3131880]
2025-07-17T08:29:04,592 INFO [Thread-17]
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing
[/druid/internal-discovery/PEON/localhost:8101]
2025-07-17T08:29:04,597 INFO [Thread-17]
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle
[module] stage [SERVER]
2025-07-17T08:29:04,609 INFO [Thread-17]
org.eclipse.jetty.server.AbstractConnector - Stopped
ServerConnector@65f36591{HTTP/1.1, (http/1.1)}{0.0.0.0:8101}
2025-07-17T08:29:04,609 INFO [Thread-17] org.eclipse.jetty.server.session -
node0 Stopped scavenging
2025-07-17T08:29:04,611 INFO [Thread-17]
org.eclipse.jetty.server.handler.ContextHandler - Stopped
o.e.j.s.ServletContextHandler@45482f82{/,null,STOPPED}
2025-07-17T08:29:04,612 INFO [Thread-17]
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting
graceful shutdown of task[index_kafka_a_3a6fadba6bf8bb5_lcippohd].
2025-07-17T08:29:04,612 INFO [Thread-17]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Stopping forcefully (status: [PAUSED])
2025-07-17T08:29:04,612 ERROR [task-runner-0-priority-0]
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner -
Encountered exception in run() before persisting.
java.lang.InterruptedException: null
at
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638)
~[?:?]
at
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.possiblyPause(SeekableStreamIndexTaskRunner.java:1392)
~[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
at
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:616)
[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
at
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:295)
[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
at
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.runTask(SeekableStreamIndexTask.java:159)
[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
at
org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:179)
[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
at
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:478)
[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
at
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:450)
[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
at
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
[guava-32.1.3-jre.jar:?]
at
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
[guava-32.1.3-jre.jar:?]
at
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
[guava-32.1.3-jre.jar:?]
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[?:?]
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[?:?]
at java.base/java.lang.Thread.run(Thread.java:842) [?:?]
```
<!--
In each section, please describe design decisions made, including:
- Choice of algorithms
- Behavioral aspects. What configuration values are acceptable? How are
corner cases and error conditions handled, such as when there are insufficient
resources?
- Class organization and design (how the logic is split between classes,
inheritance, composition, design patterns)
- Method organization and design (how the logic is split between methods,
parameters and return types)
- Naming (class, method, API, configuration, HTTP endpoint, names of
emitted metrics)
-->
<!-- It's good to describe an alternative design (or mention an alternative
name) for every design (or naming) decision point and compare the alternatives
with the designs that you've implemented (or the names you've chosen) to
highlight the advantages of the chosen designs and names. -->
<!-- If there was a discussion of the design of the feature implemented in
this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in
the development mailing list), link to that discussion from this PR description
and explain what have changed in your final design compared to your original
proposal or the consensus version in the end of the discussion. If something
hasn't changed since the original discussion, you can omit a detailed
discussion of those aspects of the design here, perhaps apart from brief
mentioning for the sake of readability of this PR description. -->
<!-- Some of the aspects mentioned above may be omitted for simple and small
changes. -->
#### Release note
* Reduced log verbosity in SeekableStreamIndexTaskRunner when
InterruptionException is thrown
<hr>
##### Key changed/added classes in this PR
* `SeekableStreamIndexTaskRunner.java`
This PR has:
- [x] been self-reviewed.
- [x] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [x] been tested in a test Druid cluster.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]