GWphua opened a new pull request, #18312:
URL: https://github.com/apache/druid/pull/18312

   ### Description
   
   <!-- Describe the goal of this PR, what problem are you fixing. If there is 
a corresponding issue (referenced above), it's not necessary to repeat the 
description here, however, you may choose to keep one summary sentence. -->
   
   <!-- Describe your patch: what did you change in code? How did you fix the 
problem? -->
   
   <!-- If there are several relatively logically separate changes in this PR, 
create a mini-section for each of them. For example: -->
   
   #### Reduce log verbosity in SeekableStreamIndexTaskRunner
   
   The stack trace in `SeekableStreamIndexTaskRunner` is not useful, and too 
verbose when an InterruptException is thrown. I feel it is enough to be aware 
that the read from Kafka is interrupted.
   
   These are stack trace examples of the exception that I am trying to suppress.
   
   ```
   2024-12-26T12:24:43,109 INFO [Curator-Framework-0] 
org.apache.curator.framework.imps.CuratorFrameworkImpl - 
backgroundOperationsLoop exiting
   2024-12-26T12:24:43,097 ERROR [task-runner-0-priority-0] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Encountered exception in run() before persisting.
   org.apache.kafka.common.errors.InterruptException: 
java.lang.InterruptedException
       at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520)
 ~[?:?]
       at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281)
 ~[?:?]
       at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[?:?]
       at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
 ~[?:?]
       at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) 
~[?:?]
       at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
~[?:?]
       at 
org.apache.druid.indexing.kafka.KafkaRecordSupplier.poll(KafkaRecordSupplier.java:128)
 ~[?:?]
       at 
org.apache.druid.indexing.kafka.IncrementalPublishingKafkaIndexTaskRunner.getRecords(IncrementalPublishingKafkaIndexTaskRunner.java:95)
 ~[?:?]
       at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:599)
 [druid-indexing-service-0.22.1.jar:0.22.1]
       at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263)
 [druid-indexing-service-0.22.1.jar:0.22.1]
       at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146)
 [druid-indexing-service-0.22.1.jar:0.22.1]
       at 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471)
 [druid-indexing-service-0.22.1.jar:0.22.1]
       at 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443)
 [druid-indexing-service-0.22.1.jar:0.22.1]
       at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
       at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_275]
       at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_275]
       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
   Caused by: java.lang.InterruptedException
       ... 17 more
   ```
   
   ```
   2025-07-17T08:29:04,525 WARN [task-runner-0-priority-0] 
org.apache.druid.indexing.kafka.KafkaIndexTaskRunner - 
OffsetOutOfRangeException with message [Fetch position 
FetchPosition{offset=115551711, offsetEpoch=Optional[0], 
currentLeader=LeaderAndEpoch{leader=Optional[10.2.174.31:9092 (id: 0 rack: 
null)], epoch=0}} is out of range for partition a-0]
   2025-07-17T08:29:04,541 ERROR [task-runner-0-priority-0] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Offsets were reset automatically, potential data duplication or loss: 
{class=org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner, 
task=index_kafka_a_3a6fadba6bf8bb5_lcippohd, dataSource=a, 
partitions=[KafkaTopicPartition{partition=0, topic='null', 
multiTopicPartition=false}]}
   2025-07-17T08:29:04,541 INFO [task-runner-0-priority-0] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Received pause command, pausing ingestion until resumed.
   2025-07-17T08:29:04,548 INFO [parent-monitor-0] 
org.apache.druid.indexing.worker.executor.ExecutorLifecycle - Triggering JVM 
shutdown. Check overlord logs to see why the task is being shut down.
   2025-07-17T08:29:04,549 INFO [Thread-17] org.apache.druid.cli.CliPeon - 
Running shutdown hook
   2025-07-17T08:29:04,549 INFO [Thread-17] 
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle 
[module] stage [ANNOUNCEMENTS]
   2025-07-17T08:29:04,559 INFO [Thread-17] 
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing 
[/druid/announcements/localhost:8101]
   2025-07-17T08:29:04,571 INFO [Thread-17] 
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing 
[/druid/segments/localhost:8101/localhost:8101_indexer-executor__default_tier_2025-07-17T08:25:02.137Z_8604b625600045f394fc47c0a77edbcb1]
   2025-07-17T08:29:04,576 INFO [Thread-17] 
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing 
[/druid/segments/localhost:8101/localhost:8101_indexer-executor__default_tier_2025-07-17T08:23:07.472Z_9f43e98c47774de48e2aa2449c3131880]
   2025-07-17T08:29:04,592 INFO [Thread-17] 
org.apache.druid.curator.announcement.PathChildrenAnnouncer - Unannouncing 
[/druid/internal-discovery/PEON/localhost:8101]
   2025-07-17T08:29:04,597 INFO [Thread-17] 
org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle 
[module] stage [SERVER]
   2025-07-17T08:29:04,609 INFO [Thread-17] 
org.eclipse.jetty.server.AbstractConnector - Stopped 
ServerConnector@65f36591{HTTP/1.1, (http/1.1)}{0.0.0.0:8101}
   2025-07-17T08:29:04,609 INFO [Thread-17] org.eclipse.jetty.server.session - 
node0 Stopped scavenging
   2025-07-17T08:29:04,611 INFO [Thread-17] 
org.eclipse.jetty.server.handler.ContextHandler - Stopped 
o.e.j.s.ServletContextHandler@45482f82{/,null,STOPPED}
   2025-07-17T08:29:04,612 INFO [Thread-17] 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner - Starting 
graceful shutdown of task[index_kafka_a_3a6fadba6bf8bb5_lcippohd].
   2025-07-17T08:29:04,612 INFO [Thread-17] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Stopping forcefully (status: [PAUSED])
   2025-07-17T08:29:04,612 ERROR [task-runner-0-priority-0] 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - 
Encountered exception in run() before persisting.
   java.lang.InterruptedException: null
       at 
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638)
 ~[?:?]
       at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.possiblyPause(SeekableStreamIndexTaskRunner.java:1392)
 ~[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
       at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:616)
 [druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
       at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:295)
 [druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
       at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.runTask(SeekableStreamIndexTask.java:159)
 [druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
       at 
org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:179) 
[druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
       at 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:478)
 [druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
       at 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:450)
 [druid-indexing-service-34.0.0-SNAPSHOT.jar:34.0.0-SNAPSHOT]
       at 
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
 [guava-32.1.3-jre.jar:?]
       at 
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
 [guava-32.1.3-jre.jar:?]
       at 
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
 [guava-32.1.3-jre.jar:?]
       at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 [?:?]
       at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 [?:?]
       at java.base/java.lang.Thread.run(Thread.java:842) [?:?]
   ```
   
   <!--
   In each section, please describe design decisions made, including:
    - Choice of algorithms
    - Behavioral aspects. What configuration values are acceptable? How are 
corner cases and error conditions handled, such as when there are insufficient 
resources?
    - Class organization and design (how the logic is split between classes, 
inheritance, composition, design patterns)
    - Method organization and design (how the logic is split between methods, 
parameters and return types)
    - Naming (class, method, API, configuration, HTTP endpoint, names of 
emitted metrics)
   -->
   
   
   <!-- It's good to describe an alternative design (or mention an alternative 
name) for every design (or naming) decision point and compare the alternatives 
with the designs that you've implemented (or the names you've chosen) to 
highlight the advantages of the chosen designs and names. -->
   
   <!-- If there was a discussion of the design of the feature implemented in 
this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in 
the development mailing list), link to that discussion from this PR description 
and explain what have changed in your final design compared to your original 
proposal or the consensus version in the end of the discussion. If something 
hasn't changed since the original discussion, you can omit a detailed 
discussion of those aspects of the design here, perhaps apart from brief 
mentioning for the sake of readability of this PR description. -->
   
   <!-- Some of the aspects mentioned above may be omitted for simple and small 
changes. -->
   
   #### Release note
   * Reduced log verbosity in SeekableStreamIndexTaskRunner when 
InterruptionException is thrown
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `SeekableStreamIndexTaskRunner.java`
   
   This PR has:
   
   - [x] been self-reviewed.
   - [x] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [x] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to