jihoonson opened a new issue #10352:
URL: https://github.com/apache/druid/issues/10352


   ### Motivation
   
   Currently, the Parallel task doesn't provide any metrics, so you need to 
read through task logs when something goes wrong. Reading task logs is harder 
for the Parallel task because you first need to find out which subtask failed 
in the supervisor task logs and then read through that subtask's logs. Even 
when you find the right logs of a bad subtask, it might be hard to find 
anything useful, since task logs carry only limited information about what the 
task has been doing, and you have to interpret that instead of reading actual 
metrics. We already find such metrics useful for streaming ingestion. Batch 
ingestion could similarly benefit from them.
   
   ### Proposed changes
   
   To make debugging easier, the native parallel batch ingestion should 
provide useful metrics. These metrics will be exposed via both the task 
reporting system and the metrics emitter.
   
   #### Task reports
   
   Both live and complete task reports will be provided. Live reports will be 
provided while the ingestion task is running and complete task reports will be 
available once the task is done. 
   
   The subtask report will include metrics for bytes in/out, rows 
in/filtered/unparseable/out, disk spills, fetch time, and errors. The 
supervisor task report will include per-phase metrics, most of which are 
averages of subtask metrics.
   
   ##### Live reports
   
   The live reports of the supervisor task will include:
   - Completed phases
     - Phase duration
     - Completion status (number of succeeded/failed subtasks)
     - Errors of _N_ last failed subtasks
     - Average/min/max duration of succeeded subtasks
     - Average/min/max bytes/rows in/out of subtasks
     - Total bytes in/out of subtasks
     - Total rows in/filtered/unparseable/out of subtasks
     - Average/min/max number of disk spills of subtasks
     - Average/min/max fetch time of subtasks
     - Average/min/max number of created segments of subtasks
     - Total number of created segments
   - Current phase
     - Runtime of current phase
     - Progress (number of succeeded/failed/expected to succeed subtasks)
     - Errors of recently failed subtasks
     - Average/min/max duration of succeeded subtasks
     - Min/max bytes/rows in/out of succeeded subtasks
     - Moving average of running total of bytes/rows in/out of running subtasks
     - Running total of bytes in/out of subtasks (succeeded + running)
     - Running total of rows in/filtered/unparseable/out of subtasks (succeeded 
+ running)
     - Average/min/max number of disk spills and spill time of succeeded 
subtasks
     - Average/min/max fetch time of succeeded subtasks
     - Average/min/max number of created segments of succeeded subtasks
     - Running total of number of created segments
   - Remaining phases
     - Remaining phase names
   
   The live reports of subtasks will include:
   - Moving average of bytes/rows in/out
   - Running total of bytes/rows in/filtered/unparseable/out
   - Running total of number of disk spills, average spill time
   - Fetch time
   
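   As a minimal sketch of how a subtask could derive a moving average from a running total such as rows in, the class below averages the per-interval increase over a fixed window. The class name `RunningTotalMovingAverage`, the window size, and the assumption that one update arrives per reporting interval are all hypothetical and not part of Druid.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: moving average of per-interval increases of a running total. */
final class RunningTotalMovingAverage
{
  private final int windowSize;
  private final Deque<Long> deltas = new ArrayDeque<>();
  private long deltaSum = 0;
  private long lastTotal = 0;

  RunningTotalMovingAverage(int windowSize)
  {
    if (windowSize <= 0) {
      throw new IllegalArgumentException("windowSize must be positive");
    }
    this.windowSize = windowSize;
  }

  /** Records the running total observed at the end of one reporting interval. */
  void update(long runningTotal)
  {
    long delta = runningTotal - lastTotal;
    lastTotal = runningTotal;
    deltas.addLast(delta);
    deltaSum += delta;
    // Drop the oldest interval once the window is full.
    if (deltas.size() > windowSize) {
      deltaSum -= deltas.removeFirst();
    }
  }

  /** Average increase per interval over the window, or 0 if nothing was recorded. */
  double average()
  {
    return deltas.isEmpty() ? 0.0 : (double) deltaSum / deltas.size();
  }

  /** The most recently observed running total. */
  long runningTotal()
  {
    return lastTotal;
  }
}
```

   Keeping only the window of deltas and their sum means a live report can carry both the running total and the moving average without the subtask storing its full history.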
   ##### Complete reports
   
   The complete reports of the supervisor task will include:
   - Duration of the supervisor task
   - Segment metrics
     - Total number of segments published
     - Average/min/max number of segments published per interval
     - Average/min/max number of rows per segment
   - Total bytes in/out
   - Total rows in/filtered/unparseable/out
   - Supervisor task error if any
   - Per-phase metrics
     - Phase duration
     - Total bytes in/out
     - Total rows in/filtered/unparseable/out
     - Number of succeeded/failed subtasks
     - Errors of _N_ recent failed subtasks
     - Average/min/max runtime of subtasks
     - Average/min/max number of total disk spills and spill time of subtasks
     - Average/min/max fetch time of subtasks
   
   The complete reports of the subtasks will include:
   - Total bytes in/out
   - Total rows in/filtered/unparseable/out
   - Total number of disk spills and spill time
   - Fetch time
   - Error if failed
   
   #### Metrics
   
   All the task metrics above will be emitted via the metrics emitter as 
well.
   
   The MiddleManager will additionally emit these metrics:
   - Moving average of shuffle bytes
   - Moving average of shuffle requests
   
   #### Live reporting system for Parallel task
   
   ![live reporting system](https://user-images.githubusercontent.com/2322288/92177181-dcc62e00-edf4-11ea-811e-eb5c42f1537d.png)
   ![live reporting system (1)](https://user-images.githubusercontent.com/2322288/92177187-de8ff180-edf4-11ea-9766-c1b3b9fbde12.png)
   
   - Subtasks periodically send their live reports to the supervisor task
     - Failing to send a report can make the subtask fail, so sending needs 
retries.
     - Subtasks can report their current status with metrics directly to the 
supervisor task.
   - The supervisor task can use those reports as a heartbeat signal
     - If a report is missing, the supervisor task will check with the 
Overlord to see whether the subtask has failed. If the subtask is alive, the 
missing report should be noted in the live report of the supervisor task. If 
the subtask has died, the supervisor task issues a new subtask as a retry.
     - The supervisor task immediately retries failed subtasks when they report 
failures. They cannot succeed after reporting failures.
     - When subtasks report successes, the supervisor task checks with the 
Overlord whether they did succeed. They can fail even after reporting 
successes.
   
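   The heartbeat rules above can be read as a small decision table. The sketch below is one possible encoding, not Druid code: the class `SubtaskMonitor` and its enums are hypothetical names. Two branches are assumptions that the proposal does not cover and are marked as such in the comments: a missing report for a subtask the Overlord already considers successful, and a reported success the Overlord still sees as running.

```java
/** Hypothetical sketch of how the supervisor task could react to subtask reports. */
final class SubtaskMonitor
{
  /** What the subtask last reported; MISSING means no report arrived in the expected interval. */
  enum ReportedState { RUNNING, SUCCESS, FAILED, MISSING }

  /** What the Overlord says about the subtask. */
  enum OverlordStatus { RUNNING, SUCCESS, FAILED }

  enum Action { CONTINUE, NOTE_MISSING_REPORT, MARK_SUCCEEDED, RETRY_SUBTASK }

  /** The supervisor asks the Overlord only for missing reports and reported successes. */
  static boolean needsOverlordCheck(ReportedState reported)
  {
    return reported == ReportedState.MISSING || reported == ReportedState.SUCCESS;
  }

  /** The overlord argument is ignored unless needsOverlordCheck(reported) is true. */
  static Action decide(ReportedState reported, OverlordStatus overlord)
  {
    switch (reported) {
      case RUNNING:
        return Action.CONTINUE;
      case FAILED:
        // A subtask cannot succeed after reporting a failure, so retry immediately.
        return Action.RETRY_SUBTASK;
      case MISSING:
        if (overlord == OverlordStatus.FAILED) {
          return Action.RETRY_SUBTASK;
        }
        if (overlord == OverlordStatus.SUCCESS) {
          return Action.MARK_SUCCEEDED; // assumption: not specified in the proposal
        }
        return Action.NOTE_MISSING_REPORT;
      case SUCCESS:
        if (overlord == OverlordStatus.SUCCESS) {
          return Action.MARK_SUCCEEDED;
        }
        if (overlord == OverlordStatus.FAILED) {
          // A subtask can still fail after reporting success.
          return Action.RETRY_SUBTASK;
        }
        return Action.CONTINUE; // assumption: wait until the Overlord sees a final status
      default:
        throw new IllegalStateException("Unknown reported state: " + reported);
    }
  }
}
```

   Keeping the decision a pure function of the two statuses makes each rule easy to unit test without a running Overlord.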
   #### Complete reporting system for Parallel task
   
   - The final report is pushed to both deep storage and to the supervisor task
     - TaskReportFileWriter will be used to push to deep storage in 
middleManager and indexer
     - TaskRunnerListener can be used to send to supervisor task in peon and 
indexer
   
   ### Rationale
   
   #### Rationale for the list of metrics
   
   Live reports and metrics are mostly useful for debugging. The new metrics 
should be able to answer these questions.
   
   How is my ingestion going?
   - How long has my supervisor task been running?
   - How long does each subtask run for?
   - How many phases are left to run in my parallel ingestion?
   - How many subtasks are left to run in the current phase?
   - How much data is each subtask processing?
   
   Why is my ingestion slow?
   - Are there any intermittent subtask failures?
   - Is each subtask processing too much data?
   - In each subtask, are there too many disk spills?
   - Is shuffle slow?
   
   What was the last state of my succeeded ingestion?
   - How many segments did my ingestion create?
   - What was the total size of the created segments?
   - How many subtask failures were there in my ingestion? What were those 
failures, if any?
   - How long did my ingestion take?
   
   Why did my ingestion fail?
   - Were there any subtask failures? If so, what were the error messages?
   - Did the parallel task fail? If so, what was the error message?
   
   Why does my ingestion not create segments?
   - How many rows were in published segments?
   - How many rows were filtered out?
   - How many rows were unparseable?
   
   Why is my query slow after ingestion?
   - Are there too few rows per segment?
   - Are there too many segments per time chunk?
   
   #### More HTTP connections for live reporting system
   
   In the proposed live reporting system, each subtask needs to talk to its 
supervisor task over HTTP. This will result in more HTTP connections between 
tasks. However, I would like to go with the current approach for now instead of 
making connections between middleManagers because
   - Currently, there is another API in the supervisor task which every subtask 
directly calls to allocate new segments in dynamic partitioning. This API call 
cannot be delegated by middleManager without introducing a new async API 
framework.
   - Even though there is already a supervisor task API called by every 
subtask, I would say the number of HTTP connections won’t be that large in most 
cases. The number of HTTP connections is computed as 
`maxNumConcurrentSubTasks` * `druid.global.http.numConnections` (20 by 
default). In general, `maxNumConcurrentSubTasks` doesn’t go beyond 200 even in 
large clusters.
   - The concern with a high number of connections is that they could affect 
query performance in the data node model, where a middleManager and a 
historical live on the same machine. However, even though every subtask is 
already making connections to the supervisor task in dynamic partitioning, I 
haven’t heard of any problem with that yet.
   - If we did observe some problem with a very large number of HTTP 
connections, there would still be workarounds.
     - We can adjust `maxNumConcurrentSubTasks` or 
`druid.global.http.numConnections`. Especially for 
`druid.global.http.numConnections`, I’m not sure why it defaults to 20 for 
peons and middleManagers. We should consider lowering it.
     - Another workaround is using Indexer. This problem doesn’t exist with 
Indexers since the connections will be made between Indexers, not tasks.
   
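   To make the connection estimate concrete, the formula above can be evaluated for the figures given in the text. The class and method names below are hypothetical, and the result is an upper bound that assumes every subtask opens all of its allowed connections.

```java
/** Hypothetical helper that evaluates the connection bound given in the text. */
final class ConnectionEstimate
{
  /** Upper bound: concurrent subtasks multiplied by connections allowed per subtask. */
  static long maxConnections(int maxNumConcurrentSubTasks, int numConnections)
  {
    // Widen before multiplying so large inputs cannot overflow int.
    return (long) maxNumConcurrentSubTasks * numConnections;
  }
}
```

   With 200 concurrent subtasks and the default of 20 connections, `ConnectionEstimate.maxConnections(200, 20)` gives 4000 connections to the supervisor task at most. Lowering `druid.global.http.numConnections` reduces the bound proportionally.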
   #### Additional memory pressure in the supervisor task
   
   The supervisor task will track metrics per phase, not per subtask (except 
for error messages of failed subtasks). The metrics for each phase are computed 
by aggregating subtask metrics whenever they send reports. As a result, the 
supervisor task needs to keep roughly 20 metrics per phase in memory, which 
shouldn't be large.
   
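   A minimal sketch of why per-phase tracking stays small: each metric needs only a count, a sum, a minimum, and a maximum, regardless of how many subtasks report. The class `PhaseMetric` is a hypothetical name, and the sketch assumes each subtask's final value is folded in exactly once. Folding in repeated live reports of a running total would additionally require remembering the last value seen per running subtask.

```java
/** Hypothetical constant-size aggregate for one metric of one phase. */
final class PhaseMetric
{
  private long count = 0;
  private long sum = 0;
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;

  /** Folds in the final value reported by one subtask. */
  void add(long value)
  {
    count++;
    sum += value;
    min = Math.min(min, value);
    max = Math.max(max, value);
  }

  long count()
  {
    return count;
  }

  long total()
  {
    return sum;
  }

  /** Average over the subtasks seen so far, or 0 if none have reported. */
  double average()
  {
    return count == 0 ? 0.0 : (double) sum / count;
  }

  long min()
  {
    requireNonEmpty();
    return min;
  }

  long max()
  {
    requireNonEmpty();
    return max;
  }

  private void requireNonEmpty()
  {
    if (count == 0) {
      throw new IllegalStateException("No subtask has reported yet");
    }
  }
}
```

   With roughly 20 such aggregates per phase, the supervisor task holds a few dozen long values per phase plus the retained error messages.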
   ### Operational impact
   
   As described above, there will be two changes: more HTTP connections 
between peons and additional memory usage in the supervisor task. However, 
neither of them is expected to have a large operational impact.
   
   ### Test plan
   
   The live and complete reports should be tested in integration tests. I will 
perform some testing for metrics on our internal cluster.

