[
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334427#comment-17334427
]
wangwj edited comment on FLINK-10644 at 4/30/21, 2:40 PM:
----------------------------------------------------------
[~trohrmann]
Hi Till.
I am from the search and recommendation department of Alibaba in China. Happy
to share and discuss my work here.
Our big data processing platform uses Flink Batch to process extremely large
amounts of data every day. Many long-tail tasks are produced every day, and we
have to kill these processes manually, which leads to a poor user experience.
So I tried to solve this problem.
I think that speculative execution means two executions in an ExecutionVertex
running at the same time, whereas failover means two executions running at
different times. Based on this, I think this feature (speculative execution) is
theoretically achievable. So I have implemented speculative execution for batch
jobs based on Blink, and it has had a significant effect in our production
cluster.
Here is what I did:
(1) Which kinds of ExecutionJobVertex are suitable for enabling the speculative
execution feature in a batch job?
The speculative execution feature correlates with the implementation details of
region failover. So I found that an ExecutionJobVertex enables the speculative
execution feature only if all input edges and output edges of this
ExecutionJobVertex are blocking (Condition A); a small sketch of this check follows.
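As an illustration only (this is not the actual Flink code; EdgeType is a
stand-in for Flink's blocking/pipelined result partition types), Condition A
boils down to a check like this:
{code:java}
import java.util.List;

// Illustrative stand-in for the blocking/pipelined distinction of result partitions.
enum EdgeType { BLOCKING, PIPELINED }

class SpeculationEligibility {

    // Condition A: a vertex is eligible for speculative execution only if every
    // input edge and every output edge is blocking, so a second attempt can run
    // without disturbing upstream or downstream tasks that are still running.
    static boolean meetsConditionA(List<EdgeType> inputEdges, List<EdgeType> outputEdges) {
        return inputEdges.stream().allMatch(e -> e == EdgeType.BLOCKING)
            && outputEdges.stream().allMatch(e -> e == EdgeType.BLOCKING);
    }
}
{code}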
(2)How to distinguish long-tail task?
I distinguish long-tail task based on the intervals between the current time
and the execution first create/deploying time before it failover. When an
ExecutionJobVertex meets Condition A and a configurable percentage of
executions has been finished in the ExecutionJobVertex, the speculative
execution thread starts to really work. In the ExecutionJobVertex, when the
running time of a execution is greater than a configurable multiple of the
median of the running time of other finished executions, this execution is
defined as long-tail execution.(Condition B)
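A minimal, self-contained sketch of Condition B, assuming a configurable
finished-ratio threshold and slowdown multiplier (the class name and thresholds
here are illustrative, not the actual implementation):
{code:java}
import java.util.Arrays;

public class LongTailDetector {

    private final double minFinishedRatio;    // e.g. 0.75: only speculate once 75% of executions finished
    private final double slowdownMultiplier;  // e.g. 1.5: 1.5x the median running time of finished executions

    public LongTailDetector(double minFinishedRatio, double slowdownMultiplier) {
        this.minFinishedRatio = minFinishedRatio;
        this.slowdownMultiplier = slowdownMultiplier;
    }

    /**
     * @param finishedRunningTimesMs running times of the executions that already finished
     * @param totalExecutions        total number of executions in the ExecutionJobVertex
     * @param runningTimeMs          current running time of the execution being checked
     */
    public boolean isLongTail(long[] finishedRunningTimesMs, int totalExecutions, long runningTimeMs) {
        int finished = finishedRunningTimesMs.length;
        if (finished == 0 || finished < minFinishedRatio * totalExecutions) {
            return false; // speculation only starts once enough executions have finished
        }
        long[] sorted = finishedRunningTimesMs.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1) ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        return runningTimeMs > slowdownMultiplier * median;
    }
}
{code}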
(3) How to make the speculative execution algorithm more precise?
Based on the speculative execution algorithm in Condition B, I solved the
problem of long-tail tasks in our production cluster.
As a next step, we may add the throughput of the task to the speculative
execution algorithm, reported through the heartbeats between the TaskManagers
and the JobManager.
(4) How to schedule another execution in the same ExecutionVertex?
We changed the currentExecution field in ExecutionVertex to a list, which means
that there can be multiple executions in an ExecutionVertex at the same time.
We then reuse the existing scheduling logic to schedule the speculative
execution; see the sketch below.
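A minimal sketch of the idea, with Execution as a placeholder for Flink's
runtime Execution class (the real change touches many call sites in the
scheduler):
{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SpeculativeExecutionVertex {

    // Instead of a single currentExecution, keep all live attempts.
    private final List<Execution> currentExecutions = new CopyOnWriteArrayList<>();

    public Execution createSpeculativeAttempt() {
        Execution attempt = new Execution(currentExecutions.size());
        currentExecutions.add(attempt);
        // The new attempt is then handed to the existing scheduling path,
        // exactly like the original execution.
        return attempt;
    }

    public List<Execution> getCurrentExecutions() {
        return currentExecutions;
    }

    // Placeholder for the real Execution class.
    static class Execution {
        final int attemptNumber;
        Execution(int attemptNumber) { this.attemptNumber = attemptNumber; }
    }
}
{code}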
(5) How to make the speculative task run on a different machine from the
original execution?
We implemented a per-job, machine-level blacklist. A machine's IP is added to
the blacklist when an execution running on it is recognized as a long-tail
execution, and the entry is removed again when it expires. When executions are
scheduled, we add the blacklist information to the YARN PlacementConstraint. In
this way, I can ensure that the YARN container is not placed on a blacklisted
machine; a sketch of the expiring blacklist follows.
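A minimal sketch of such an expiring, per-job machine blacklist (the
integration with YARN PlacementConstraints is not shown; class and method names
are illustrative):
{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class MachineBlacklist {

    private final long ttlMs;
    private final Map<String, Long> blacklistedUntil = new ConcurrentHashMap<>();

    public MachineBlacklist(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Called when a long-tail execution is detected on the given machine. */
    public void add(String machineIp) {
        blacklistedUntil.put(machineIp, System.currentTimeMillis() + ttlMs);
    }

    /** Machines that are currently blacklisted; expired entries are dropped lazily. */
    public Set<String> currentBlacklist() {
        long now = System.currentTimeMillis();
        blacklistedUntil.entrySet().removeIf(e -> e.getValue() <= now);
        return new HashSet<>(blacklistedUntil.keySet());
    }
}
{code}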
(6) How to avoid errors when multiple executions finish at the same time in an
ExecutionVertex?
In the ExecutionVertex executionFinished() method, multi-thread synchronization
is used to ensure that only one execution can successfully finish in an
ExecutionVertex; all other executions go through the cancellation logic, as
sketched below.
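A minimal sketch of the "first finisher wins" rule, using an atomic flag (the
names are illustrative; the real logic lives inside ExecutionVertex):
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

class ExecutionVertexFinishGuard {

    private final AtomicBoolean finished = new AtomicBoolean(false);

    /**
     * Called by each attempt when it reaches the FINISHED state.
     *
     * @param cancelOtherAttempts action that cancels all other live attempts
     * @return true if this attempt is the single one that counts as finished,
     *         false if another attempt already won the race
     */
    boolean executionFinished(Runnable cancelOtherAttempts) {
        if (finished.compareAndSet(false, true)) {
            cancelOtherAttempts.run(); // every other execution goes to the cancellation path
            return true;
        }
        return false;
    }
}
{code}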
(7) How to deal with multiple sink files in one ExecutionVertex when the job
sinks to files?
When a batch job sinks to files, we add an executionAttemptID suffix to the
file name; a sketch follows.
Finally, I delete or rename these files in finalizeOnMaster().
Here we should also pay attention to the case of a Flink streaming job
processing bounded data sets.
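A minimal sketch of attempt-suffixed sink files and the finalize step, assuming
a simple file-per-subtask layout (this is illustrative, not the actual sink or
finalizeOnMaster() code):
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class AttemptSuffixedSink {

    /** Path each attempt writes to, e.g. part-0007.<executionAttemptId>. */
    static Path attemptPath(String directory, int subtaskIndex, String executionAttemptId) {
        return Paths.get(directory, String.format("part-%04d.%s", subtaskIndex, executionAttemptId));
    }

    /** In finalizeOnMaster(): rename the winning attempt's file, delete the losers'. */
    static void finalizeSubtask(String directory, int subtaskIndex,
                                String winningAttemptId, Iterable<String> allAttemptIds) throws IOException {
        Path finalPath = Paths.get(directory, String.format("part-%04d", subtaskIndex));
        for (String attemptId : allAttemptIds) {
            Path p = attemptPath(directory, subtaskIndex, attemptId);
            if (attemptId.equals(winningAttemptId)) {
                Files.move(p, finalPath);      // keep only the winner's output
            } else {
                Files.deleteIfExists(p);       // drop the output of cancelled attempts
            }
        }
    }
}
{code}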
(8) In a batch job with an all-to-all shuffle, how do the downstream original
execution and speculative execution select which ResultSubPartition of the
upstream executions to read?
Two executions of an upstream ExecutionVertex produce two ResultPartitions.
When the upstream ExecutionVertex finishes, we update the input channels of the
downstream execution to point to the upstream execution that finished first; a
sketch follows.
Here we should also pay attention to the case where the downstream execution
hits a DataConsumptionException: it is restarted against the upstream execution
that has already finished.
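A minimal sketch of choosing which upstream attempt's ResultPartition the
downstream input channels should consume (types and fields are illustrative):
{code:java}
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class UpstreamPartitionSelector {

    static class UpstreamAttempt {
        final String resultPartitionId;
        final long finishTimestampMs; // Long.MAX_VALUE if not finished yet

        UpstreamAttempt(String resultPartitionId, long finishTimestampMs) {
            this.resultPartitionId = resultPartitionId;
            this.finishTimestampMs = finishTimestampMs;
        }

        boolean isFinished() { return finishTimestampMs != Long.MAX_VALUE; }
    }

    /** Returns the partition of the attempt that finished earliest, if any has finished. */
    static Optional<String> selectPartitionToConsume(List<UpstreamAttempt> attempts) {
        return attempts.stream()
                .filter(UpstreamAttempt::isFinished)
                .min(Comparator.comparingLong(a -> a.finishTimestampMs))
                .map(a -> a.resultPartitionId);
    }
}
{code}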
(9) How to display information about speculative tasks on the Flink web UI?
After implementing this feature, when the speculative execution runs faster
than the original execution, the Flink UI shows this task as cancelled. But the
result of the job is correct, which fully matches our expectations.
I don't know much about the web UI, so I will ask a colleague for help.
[~trohrmann]
My implementation has played a big role in our production cluster at Alibaba.
Happy to discuss it.
> Batch Job: Speculative execution
> --------------------------------
>
> Key: FLINK-10644
> URL: https://issues.apache.org/jira/browse/FLINK-10644
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Coordination
> Reporter: JIN SUN
> Assignee: BoWang
> Priority: Major
> Labels: stale-assigned
>
> Stragglers/outliers are tasks that run slower than most of the other tasks in a
> batch job. This impacts job latency, since such a straggler is usually on the
> critical path of the job and becomes the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation,
> software misconfiguration, or noisy neighbors. It's hard for the JM to predict
> the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and Spark
> have *_speculative execution_*. Speculative execution is a health-check
> procedure that looks for tasks to be speculated, i.e. tasks running slower in an
> ExecutionJobVertex than the median of all successfully completed tasks in
> that EJV. Such slow tasks are re-submitted to another TM. It does not stop
> the slow tasks, but runs a new copy in parallel, and kills the others once
> one of them completes.
> This JIRA is an umbrella to apply this kind of idea in FLINK. Details will be
> appended later.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)