[ 
https://issues.apache.org/jira/browse/FLINK-10644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337504#comment-17337504
 ] 

wangwj edited comment on FLINK-10644 at 4/30/21, 4:53 PM:
----------------------------------------------------------

Hi [~trohrmann], [~wind_ljy],

I think the Flink version closest to the Blink version on which I built this 
feature is probably 1.7 or 1.8. Although that is rather far from the latest 
Flink release, after reading the code of both Blink and Flink (master branch) 
I found that the code I want to modify has not changed much. So I am confident 
that I can contribute to this issue.



By multi-threading in the ExecutionGraph I mean that two executions of the 
same ExecutionVertex can finish at the same time, so 
ExecutionVertex.executionFinished() may be called by two different executions 
concurrently. Calling this "multi-threading" is probably not very accurate.
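
To illustrate the race described above, here is a minimal sketch of a 
"first finisher wins" guard. It is not the Blink or Flink code: the class 
SpeculativeVertex, the String attempt IDs and the boolean return value are 
assumptions made for the example, and only the method name executionFinished 
is borrowed from the text.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a vertex that has several concurrent attempts. */
class SpeculativeVertex {
    // ID of the attempt whose result was accepted, or null if none finished yet.
    private final AtomicReference<String> winner = new AtomicReference<>();

    /**
     * Called when an attempt finishes. Returns true only for the first caller;
     * any later attempt loses the race and should be cancelled by the caller.
     */
    boolean executionFinished(String attemptId) {
        return winner.compareAndSet(null, attemptId);
    }

    /** Returns the accepted attempt ID, or null if no attempt has finished. */
    String winner() {
        return winner.get();
    }
}
```

With a guard like this, it does not matter which of the two executions 
reports first: exactly one call is accepted and the other can be discarded.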

How does the speculative execution play together with other sinks? Does it only 
work for the file based sinks?

Speculative execution could also support sinks that write to key-value 
databases, such as Hologres or HBase. In our scenario, batch jobs usually 
write data into Hologres (similar to HBase) or Pangu (similar to HDFS).





How does the blacklisting mechanism work? Does it work also for the K8s and 
Mesos integration or only for the Yarn integration?
The blacklist module is a thread that maintains the set of blacklisted 
machines for the job and periodically removes expired entries. Each entry 
contains an IP address and a timestamp; the timestamp is used to decide 
whether the entry has expired.
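
The mechanism above could look roughly like the following sketch. It is not 
the actual Blink code: the class name JobBlacklist, the TTL parameter and the 
use of a ScheduledExecutorService for the periodic cleanup are assumptions 
made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-job blacklist: machine IP -> time it was blacklisted. */
class JobBlacklist {
    private final Map<String, Long> entries = new ConcurrentHashMap<>();
    private final long ttlMillis; // assumed expiry time of an entry

    JobBlacklist(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Adds an entry, or refreshes its timestamp if the IP is already listed. */
    void add(String ip, long nowMillis) {
        entries.put(ip, nowMillis);
    }

    /** Removes every entry whose age has reached the TTL. */
    void removeExpired(long nowMillis) {
        entries.entrySet().removeIf(e -> nowMillis - e.getValue() >= ttlMillis);
    }

    boolean contains(String ip) {
        return entries.containsKey(ip);
    }

    /** Starts the periodic cleanup thread; the caller must shut it down. */
    ScheduledExecutorService startCleaner(long periodMillis) {
        ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
        cleaner.scheduleAtFixedRate(
                () -> removeExpired(System.currentTimeMillis()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return cleaner;
    }
}
```

Passing the current time into add() and removeExpired() keeps the expiry 
logic easy to test without waiting for the cleanup thread.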

My code only supports the Yarn integration. But as far as I know, in the K8s 
integration we could use node affinity or pod affinity to achieve the same 
goal as the Yarn PlacementConstraint. As the Mesos integration will be 
deprecated in Flink 1.13, I didn't consider it.

How much is the change encapsulated by the SchedulerNG interface?
I agree that the SchedulerNG interface is more or less self-contained, and I 
will consider your proposal carefully in the FLIP and in the implementation.





As the next step, I will figure out which changes are needed in Flink (master 
branch) and then try to write a POC.
After that I will send an e-mail to [email protected] to discuss this feature, 
and then write a FLIP and start a vote on it.

Thanks





> Batch Job: Speculative execution
> --------------------------------
>
>                 Key: FLINK-10644
>                 URL: https://issues.apache.org/jira/browse/FLINK-10644
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Coordination
>            Reporter: JIN SUN
>            Assignee: BoWang
>            Priority: Major
>              Labels: stale-assigned
>
> Stragglers/outliers are tasks that run slower than most of the other tasks in 
> a batch job. This impacts job latency, because the straggler is usually on 
> the critical path of the job and becomes the bottleneck.
> Tasks may be slow for various reasons, including hardware degradation, 
> software misconfiguration, or noisy neighbors. It is hard for the JM to 
> predict the runtime.
> To reduce the overhead of stragglers, other systems such as Hadoop/Tez and 
> Spark have *_speculative execution_*. Speculative execution is a health-check 
> procedure that looks for tasks to speculate, i.e. tasks in an 
> ExecutionJobVertex that run slower than the median of all successfully 
> completed tasks in that EJV. Such slow tasks are re-submitted to another TM. 
> The slow task is not stopped; a new copy runs in parallel, and the remaining 
> copies are killed once one of them completes.
> This JIRA is an umbrella for applying this idea in Flink. Details will be 
> appended later.
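
The detection rule in the issue description could be sketched as follows. 
This is only an illustration and not code from Flink, Hadoop or Spark: the 
class StragglerDetector, the task IDs and the slowdownFactor parameter are 
assumptions (the description only says "slower than the median").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that picks running tasks to speculate on. */
class StragglerDetector {

    /** Median of the given durations; the array must not be empty. */
    static double median(long[] values) {
        long[] sorted = values.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    /**
     * Returns the IDs of running tasks whose elapsed time exceeds
     * slowdownFactor times the median duration of the finished tasks.
     * Nothing is reported until at least one task has finished.
     */
    static List<String> findStragglers(
            long[] finishedMillis,
            Map<String, Long> runningMillis,
            double slowdownFactor) {
        List<String> stragglers = new ArrayList<>();
        if (finishedMillis.length == 0) {
            return stragglers;
        }
        double threshold = median(finishedMillis) * slowdownFactor;
        for (Map.Entry<String, Long> task : runningMillis.entrySet()) {
            if (task.getValue() > threshold) {
                stragglers.add(task.getKey());
            }
        }
        return stragglers;
    }
}
```

A slowdownFactor of 1.0 matches the plain "slower than the median" wording; 
a larger value would make the check less aggressive.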



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
