harinirajendran opened a new issue, #11414:
URL: https://github.com/apache/druid/issues/11414

   ### Affected Version
   
   0.21
   
   ### Description
   
   We have a Druid cluster ingesting about 2.5M events/second. One of our data sources has 90 ingestion tasks with task duration set to 1 hour. Whenever the tasks roll every hour, Kafka ingestion lag (`druid.ingest.kafka.lag`) spikes anywhere from 3M to 15M. On further analysis, we noted that while tasks are rolling, some of the active ingestion tasks are stuck in the `pause` state for a long time (sometimes up to 1.5-2 minutes), during which those tasks aren't ingesting any data, resulting in the lag spike.
   
   Logs from a MiddleManager task showing a huge gap between `pause` and `resume`:
   ```
   {"@timestamp":"2021-06-21T17:34:51.628Z", "log.level":"DEBUG", 
"message":"Received pause command, pausing ingestion until resumed.", 
"service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
   {"@timestamp":"2021-06-21T17:36:27.089Z", "log.level":"DEBUG", 
"message":"Received pause command, pausing ingestion until resumed.", 
"service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
   {"@timestamp":"2021-06-21T17:36:27.097Z", "log.level":"DEBUG", 
"message":"Received resume command, resuming ingestion.", 
"service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
   ```  
   In the above loglines, we can see that the task was in the `pause` state from `17:34:51` to `17:36:27`.
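   The gap can be computed directly from the two log timestamps above (a quick sketch using `java.time`; the timestamps are copied from the pause/resume loglines):
   ```java
   import java.time.Duration;
   import java.time.Instant;

   public class PauseGap
   {
     // Timestamps copied from the pause/resume loglines above.
     static final Instant PAUSED = Instant.parse("2021-06-21T17:34:51.628Z");
     static final Instant RESUMED = Instant.parse("2021-06-21T17:36:27.089Z");

     public static Duration pauseDuration()
     {
       return Duration.between(PAUSED, RESUMED);
     }

     public static void main(String[] args)
     {
       // Prints PT1M35.461S: the task was paused (and not ingesting) for ~95 seconds.
       System.out.println(pauseDuration());
     }
   }
   ```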
   
   On further analysis, we found that the MM taskRunner pauses itself when it submits a `checkpoint` notice 
[here](https://github.com/apache/druid/blob/8264203cee688607091232897749e959e7706010/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L729).
   From the time the taskRunner submits the checkpoint notice, it takes around 1.5 minutes for the coordinator to process it. We can see this in the coordinator logs below for a specific task.
   ```
   Jun 21, 2021 @ 17:34:51.624 Performing action for task[<task_id>]: 
CheckPointDataSourceMetadataAction{supervisorId='<supervisor_id>',          
taskGroupId='14', 
checkpointMetadata=KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='<kafka-topic>',
 partitionSequenceNumberMap={104=238760642689, 14=337995870870}, 
exclusivePartitions=[]}}} coordinator-0
   Jun 21, 2021 @ 17:34:51.624 Checkpointing 
[KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='<kafka-topic>',
 partitionSequenceNumberMap={104=238760642689, 14=337995870870}, 
exclusivePartitions=[]}}] for taskGroup [14] coordinator-0
   Jun 21, 2021 @ 17:36:27.086 Pause task[<task_id>] coordinator-0
   Jun 21, 2021 @ 17:36:27.087 HTTP POST: 
http://<MMHost:MMport>/druid/worker/v1/chat/<task_id>/pause  coordinator-0
   Jun 21, 2021 @ 17:36:27.089 SetEndOffsets task[<task_id>] 
endOffsets[{104=238763631003, 14=337998805846}] finalize[false] coordinator-0
   Jun 21, 2021 @ 17:36:27.089 Task [<task_id>] paused successfully  
coordinator-0
   Jun 21, 2021 @ 17:36:27.091 HTTP POST: 
http://<MMHost:MMport>/druid/worker/v1/chat/<task_id>/offsets/end?finish=false 
coordinator-0
   Jun 21, 2021 @ 17:36:27.097 Handled checkpoint notice, new checkpoint is 
[{104=238763631003, 14=337998805846}] for taskGroup [14] coordinator-0
   ```
   
   Note that this long pause of ingestion tasks happens **only** when tasks are rolling, not at other times.
   
   Our guess is that, while tasks are rolling, the [notices queue](https://github.com/confluentinc/druid/blob/0.21.0-confluent/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L752) contains a large number of notices, and each notice takes a long time to process, so the `checkpoint` notice is also significantly delayed once it is added to the queue.
   
   Currently, we do not have logs in place to show how many notices are in this queue at any point, or how long each notice takes to execute.
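   To get that visibility, one option is to wrap the notice loop with per-notice timing and queue-depth logging. A minimal sketch (the `Notice` interface and queue here are hypothetical stand-ins for Druid's internal supervisor types, not the actual code):
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingDeque;

   public class NoticeTimer
   {
     // Hypothetical stand-in for Druid's internal Notice type.
     public interface Notice
     {
       String getType();
       void handle();
     }

     private final BlockingQueue<Notice> notices = new LinkedBlockingDeque<>();

     public void submit(Notice notice)
     {
       notices.add(notice);
     }

     /** Drains the queue, logging queue depth and per-notice handle time. Returns notices processed. */
     public int drain()
     {
       int processed = 0;
       Notice notice;
       while ((notice = notices.poll()) != null) {
         long start = System.nanoTime();
         notice.handle();
         long tookMs = (System.nanoTime() - start) / 1_000_000;
         // In the real supervisor this would go through its logger / metrics emitter.
         System.out.printf("notice[%s] took %d ms, %d still queued%n",
                           notice.getType(), tookMs, notices.size());
         processed++;
       }
       return processed;
     }
   }
   ```
   Logging `notices.size()` at submit time during a task roll would confirm (or refute) the queue-backlog theory directly.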
   
   We spent some more time analyzing, and saw that some of the following functions 
[here](https://github.com/apache/druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L1447) take multiple seconds while tasks are rolling, as opposed to a few milliseconds when they aren't.
   ```
   discoverTasks();
   updateTaskStatus();
   checkTaskDuration();
   checkPendingCompletionTasks();
   checkCurrentTaskState();
   ```
   Some of the above functions check task status, and that seems to be what takes a long time.
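   One way to confirm which phase is slow is to time each call in the supervisor's run loop. A rough sketch (the phase names mirror the functions listed above, but the timing wrapper itself is ours, not Druid code):
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class PhaseTimer
   {
     private final Map<String, Long> elapsedMs = new LinkedHashMap<>();

     /** Runs one run-loop phase and records how long it took, so slow phases stand out. */
     public void time(String phase, Runnable body)
     {
       long start = System.nanoTime();
       body.run();
       elapsedMs.put(phase, (System.nanoTime() - start) / 1_000_000);
     }

     public Map<String, Long> results()
     {
       return elapsedMs;
     }

     public static void main(String[] args)
     {
       PhaseTimer timer = new PhaseTimer();
       // In the real run loop these would wrap discoverTasks(), updateTaskStatus(), etc.
       timer.time("discoverTasks", () -> {});
       timer.time("checkTaskDuration", () -> {});
       timer.results().forEach((phase, ms) -> System.out.printf("%s took %d ms%n", phase, ms));
     }
   }
   ```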
   
   

