[ https://issues.apache.org/jira/browse/FLINK-1478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14312264#comment-14312264 ]
ASF GitHub Bot commented on FLINK-1478:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/375#discussion_r24329907

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---
    @@ -260,15 +260,49 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
         public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
    -//        ExecutionVertex[] vertices = this.taskVertices;
    -//
    -//        for (int i = 0; i < vertices.length; i++) {
    -//            ExecutionVertex v = vertices[i];
    -//
    -//            if (v.get
    -//        }
    +        ExecutionVertex[] vertices = this.taskVertices;
    -        for (ExecutionVertex ev : getTaskVertices()) {
    +        // check if we need to do pre-assignment of tasks
    +        if (inputSplitsPerSubtask != null) {
    +
    +            final Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
    +            final Map<String, Integer> assignments = new HashMap<String, Integer>();
    +
    +            for (int i = 0; i < vertices.length; i++) {
    +                List<LocatableInputSplit> splitsForHost = inputSplitsPerSubtask[i];
    +                if (splitsForHost == null || splitsForHost.isEmpty()) {
    +                    continue;
    +                }
    +
    +                String[] hostNames = splitsForHost.get(0).getHostnames();
    +                if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
    +                    continue;
    +                }
    +
    +                String host = hostNames[0];
    +                ExecutionVertex v = vertices[i];
    +
    +                List<Instance> instancesOnHost = instances.get(host);
    +
    +                if (instancesOnHost == null || instancesOnHost.isEmpty()) {
    +                    throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host +
    +                        ". No TaskManager available on that host.");
    +                }
    +
    +                Integer pos = assignments.get(host);
    +                if (pos == null) {
    +                    pos = 0;
    +                    assignments.put(host, 0);
    +                } else {
    +                    assignments.put(host, pos + 1 % instancesOnHost.size());
    --- End diff --

It should be possible for multiple subtasks to go to the same instance.
If there are too many, it would fail in the scheduler, yes. We can check that the number of subtasks on the instance does not exceed the number of slots. This seems to me like a workaround solution anyway (until we can tie splits to tasks), so it might be okay.

> Add strictly local input split assignment
> -----------------------------------------
>
>                 Key: FLINK-1478
>                 URL: https://issues.apache.org/jira/browse/FLINK-1478
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager
>    Affects Versions: 0.9
>            Reporter: Stephan Ewen
>            Assignee: Fabian Hueske
>             Fix For: 0.9
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
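The per-host assignment discussed in this review (several subtasks may share one instance, but no instance should receive more subtasks than it has slots) can be sketched as follows. This is a hypothetical, self-contained illustration, not Flink's implementation: `LocalAssignmentSketch`, `InstanceInfo`, and `assign` are made-up names, and a plain slot count stands in for Flink's `Instance` and slot accounting. It also shows the wrap-around with explicit parentheses: in Java `%` binds tighter than `+`, so `pos + 1 % instancesOnHost.size()` evaluates as `pos + (1 % instancesOnHost.size())` and never wraps back to 0 when a host has more than one instance.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: place subtasks that must run on a given host onto the
 * instances of that host in round-robin order. Several subtasks may share an
 * instance, but never more than the instance's slot count.
 */
public class LocalAssignmentSketch {

    /** Hypothetical stand-in for a TaskManager instance: an id plus its number of slots. */
    public static final class InstanceInfo {
        final String id;
        final int numSlots;

        public InstanceInfo(String id, int numSlots) {
            this.id = id;
            this.numSlots = numSlots;
        }
    }

    /**
     * Returns, per subtask index, the id of the chosen instance, or null if the
     * subtask has no preferred host. Throws IllegalStateException if a host has
     * no instances or all slots on that host are already taken.
     */
    public static String[] assign(String[] preferredHostPerSubtask,
                                  Map<String, List<InstanceInfo>> instancesByHost) {
        String[] result = new String[preferredHostPerSubtask.length];
        Map<String, Integer> nextPos = new HashMap<>();   // round-robin cursor per host
        Map<String, Integer> usedSlots = new HashMap<>(); // subtasks placed per instance id

        for (int i = 0; i < preferredHostPerSubtask.length; i++) {
            String host = preferredHostPerSubtask[i];
            if (host == null) {
                continue; // no locality constraint for this subtask
            }
            List<InstanceInfo> onHost = instancesByHost.get(host);
            if (onHost == null || onHost.isEmpty()) {
                throw new IllegalStateException("No instance available on host " + host);
            }

            int n = onHost.size();
            int start = nextPos.getOrDefault(host, 0);
            InstanceInfo target = null;
            // Starting at the cursor, take the first instance that still has a free slot.
            for (int k = 0; k < n; k++) {
                int idx = (start + k) % n;
                InstanceInfo candidate = onHost.get(idx);
                if (usedSlots.getOrDefault(candidate.id, 0) < candidate.numSlots) {
                    target = candidate;
                    // Parenthesized on purpose: (idx + 1) % n wraps, idx + 1 % n would not.
                    nextPos.put(host, (idx + 1) % n);
                    break;
                }
            }
            if (target == null) {
                throw new IllegalStateException("All slots on host " + host + " are taken");
            }
            usedSlots.merge(target.id, 1, Integer::sum);
            result[i] = target.id;
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<InstanceInfo>> byHost = new HashMap<>();
        byHost.put("hostA", Arrays.asList(new InstanceInfo("tm1", 2), new InstanceInfo("tm2", 1)));
        String[] assigned = assign(new String[] {"hostA", "hostA", null, "hostA"}, byHost);
        System.out.println(Arrays.toString(assigned)); // prints [tm1, tm2, null, tm1]
    }
}
```

In this sketch a full instance is skipped rather than failing immediately, so scheduling only fails once every slot on the host is in use. That is one way to implement the slot check suggested above; failing in the scheduler, as the comment notes, would be the simpler alternative.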