Repository: flink
Updated Branches:
  refs/heads/master 8b3805ba5 -> 4386620c0


[FLINK-1478] [jobmanager] Add deterministic strictly local split assignment (part 1)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6fcef7df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6fcef7df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6fcef7df

Branch: refs/heads/master
Commit: 6fcef7dfbc9d70ff17a729106464d10d068c848b
Parents: 8b3805b
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Feb 5 15:48:17 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 9 15:45:35 2015 +0100

----------------------------------------------------------------------
 .../executiongraph/ExecutionJobVertex.java      | 125 ++++++++++++++++++-
 1 file changed, 119 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6fcef7df/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 0439c08..235fd1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.executiongraph;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -27,6 +29,7 @@ import org.apache.flink.api.common.io.StrictlyLocalAssignment;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
+import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -141,13 +144,95 @@ public class ExecutionJobVertex implements Serializable {
                                
                                if (splitSource instanceof 
StrictlyLocalAssignment) {
                                        
-                                       // group the splits by host wile 
preserving order per host
-                                       
-                                       // assign splits to subtasks
-                                       
-                                       // this.splitAssigner = new 
AssignerBasedOnPreAssignment();
-                                       
+					// group the splits by host while preserving order per host
+					Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
+					for (int i = 0; i < this.inputSplits.length; i++) {
+
+						// strictly local assignment needs splits that name exactly one host
+						InputSplit is = this.inputSplits[i];
+						if (!(is instanceof LocatableInputSplit)) {
+							// must be thrown: merely constructing the exception would let the cast below fail instead
+							throw new JobException("Invalid InputSplit type "+is.getClass().getCanonicalName()+". " +
+									"Strictly local assignment requires LocatableInputSplit");
+						}
+						LocatableInputSplit lis = (LocatableInputSplit) is;
+
+						String[] splitHosts = lis.getHostnames();
+						if (splitHosts == null) {
+							throw new JobException("LocatableInputSplit has no host information. " +
+									"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
+						} else if (splitHosts.length != 1) {
+							throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
+						}
+						String hostName = splitHosts[0];
+
+						List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
+						if (hostSplits == null) {
+							hostSplits = new ArrayList<LocatableInputSplit>();
+							splitsByHost.put(hostName, hostSplits);
+						}
+						hostSplits.add(lis);
+					}
+
+					// assign subtasks to hosts
+					// get list of hosts in deterministic order
+					List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
+					Collections.sort(hosts);
+					int numSubTasks = this.getParallelism();
+					int numHosts = hosts.size();
+					if (numSubTasks < numHosts) {
+						throw new JobException("Strictly local split assignment requires at least as " +
+								"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
+								"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
+					}
+
+					// Spread the subtasks evenly over the hosts; the first (numSubTasks % numHosts) hosts
+					// in sorted order receive one extra subtask. numHosts is 0 when the source created no
+					// splits: then no subtask is bound to a host, and we must not divide by zero.
+					int numSubTasksPerHost = numHosts == 0 ? 0 : numSubTasks / numHosts;
+					int numHostWithOneMore = numHosts == 0 ? 0 : numSubTasks % numHosts;
+
+					Map<String, int[]> subTaskHostAssignment = new HashMap<String, int[]>(numHosts);
+					int assignedHostsCnt = 0;
+					int assignedTasksCnt = 0;
+					for (String host : hosts) {
+						int numTasksToAssign = assignedHostsCnt < numHostWithOneMore ? numSubTasksPerHost + 1 : numSubTasksPerHost;
+						int[] subTasks = new int[numTasksToAssign];
+						for (int i = 0; i < numTasksToAssign; i++) {
+							subTasks[i] = assignedTasksCnt++;
+						}
+						subTaskHostAssignment.put(host, subTasks);
+						assignedHostsCnt++;
+					}
+
 					// attach locality constraint to subtask
+					for (String host : hosts) {
+						int[] subTasks = subTaskHostAssignment.get(host);
+
+						for (int taskId : subTasks) {
+							this.getTaskVertices()[taskId].setTargetHostConstraint(host);
+						}
+					}
+
+					// assign splits to subtasks; every subtask gets a (possibly empty) split list so
+					// that the predetermined assigner below never sees a null entry
+					@SuppressWarnings("unchecked")
+					List<InputSplit>[] splitsPerSubtask = (List<InputSplit>[]) new List[numSubTasks];
+					for (int i = 0; i < numSubTasks; i++) {
+						splitsPerSubtask[i] = new ArrayList<InputSplit>();
+					}
+					this.inputSplitsPerSubtask = splitsPerSubtask;
+
+					for (String host : hosts) {
+						List<LocatableInputSplit> localSplits = splitsByHost.get(host);
+						int[] localSubTasks = subTaskHostAssignment.get(host);
+
+						// deal this host's splits round-robin over its subtasks, taking them from the end of the list
+						int subTaskIdx = 0;
+						while (!localSplits.isEmpty()) {
+							int subTask = localSubTasks[subTaskIdx++];
+							this.inputSplitsPerSubtask[subTask].add(localSplits.remove(localSplits.size() - 1));
+							if (subTaskIdx == localSubTasks.length) {
+								subTaskIdx = 0;
+							}
+						}
+					}
+
+					// create predetermined split assigner
+					this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);
                                        
                                } else {
                                        this.splitAssigner = 
splitSource.getInputSplitAssigner(this.inputSplits);
@@ -418,4 +503,32 @@ public class ExecutionJobVertex implements Serializable {
                        }
                }
        }
+
+       
//---------------------------------------------------------------------------------------------
+       //  Predetermined InputSplitAssigner
+       
//---------------------------------------------------------------------------------------------
+
+	/**
+	 * An {@link InputSplitAssigner} that hands out input splits according to a fixed, precomputed
+	 * assignment of splits to subtasks. A subtask only ever receives splits from its own list;
+	 * the requesting host is not consulted.
+	 */
+	public static class PredeterminedInputSplitAssigner implements InputSplitAssigner {
+
+		/** Remaining splits per subtask index; splits are handed out from the end of each list. */
+		private final List<InputSplit>[] inputSplitsPerSubtask;
+
+		/**
+		 * Creates an assigner from a split assignment. The per-subtask lists are copied, so handing
+		 * out splits does not modify the caller's lists.
+		 *
+		 * @param inputSplitsPerSubtask the splits of each subtask, indexed by subtask index;
+		 *                              a {@code null} entry is treated as "no splits"
+		 */
+		@SuppressWarnings("unchecked")
+		public PredeterminedInputSplitAssigner(List<InputSplit>[] inputSplitsPerSubtask) {
+			// copy input split assignment
+			this.inputSplitsPerSubtask = (List<InputSplit>[]) new List[inputSplitsPerSubtask.length];
+			for (int i = 0; i < inputSplitsPerSubtask.length; i++) {
+				List<InputSplit> splits = inputSplitsPerSubtask[i];
+				this.inputSplitsPerSubtask[i] = splits == null
+						? new ArrayList<InputSplit>()
+						: new ArrayList<InputSplit>(splits);
+			}
+		}
+
+		/**
+		 * Returns the next split of the given subtask.
+		 *
+		 * @param host the requesting host (ignored by this assigner)
+		 * @param taskId the index of the requesting subtask
+		 * @return the next split of the subtask, or {@code null} if it has no splits left
+		 * @throws IllegalArgumentException if {@code taskId} is not a valid subtask index
+		 */
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId) {
+			if (taskId < 0 || taskId >= inputSplitsPerSubtask.length) {
+				throw new IllegalArgumentException("Invalid task index: " + taskId
+						+ " (number of subtasks: " + inputSplitsPerSubtask.length + ")");
+			}
+			List<InputSplit> splits = inputSplitsPerSubtask[taskId];
+			// take from the tail: removing the last element of an ArrayList does not shift elements
+			return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
+		}
+	}
 }

Reply via email to