jihoonson opened a new issue #10352:
URL: https://github.com/apache/druid/issues/10352
### Motivation
Currently, the Parallel task doesn't provide any metrics and so you need to
read through task logs when something goes wrong. Reading task logs is harder
when it comes to Parallel task because you first need to find out what subtask
went wrong in the supervisor task logs and then read through the subtask logs
again. Even when you find the right task logs of a bad subtask, it might be
hard to find anything good since task logs only have limited information of
what the task has been doing which you need to interpret instead of actual
metrics. We already find providing those metrics useful for streaming
ingestion. The batch ingestion could similarly benefit from it.
### Proposed changes
To help with easy debugging, the native parallel batch ingestion should
provide useful metrics. These metrics will be exposed via both task reporting
system and metrics emitter.
#### Task reports
Both live and complete task reports will be provided. Live reports will be
provided while the ingestion task is running and complete task reports will be
available once the task is done.
The subtask report will include metrics for bytes in/out, rows
in/filtered/unparseable/out, disk spills, fetch time, and errors. The
supervisor task report will include the metrics per phase which are mostly the
average of subtask metrics.
##### Live reports
The live reports of the supervisor task will include:
- Complete phases
- Phase duration
- Complete status (number of succeeded/failed subtasks)
- Errors of _N_ last failed subtasks
- Average/min/max duration of succeeded subtasks
- Average/min/max bytes/rows in/out of subtasks
- Total bytes in/out of subtasks
- Total rows in/filtered/unparseable/out of subtasks
- Average/min/max number of disk spills of subtasks
- Average/min/max fetch time of subtasks
- Average/min/max number of created segments of subtasks
- Total number of created segments
- Current phase
- Runtime of current phase
- Progress (number of succeeded/failed/expected to succeed subtasks)
- Errors of recently failed subtasks
- Average/min/max duration of succeeded subtasks
- Min/max bytes/rows in/out of succeeded subtasks
- Moving average of running total of bytes/rows in/out of running subtasks
- Running total of bytes in/out of subtasks (succeeded + running)
- Running total of rows in/filtered/unparseable/out of subtasks (succeeded
+ running)
- Average/min/max number of disk spills and spill time of succeeded
subtasks
- Average/min/max fetch time of succeeded subtasks
- Average/min/max number of created segments of succeeded subtasks
- Running total of number of created segments
- Remaining phases
- Remaining phase names
The live reports of subtasks will include:
- Moving average of bytes/rows in/out
- Running total of bytes/rows in/filtered/unparseable/out
- Running total of number of disk spills, average spill time
- Fetch time
##### Complete reports
The complete reports of the supervisor task will include:
- Duration of the supervisor task
- Segment metrics
- Total number of segments published
- Average/min/max number of segments published per interval
- Average/min/max number of rows per segment
- Total bytes in/out
- Total rows in/filtered/unparseable/out
- Supervisor task error if any
- Per-phase metrics
- Phase duration
- Total bytes in/out
- Total rows in/filtered/unparseable/out
- Number of succeeded/failed subtasks
- Errors of _N_ recent failed subtasks
- Average/min/max runtime of subtasks
- Average/min/max number of total disk spills and spill time of subtasks
- Average/min/max fetch time of subtasks
The complete reports of the subtasks will include:
- Total bytes in/out
- Total rows in/filtered/unparseable/out
- Total number of disk spills and spill time
- Fetch time
- Error if failed
#### Metrics
For task metrics, all the above metrics will be emitted via metrics emitter
as well.
MiddleManager will additionally emit these metrics.
- Moving average of shuffle bytes
- Moving average of shuffle requests
#### Live reporting system for Parallel task


- Subtasks periodically send their live reports to the supervisor task
- Failing in sending metrics can make the subtask fail. Needs retries.
- Subtasks can report their current status with metrics directly to the
supervisor task.
- The supervisor task can use those reports as a heartbeat signal
- If a missing report is found, the supervisor task will check with the
Overlord to see if the subtask did fail. If the subtask is alive, the missing
report should be noted in the live report of the supervisor task. If the
subtask died, the supervisor task issues a new subtask for retry.
- The supervisor task immediately retries failed subtasks when they report
failures. They cannot succeed after reporting failures.
- When subtasks report successes, the supervisor task checks with the
Overlord if they did succeed. They can fail even after reporting successes.
#### Complete reporting system for Parallel task
- The final report is pushed to both deep storage and to the supervisor task
- TaskReportFileWriter will be used to push to deep storage in
middleManager and indexer
- TaskRunnerListener can be used to send to supervisor task in peon and
indexer
### Rationale
#### Rationale for the list of metrics
Live reports and metrics are mostly useful for debugging. The new metrics
should be able to answer these questions.
How is my ingestion going?
- How long has my supervisor task been running?
- How long does each subtask run for?
- How many phases left to run in my parallel ingestion?
- How many subtasks left to run in the current phase?
- How large data is each subtask processing?
Why is my ingestion slow?
- Are there any intermittent subtask failures?
- Is each subtask processing too many data?
- In each subtask, are there too many spilling on disk?
- Is shuffle slow?
What was the last state of my succeeded ingestion?
- How many segments did my ingestion create?
- What was total size of created segments?
- How many subtask failures were in my ingestion? What were those failures
if any?
- How long did my ingestion take?
Why did my ingestion fail?
- Were there any subtask failures? If so, what were the error messages?
- Did the parallel task fail? If so, what was the error message?
Why does my ingestion not create segments?
- How many rows were in published segments?
- How many rows were filtered out?
- How many rows were unparseable?
Why is my query slow after ingestion?
- Are there too few rows per segment?
- Are there too many segments per time chunk?
#### More HTTP connections for live reporting system
In the proposed live reporting system, each subtask needs to talk to its
supervisor task over HTTP. This will result in more HTTP connections between
tasks. However, I would like to go with the current approach for now instead of
making connections between middleManagers because
- Currently, there is another API in the supervisor task which every subtask
directly calls to allocate new segments in dynamic partitioning. This API call
cannot be delegated by middleManager without introducing a new async API
framework.
- Even though there is already a supervisor task API called by every
subtask, I would say the number of HTTP connections won’t be that large in most
cases. The number of HTTP connections is computed by `maxNumConcurrentSubTasks`
* `druid.global.http.numConnections` (20 by default). In general,
`maxNumConcurrentSubTasks` doesn’t go beyond 200 even in large clusters.
- The concern with high number of connections could be that too many
connections can affect query performance somehow in the data node model where a
middleManager and a historical live in the same machine. However, even though
every subtask is already making connections to the supervisor task in dynamic
partitioning, I haven’t heard of any problem in that yet.
- If we did observe some problem with very large number of HTTP connections,
there would be still workarounds.
- We can adjust `maxNumConcurrentSubTasks` or
`druid.global.http.numConnections`. Especially for
`druid.global.http.numConnections`, I’m not sure why it’s defaulted as 20 for
peons and middleManagers. We should consider lowering it.
- Another workaround is using Indexer. This problem doesn’t exist with
Indexers since the connections will be made between Indexers, not tasks.
#### Additional memory pressure in the supervisor task
The supervisor task will track metrics per phase not per subtask (except for
error messages in failed subtasks). The metrics for each phase is computed by
aggregating subtask metrics whenever they send reports. As a result, the
supervisor task needs to keep more or less 20 metrics per phase in memory. This
shouldn't be large.
### Operational impact
As described above, there will be two changes of more HTTP connections
between peons and additional memory usage in the supervisor task. However,
neither of them is expected to have a huge impact in operation.
### Test plan
The live and complete reports should be tested in integration tests. I will
perform some testing for metrics on our internal cluster.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]