Repository: flink Updated Branches: refs/heads/master 8b3805ba5 -> 4386620c0
[FLINK-1478] [jobmanager] Add deterministic strictly local split assignment (part 1) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6fcef7df Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6fcef7df Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6fcef7df Branch: refs/heads/master Commit: 6fcef7dfbc9d70ff17a729106464d10d068c848b Parents: 8b3805b Author: Fabian Hueske <fhue...@apache.org> Authored: Thu Feb 5 15:48:17 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 9 15:45:35 2015 +0100 ---------------------------------------------------------------------- .../executiongraph/ExecutionJobVertex.java | 125 ++++++++++++++++++- 1 file changed, 119 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6fcef7df/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 0439c08..235fd1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.executiongraph; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,6 +29,7 @@ import org.apache.flink.api.common.io.StrictlyLocalAssignment; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.core.io.InputSplitSource; +import 
org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -141,13 +144,95 @@ public class ExecutionJobVertex implements Serializable {
 				if (splitSource instanceof StrictlyLocalAssignment) {
-					// group the splits by host wile preserving order per host
-
-					// assign splits to subtasks
-
-					// this.splitAssigner = new AssignerBasedOnPreAssignment();
-
+					// group the splits by host while preserving order per host
+					Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
+					for(int i=0; i<this.inputSplits.length; i++) {
+
+						// validate the split: it must be a LocatableInputSplit that names exactly one host
+						LocatableInputSplit lis;
+						InputSplit is = this.inputSplits[i];
+						if(!(is instanceof LocatableInputSplit)) {
+							throw new JobException("Invalid InputSplit type "+is.getClass().getCanonicalName()+". " +
+									"Strictly local assignment requires LocatableInputSplit");
+						}
+						lis = (LocatableInputSplit) is;
+
+						if(lis.getHostnames() == null) {
+							throw new JobException("LocatableInputSplit has no host information. " +
+									"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
+						} else if (lis.getHostnames().length != 1) {
+							throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
+						}
+						String hostName = lis.getHostnames()[0];
+
+						List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
+						if(hostSplits == null) {
+							hostSplits = new ArrayList<LocatableInputSplit>();
+							splitsByHost.put(hostName, hostSplits);
+						}
+						hostSplits.add(lis);
+					}
+
+					// assign subtasks to hosts; NOTE(review): with zero splits numHosts is 0 and the division below fails
+					// sort the host names so that the subtask-to-host assignment is deterministic
+					List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
+					Collections.sort(hosts);
+					int numSubTasks = this.getParallelism();
+					int numHosts = hosts.size();
+					if(numSubTasks < numHosts) {
+						throw new JobException("Strictly local split assignment requires at least as " +
+								"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
+								"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
+					}
+
+					int numSubTasksPerHost = numSubTasks / numHosts;
+					int numHostWithOneMore = numSubTasks % numHosts;
+
+					Map<String, int[]> subTaskHostAssignment = new HashMap<String, int[]>(numHosts);
+					int assignedHostsCnt = 0;
+					int assignedTasksCnt = 0;
+					for(String host : hosts) {
+						int numTasksToAssign = assignedHostsCnt < numHostWithOneMore ? numSubTasksPerHost + 1 : numSubTasksPerHost;
+						int[] subTasks = new int[numTasksToAssign];
+						for(int i=0; i<numTasksToAssign; i++) {
+							subTasks[i] = assignedTasksCnt++;
+						}
+						subTaskHostAssignment.put(host, subTasks);
+						assignedHostsCnt++;
+					}
+					// attach the locality constraint (target host) to every subtask of each host
+					for(String host : hosts) {
+						int[] subTasks = subTaskHostAssignment.get(host);
+
+						for(int taskId : subTasks) {
+							this.getTaskVertices()[taskId].setTargetHostConstraint(host);
+						}
+					}
+
+					// assign splits to subtasks
+					this.inputSplitsPerSubtask = (List<InputSplit>[])new List[numSubTasks];
+					for(String host : hosts) {
+						List<LocatableInputSplit> localSplits = splitsByHost.get(host);
+						int[] localSubTasks = subTaskHostAssignment.get(host);
+
+						// create an empty split list for every subtask assigned to this host
+						for(int i=0; i<localSubTasks.length; i++) {
+							this.inputSplitsPerSubtask[localSubTasks[i]] = new ArrayList<InputSplit>();
+						}
+
+						int subTaskIdx = 0;
+						while(!localSplits.isEmpty()) {
+							int subTask = localSubTasks[subTaskIdx++];
+							this.inputSplitsPerSubtask[subTask].add(localSplits.remove(localSplits.size() - 1));
+							if(subTaskIdx == localSubTasks.length) {
+								subTaskIdx = 0;
+							}
+						}
+					}
+
+					// create predetermined split assigner
+					this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);
 				} else {
 					this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
@@ -418,4 +503,32 @@ public class ExecutionJobVertex implements Serializable {
 			}
 		}
 	}
+
+	//---------------------------------------------------------------------------------------------
+	//  Predetermined InputSplitAssigner
+	//---------------------------------------------------------------------------------------------
+
+	public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
+
+		private List<InputSplit>[] inputSplitsPerSubtask;
+
+		public PredeterminedInputSplitAssigner(List<InputSplit>[] inputSplitsPerSubtask) {
+			// copy the per-subtask lists: getNextInputSplit() removes elements from them
+			this.inputSplitsPerSubtask = (List<InputSplit>[])new List[inputSplitsPerSubtask.length];
+			for(int i=0; i<inputSplitsPerSubtask.length; i++) {
+				this.inputSplitsPerSubtask[i] = new ArrayList<InputSplit>(inputSplitsPerSubtask[i].size());
+				this.inputSplitsPerSubtask[i].addAll(inputSplitsPerSubtask[i]);
+			}
+		}
+
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId) {
+			if(inputSplitsPerSubtask[taskId].isEmpty()) {
+				return null;
+			} else {
+				InputSplit is = inputSplitsPerSubtask[taskId].remove(inputSplitsPerSubtask[taskId].size() - 1);
+				return is;
+			}
+		}
+	}
 }