[ 
https://issues.apache.org/jira/browse/FLINK-7219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16094193#comment-16094193
 ] 

Sihua Zhou edited comment on FLINK-7219 at 7/20/17 5:25 AM:
------------------------------------------------------------

Thanks for your reply. This question really needs further thought, which 
is why I split it off from 
[FLINK-7153|https://issues.apache.org/jira/browse/FLINK-7153]. Choosing slots 
based on state rather than input is a more reasonable solution. IMO, maybe we 
need to abstract a 
`SlotEvaluater` that calculates the score of a slot relative to an 
`ExecutionVertex`. The `SlotEvaluater` can be based on state, on inputs, or on 
both, and the slot with the highest score gets picked. I also think the 
`SlotEvaluater` should be decoupled from the `Scheduler`: the `Scheduler` 
doesn't need to know what the evaluation rule is, it only uses the evaluator 
to get the score for a slot. 
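
To make the idea concrete, here is a minimal sketch of what such an evaluator could look like. Everything in it is hypothetical: `SketchSlot`, `SketchVertex`, `SlotPicker`, the host-based scoring rules and the weights exist only for illustration and are not Flink classes or APIs.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for a slot: only the host it lives on matters here.
final class SketchSlot {
    final String host;
    SketchSlot(String host) { this.host = host; }
}

// Hypothetical stand-in for an ExecutionVertex: the hosts that hold its
// previous state and the hosts that produce its inputs.
final class SketchVertex {
    final Set<String> stateHosts;
    final Set<String> inputHosts;
    SketchVertex(Set<String> stateHosts, Set<String> inputHosts) {
        this.stateHosts = stateHosts;
        this.inputHosts = inputHosts;
    }
}

// The proposed abstraction: score one slot relative to one vertex.
interface SlotEvaluater {
    double score(SketchSlot slot, SketchVertex vertex);
}

final class SlotPicker {
    // Assumed rule: 1.0 if the slot is on a host that holds the vertex's state.
    static final SlotEvaluater BY_STATE =
        (slot, vertex) -> vertex.stateHosts.contains(slot.host) ? 1.0 : 0.0;

    // Assumed rule: 1.0 if the slot is on a host that produces an input.
    static final SlotEvaluater BY_INPUT =
        (slot, vertex) -> vertex.inputHosts.contains(slot.host) ? 1.0 : 0.0;

    // Combines two rules; the weights are arbitrary illustration values.
    static SlotEvaluater weighted(SlotEvaluater a, double wa, SlotEvaluater b, double wb) {
        return (slot, vertex) -> wa * a.score(slot, vertex) + wb * b.score(slot, vertex);
    }

    // The caller only asks for scores and keeps the highest-scoring slot;
    // it never looks at how the score was computed.
    static Optional<SketchSlot> pickBest(
            List<SketchSlot> freeSlots, SketchVertex vertex, SlotEvaluater evaluater) {
        SketchSlot best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (SketchSlot slot : freeSlots) {
            double s = evaluater.score(slot, vertex);
            if (s > bestScore) {
                bestScore = s;
                best = slot;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

For example, `SlotPicker.weighted(SlotPicker.BY_STATE, 2.0, SlotPicker.BY_INPUT, 1.0)` would prefer a slot on the state's host over a slot on an input's host, and swapping the rule would not require any change in the picking code.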

In fact, I think the current scheduler also has a problem (which is what this 
issue addresses; maybe I should remove the inputs part from the issue title). 
It only guarantees that the job gets assigned to slots, not that the 
allocation is well optimized, because the current allocation strategy 
iterates over the `ExecutionVertex` list and assigns slots to the EVs one by 
one, so all resources are allocated in a single traversal. I think a 
reasonable allocation strategy should consider the JobGraph as a whole. This 
seems to be a DP problem: finding the optimal solution may take a lot of time 
(which is unacceptable during recovery), but we can use a greedy algorithm to 
find an approximately optimal (not the best) solution within an acceptable 
time. So, IMO, the allocation strategy needs to be modified: it should be 
based on the `SlotEvaluater`, and a greedy method should be adopted to improve 
the resource allocation and arrive at an approximately optimal allocation. 
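
As a rough illustration of the difference, the sketch below compares a single one-by-one traversal with one possible greedy scheme that looks at all (vertex, slot) scores first. The class `GreedyAllocationSketch`, the score matrix and the method names are assumptions made for this example, not Flink code, and this greedy variant is only one of several that could be used; it does not guarantee the optimal assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class GreedyAllocationSketch {

    // score[v][s] is the evaluator's score of slot s for vertex v.
    // Returns result[v] = chosen slot index, or -1 if no slot was left.
    static int[] allocateGreedy(double[][] score) {
        int vertices = score.length;
        int slots = vertices == 0 ? 0 : score[0].length;

        // Collect every (vertex, slot) pair and sort by score, highest first.
        List<int[]> pairs = new ArrayList<>();
        for (int v = 0; v < vertices; v++) {
            for (int s = 0; s < slots; s++) {
                pairs.add(new int[] {v, s});
            }
        }
        pairs.sort((a, b) -> Double.compare(score[b[0]][b[1]], score[a[0]][a[1]]));

        // Take pairs in that order, skipping vertices and slots already used.
        int[] result = new int[vertices];
        Arrays.fill(result, -1);
        boolean[] taken = new boolean[slots];
        for (int[] p : pairs) {
            if (result[p[0]] == -1 && !taken[p[1]]) {
                result[p[0]] = p[1];
                taken[p[1]] = true;
            }
        }
        return result;
    }

    // Baseline: a single traversal in which each vertex, in list order,
    // takes the best slot that is still free.
    static int[] allocateOneByOne(double[][] score) {
        int vertices = score.length;
        int slots = vertices == 0 ? 0 : score[0].length;
        int[] result = new int[vertices];
        Arrays.fill(result, -1);
        boolean[] taken = new boolean[slots];
        for (int v = 0; v < vertices; v++) {
            for (int s = 0; s < slots; s++) {
                if (!taken[s] && (result[v] == -1 || score[v][s] > score[v][result[v]])) {
                    result[v] = s;
                }
            }
            if (result[v] != -1) {
                taken[result[v]] = true;
            }
        }
        return result;
    }

    // Sum of the scores of the chosen slots; unassigned vertices add nothing.
    static double total(double[][] score, int[] assignment) {
        double sum = 0.0;
        for (int v = 0; v < assignment.length; v++) {
            if (assignment[v] != -1) {
                sum += score[v][assignment[v]];
            }
        }
        return sum;
    }
}
```

With the made-up scores `{{0.5, 0.25}, {1.0, 0.0}}`, the one-by-one traversal gives slot 0 to the first vertex and leaves the second vertex with a slot that scores 0.0, while the greedy variant gives slot 0 to the second vertex and slot 1 to the first. Sorting all pairs costs O(V·S·log(V·S)) for V vertices and S slots, which should stay well below an exhaustive search.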

What do you think about this? [~StephanEwen] 


> Current allocate strategy can't achieve the optimal effect with input's 
> location
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7219
>                 URL: https://issues.apache.org/jira/browse/FLINK-7219
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> This is the second subtask of issue 
> [FLINK-7153|https://issues.apache.org/jira/browse/FLINK-7153?filter=-2].
> The current allocation strategy can't allocate slots optimally. Here is the 
> test case:
> {code}
> JobVertex v1 = new JobVertex("v1", jid1);
> JobVertex v2 = new JobVertex("v2", jid2);
> SlotSharingGroup group = new SlotSharingGroup();
> v1.setSlotSharingGroup(group);
> v2.setSlotSharingGroup(group);
> v1.setParallelism(2);
> v2.setParallelism(4);
> v1.setInvokableClass(BatchTask.class);
> v2.setInvokableClass(BatchTask.class);
> v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
> ResultPartitionType.PIPELINED_BOUNDED);
> {code}
> Currently, after allocating for v1 and v2, we get one local partition and 
> three remote partitions. But it should actually be 2 local partitions and 2 
> remote partitions. 
> The cause of this problem is that the current allocation strategy allocates 
> resources for the executions one by one (if an execution can get a slot from 
> the SlotGroup, it takes it; otherwise it asks for a new one). 
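
The counts in the description can be reproduced with a toy model. This is only a sketch of the behaviour as the issue describes it, not Flink's scheduler code. It assumes that the two v1 subtasks sit in two shared slots on different TaskManagers, that a shared slot holds at most one subtask per vertex, that a v2 subtask whose preferred shared slot is occupied falls back to any shared slot still free for v2, and that a newly requested slot is always remote. The class and method names are made up for the example.

```java
final class SharedSlotToyModel {

    // POINTWISE with parallelism 2 -> 4: v2[c] reads from v1[c / 2].
    // Shared slot k is assumed to hold v1[k].
    static int producerOf(int consumer) {
        return consumer / 2;
    }

    // One-by-one: each v2 subtask, in index order, takes its preferred shared
    // slot if v2 has not used it yet, otherwise the other shared slot if that
    // one is still free for v2, otherwise a new (remote) slot.
    static int localPartitionsOneByOne() {
        boolean[] sharedSlotHasV2 = new boolean[2];
        int local = 0;
        for (int c = 0; c < 4; c++) {
            int preferred = producerOf(c);
            int other = 1 - preferred;
            if (!sharedSlotHasV2[preferred]) {
                sharedSlotHasV2[preferred] = true;
                local++;                       // co-located with its producer
            } else if (!sharedSlotHasV2[other]) {
                sharedSlotHasV2[other] = true; // shared slot, but remote input
            }
            // else: a new slot is requested, assumed remote
        }
        return local;
    }

    // Alternative: first hand each shared slot to a v2 subtask that prefers
    // it; every remaining v2 subtask then gets a new (remote) slot.
    static int localPartitionsPreferredFirst() {
        boolean[] sharedSlotHasV2 = new boolean[2];
        int local = 0;
        for (int c = 0; c < 4; c++) {
            int preferred = producerOf(c);
            if (!sharedSlotHasV2[preferred]) {
                sharedSlotHasV2[preferred] = true;
                local++;
            }
        }
        return local;
    }
}
```

Under these assumptions `localPartitionsOneByOne()` yields 1 local partition (so 3 remote), because v2[1] occupies the shared slot that v2[2] would have preferred, while `localPartitionsPreferredFirst()` yields 2 local and 2 remote.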



