[ https://issues.apache.org/jira/browse/FLINK-1478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312264#comment-14312264 ]

ASF GitHub Bot commented on FLINK-1478:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/375#discussion_r24329907
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
    @@ -260,15 +260,49 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
        
        public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
                
    -//         ExecutionVertex[] vertices = this.taskVertices;
    -//         
    -//         for (int i = 0; i < vertices.length; i++) {
    -//                 ExecutionVertex v = vertices[i];
    -//                 
    -//                 if (v.get 
    -//         }
    +           ExecutionVertex[] vertices = this.taskVertices;
                
    -           for (ExecutionVertex ev : getTaskVertices()) {
    +           // check if we need to do pre-assignment of tasks
    +           if (inputSplitsPerSubtask != null) {
    +           
    +                   final Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
    +                   final Map<String, Integer> assignments = new HashMap<String, Integer>();
    +                   
    +                   for (int i = 0; i < vertices.length; i++) {
    +                           List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i];
    +                           if (splitsForHost == null || splitsForHost.isEmpty()) {
    +                                   continue;
    +                           }
    +                           
    +                           String[] hostNames = splitsForHost.get(0).getHostnames();
    +                           if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
    +                                   continue;
    +                           }
    +                           
    +                           String host = hostNames[0];
    +                           ExecutionVertex v = vertices[i];
    +                           
    +                           List<Instance> instancesOnHost = instances.get(host);
    +                           
    +                           if (instancesOnHost == null || instancesOnHost.isEmpty()) {
    +                                   throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host
    +                                                   + ". No TaskManager available on that host.");
    +                           }
    +                           
    +                           Integer pos = assignments.get(host);
    +                           if (pos == null) {
    +                                   pos = 0;
    +                                   assignments.put(host, 0);
    +                           } else {
    +                                   assignments.put(host, pos + 1 % instancesOnHost.size());
    --- End diff --
    
    It should be possible for multiple subtasks to go to the same instance. If 
there are too many, it would fail in the scheduler, yes. We can check that the 
number of subtasks on the instance does not exceed the number of slots.
    
    This seems to me like a workaround solution anyway (until we can tie 
splits to tasks), so it might be okay.
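
    For illustration only, here is a minimal, self-contained sketch of the check suggested above: round-robin over the instances on a host while refusing to exceed each instance's slot count. It does not use Flink's classes. `Inst`, `assign`, and the `slots`/`assigned` fields are hypothetical stand-ins, and the failure mode (an `IllegalStateException`) is an assumption rather than what the scheduler does. Note that the sketch parenthesizes the increment before the modulo, since in Java `pos + 1 % n` parses as `pos + (1 % n)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrictLocalAssignmentSketch {

    /** Hypothetical stand-in for a TaskManager instance; NOT Flink's Instance class. */
    static final class Inst {
        final String id;
        final int slots;      // assumed: number of slots the instance offers
        int assigned = 0;     // subtasks pre-assigned so far

        Inst(String id, int slots) {
            this.id = id;
            this.slots = slots;
        }
    }

    /**
     * For each subtask (identified by its required host), picks an instance on that
     * host in round-robin order, skipping instances whose slots are already used up.
     * Returns the chosen instance id per subtask, in input order.
     */
    static List<String> assign(List<String> hostPerSubtask,
                               Map<String, List<Inst>> instancesByHost) {
        Map<String, Integer> nextPos = new HashMap<>();
        List<String> result = new ArrayList<>();

        for (String host : hostPerSubtask) {
            List<Inst> onHost = instancesByHost.get(host);
            if (onHost == null || onHost.isEmpty()) {
                throw new IllegalStateException("No TaskManager available on host " + host);
            }

            int start = nextPos.getOrDefault(host, 0);
            Inst chosen = null;
            for (int k = 0; k < onHost.size(); k++) {
                // parenthesized so the modulo applies to the whole sum
                int idx = (start + k) % onHost.size();
                Inst candidate = onHost.get(idx);
                if (candidate.assigned < candidate.slots) {
                    chosen = candidate;
                    nextPos.put(host, (idx + 1) % onHost.size());
                    break;
                }
            }
            if (chosen == null) {
                throw new IllegalStateException("Not enough slots on host " + host);
            }
            chosen.assigned++;
            result.add(chosen.id);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Inst>> byHost = new HashMap<>();
        List<Inst> onH1 = new ArrayList<>();
        onH1.add(new Inst("a", 1));
        onH1.add(new Inst("b", 2));
        byHost.put("h1", onH1);

        List<String> hosts = new ArrayList<>();
        hosts.add("h1");
        hosts.add("h1");
        hosts.add("h1");

        System.out.println(assign(hosts, byHost));
    }
}
```

    With one single-slot and one two-slot instance on the same host, three subtasks fit (two of them share an instance) and a fourth is rejected up front instead of failing later in the scheduler.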


> Add strictly local input split assignment
> -----------------------------------------
>
>                 Key: FLINK-1478
>                 URL: https://issues.apache.org/jira/browse/FLINK-1478
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Fabian Hueske
>             Fix For: 0.9
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
