http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
new file mode 100644
index 0000000..04bc527
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -0,0 +1,1578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plantranslate;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.aggregators.AggregatorRegistry;
+import org.apache.flink.api.common.aggregators.AggregatorWithName;
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
+import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
+import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
+import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.CoGroupDriver;
+import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
+import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
+import org.apache.flink.runtime.operators.DataSinkTask;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
+import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
+import org.apache.flink.runtime.operators.MatchDriver;
+import org.apache.flink.runtime.operators.NoOpDriver;
+import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.chaining.ChainedDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Visitor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This component translates the optimizer's resulting {@link org.apache.flink.optimizer.plan.OptimizedPlan}
+ * to a {@link org.apache.flink.runtime.jobgraph.JobGraph}. The translation is not strictly one-to-one,
+ * because some nodes from the OptimizedPlan are collapsed into one job vertex.
+ *
+ * This translation does not make any decisions or assumptions. All degrees of freedom in the execution
+ * of the job are decided by the Optimizer, so this translation is a deterministic mapping.
+ *
+ * The basic mode of operation is a top-down traversal over the plan graph. On the way down, job vertices
+ * are created for the plan nodes; on the way back up, the nodes connect to their predecessors.
+ */
+public class JobGraphGenerator implements Visitor<PlanNode> {
+       
+	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
+	
+	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
+	
+	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null);
+	
+	// ------------------------------------------------------------------------
+
+	private Map<PlanNode, AbstractJobVertex> vertices; // a map from optimizer nodes to job vertices
+	
+	private Map<PlanNode, TaskInChain> chainedTasks; // a map from optimizer nodes to chained tasks
+	
+	private Map<IterationPlanNode, IterationDescriptor> iterations;
+	
+	private List<TaskInChain> chainedTasksInSequence;
+	
+	private List<AbstractJobVertex> auxVertices; // auxiliary vertices which are added during job graph generation
+	
+	private final int defaultMaxFan;
+	
+	private final float defaultSortSpillingThreshold;
+	
+	private int iterationIdEnumerator = 1;
+	
+	private IterationPlanNode currentIteration; // the currently enclosing iteration
+	
+	private List<IterationPlanNode> iterationStack;  // stack of enclosing iterations
+	
+	private SlotSharingGroup sharingGroup;
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new job graph generator that uses the default values for its resource configuration.
+	 */
+	public JobGraphGenerator() {
+		this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN;
+		this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD;
+	}
+	
+	public JobGraphGenerator(Configuration config) {
+		this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
+				ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
+		this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
+				ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
+	}
+
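+	// For orientation, a minimal usage sketch; the optimizer setup is illustrative and its
+	// constructor arguments are elided:
+	//
+	//   OptimizedPlan plan = optimizer.compile(program.getPlan());
+	//   JobGraph jobGraph = new JobGraphGenerator(config).compileJobGraph(plan);
+	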
+	/**
+	 * Translates a {@link org.apache.flink.optimizer.plan.OptimizedPlan} into a
+	 * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
+	 * 
+	 * @param program Optimized plan that is translated into a JobGraph.
+	 * @return JobGraph generated from the plan.
+	 */
+	public JobGraph compileJobGraph(OptimizedPlan program) {
+		this.vertices = new HashMap<PlanNode, AbstractJobVertex>();
+		this.chainedTasks = new HashMap<PlanNode, TaskInChain>();
+		this.chainedTasksInSequence = new ArrayList<TaskInChain>();
+		this.auxVertices = new ArrayList<AbstractJobVertex>();
+		this.iterations = new HashMap<IterationPlanNode, IterationDescriptor>();
+		this.iterationStack = new ArrayList<IterationPlanNode>();
+		
+		this.sharingGroup = new SlotSharingGroup();
+		
+		// this starts the traversal that generates the job graph
+		program.accept(this);
+		
+		// sanity check that we are not somehow in an iteration at the end
+		if (this.currentIteration != null) {
+			throw new CompilerException("The graph translation ended prematurely, leaving an unclosed iteration.");
+		}
+		
+		// finalize the iterations
+		for (IterationDescriptor iteration : this.iterations.values()) {
+			if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
+				finalizeBulkIteration(iteration);
+			} else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
+				finalizeWorksetIteration(iteration);
+			} else {
+				throw new CompilerException();
+			}
+		}
+		
+		// now that the traversal is done, we have the chained tasks write their configs into their
+		// parents' configurations
+		for (TaskInChain tic : this.chainedTasksInSequence) {
+			TaskConfig t = new TaskConfig(tic.getContainingVertex().getConfiguration());
+			t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
+		}
+		
+		// create the job graph object
+		JobGraph graph = new JobGraph(program.getJobName());
+		graph.setNumberOfExecutionRetries(program.getOriginalPactPlan().getNumberOfExecutionRetries());
+		graph.setAllowQueuedScheduling(false);
+		
+		// add vertices to the graph
+		for (AbstractJobVertex vertex : this.vertices.values()) {
+			graph.addVertex(vertex);
+		}
+		
+		for (AbstractJobVertex vertex : this.auxVertices) {
+			graph.addVertex(vertex);
+			vertex.setSlotSharingGroup(sharingGroup);
+		}
+		
+		// add registered cache files into the job configuration
+		for (Entry<String, DistributedCacheEntry> e : program.getOriginalPactPlan().getCachedFiles()) {
+			DistributedCache.writeFileInfoToConfig(e.getKey(), e.getValue(), graph.getJobConfiguration());
+		}
+
+		try {
+			InstantiationUtil.writeObjectToConfig(
+					program.getOriginalPactPlan().getExecutionConfig(),
+					graph.getJobConfiguration(),
+					ExecutionConfig.CONFIG_KEY);
+		} catch (IOException e) {
+			throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
+		}
+
+		// release all references again
+		this.vertices = null;
+		this.chainedTasks = null;
+		this.chainedTasksInSequence = null;
+		this.auxVertices = null;
+		this.iterations = null;
+		this.iterationStack = null;
+		
+		// return the job graph
+		return graph;
+	}
+       
+	/**
+	 * This method implements the pre-visiting during a depth-first traversal. It creates the job vertex
+	 * and sets the local strategy.
+	 * 
+	 * @param node
+	 *        The node that is currently processed.
+	 * @return True, if the visitor should descend to the node's children, false if not.
+	 * @see org.apache.flink.util.Visitor#preVisit(org.apache.flink.util.Visitable)
+	 */
+	@Override
+	public boolean preVisit(PlanNode node) {
+		// check if we have visited this node before. in non-tree graphs, this happens
+		if (this.vertices.containsKey(node) || this.chainedTasks.containsKey(node) || this.iterations.containsKey(node)) {
+			// return false to prevent further descent
+			return false;
+		}
+
+		// the vertex to be created for the current node
+		final AbstractJobVertex vertex;
+		try {
+			if (node instanceof SinkPlanNode) {
+				vertex = createDataSinkVertex((SinkPlanNode) node);
+			}
+			else if (node instanceof SourcePlanNode) {
+				vertex = createDataSourceVertex((SourcePlanNode) node);
+			}
+			else if (node instanceof BulkIterationPlanNode) {
+				BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
+				// for the bulk iteration, we skip creating anything for now. we create the graph
+				// for the step function in the post visit.
+				
+				// check that the root of the step function has the same DOP as the iteration.
+				// because the tail must have the same DOP as the head, we can only merge the last
+				// operator with the tail if they have the same DOP. not merging is currently not
+				// implemented
+				PlanNode root = iterationNode.getRootOfStepFunction();
+				if (root.getParallelism() != node.getParallelism())
+				{
+					throw new CompilerException("Error: The final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+				
+				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
+				this.iterations.put(iterationNode, descr);
+				vertex = null;
+			}
+			else if (node instanceof WorksetIterationPlanNode) {
+				WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
+
+				// we have the same constraints as for the bulk iteration
+				PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode();
+				PlanNode solutionSetDelta = iterationNode.getSolutionSetDeltaPlanNode();
+				
+				if (nextWorkSet.getParallelism() != node.getParallelism())
+				{
+					throw new CompilerException("It is currently not supported that the final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+				if (solutionSetDelta.getParallelism() != node.getParallelism())
+				{
+					throw new CompilerException("It is currently not supported that the final operator of the step " +
+							"function has a different degree of parallelism than the iteration operator itself.");
+				}
+				
+				IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
+				this.iterations.put(iterationNode, descr);
+				vertex = null;
+			}
+			else if (node instanceof SingleInputPlanNode) {
+				vertex = createSingleInputVertex((SingleInputPlanNode) node);
+			}
+			else if (node instanceof DualInputPlanNode) {
+				vertex = createDualInputVertex((DualInputPlanNode) node);
+			}
+			else if (node instanceof NAryUnionPlanNode) {
+				// skip the union for now
+				vertex = null;
+			}
+			else if (node instanceof BulkPartialSolutionPlanNode) {
+				// create a head node (or not, if it is merged into its successor)
+				vertex = createBulkIterationHead((BulkPartialSolutionPlanNode) node);
+			}
+			else if (node instanceof SolutionSetPlanNode) {
+				// this represents an access into the solution set index.
+				// we do not create a vertex for the solution set here (we create the head at the workset placeholder)
+				
+				// we adjust the joins / cogroups that go into the solution set here
+				for (Channel c : node.getOutgoingChannels()) {
+					DualInputPlanNode target = (DualInputPlanNode) c.getTarget();
+					AbstractJobVertex accessingVertex = this.vertices.get(target);
+					TaskConfig conf = new TaskConfig(accessingVertex.getConfiguration());
+					int inputNum = c == target.getInput1() ? 0 : c == target.getInput2() ? 1 : -1;
+					
+					// sanity checks
+					if (inputNum == -1) {
+						throw new CompilerException();
+					}
+					
+					// adjust the driver
+					if (conf.getDriver().equals(MatchDriver.class)) {
+						conf.setDriver(inputNum == 0 ? JoinWithSolutionSetFirstDriver.class : JoinWithSolutionSetSecondDriver.class);
+					}
+					else if (conf.getDriver().equals(CoGroupDriver.class)) {
+						conf.setDriver(inputNum == 0 ? CoGroupWithSolutionSetFirstDriver.class : CoGroupWithSolutionSetSecondDriver.class);
+					}
+					else {
+						throw new CompilerException("Found join with solution set using incompatible operator (only Join/CoGroup are valid).");
+					}
+				}
+				
+				// make sure we do not visit this node again. for that, we add an 'already seen' entry into one of the sets
+				this.chainedTasks.put(node, ALREADY_VISITED_PLACEHOLDER);
+				
+				vertex = null;
+			}
+			else if (node instanceof WorksetPlanNode) {
+				// create the iteration head here
+				vertex = createWorksetIterationHead((WorksetPlanNode) node);
+			}
+			else {
+				throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
+			}
+		}
+		catch (Exception e) {
+			throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
+		}
+		
+		// check if a vertex was created, or if it was chained or skipped
+		if (vertex != null) {
+			// set degree of parallelism
+			int pd = node.getParallelism();
+			vertex.setParallelism(pd);
+			
+			vertex.setSlotSharingGroup(sharingGroup);
+			
+			// check whether this vertex is part of an iteration step function
+			if (this.currentIteration != null) {
+				// check that the task has the same DOP as the iteration as such
+				PlanNode iterationNode = (PlanNode) this.currentIteration;
+				if (iterationNode.getParallelism() < pd) {
+					throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator.");
+				}
+
+				// store the id of the iterations the step functions participate in
+				IterationDescriptor descr = this.iterations.get(this.currentIteration);
+				new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId());
+			}
+	
+			// store in the map
+			this.vertices.put(node, vertex);
+		}
+
+		// returning true causes deeper descent
+		return true;
+	}
+
+	/**
+	 * This method implements the post-visit during the depth-first traversal. When the post-visit happens,
+	 * all of the descendants have been processed, so this method connects all of the current node's
+	 * predecessors to the current node.
+	 * 
+	 * @param node
+	 *        The node currently processed during the post-visit.
+	 * @see org.apache.flink.util.Visitor#postVisit(org.apache.flink.util.Visitable)
+	 */
+	@Override
+	public void postVisit(PlanNode node) {
+		try {
+			// --------- check special cases for which we handle post visit differently ----------
+			
+			// skip data source nodes (they have no inputs)
+			// also, do nothing for union nodes, we connect them later when gathering the inputs for a task
+			// solution sets have no input. the initial solution set input is connected when the iteration node is in its postVisit
+			if (node instanceof SourcePlanNode || node instanceof NAryUnionPlanNode || node instanceof SolutionSetPlanNode) {
+				return;
+			}
+			
+			// check if we have an iteration. in that case, translate the step function now
+			if (node instanceof IterationPlanNode) {
+				// prevent nested iterations
+				if (node.isOnDynamicPath()) {
+					throw new CompilerException("Nested iterations are not possible at the moment!");
+				}
+				
+				// if we recursively go into an iteration (because the constant path of one iteration contains
+				// another one), we push the current one onto the stack
+				if (this.currentIteration != null) {
+					this.iterationStack.add(this.currentIteration);
+				}
+				
+				this.currentIteration = (IterationPlanNode) node;
+				this.currentIteration.acceptForStepFunction(this);
+				
+				// pop the current iteration from the stack
+				if (this.iterationStack.isEmpty()) {
+					this.currentIteration = null;
+				} else {
+					this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1);
+				}
+				
+				// inputs for the initial bulk partial solution or the initial workset are already connected
+				// to the iteration head in the head's post visit. connect the initial solution set now.
+				if (node instanceof WorksetIterationPlanNode) {
+					// connect the initial solution set
+					WorksetIterationPlanNode wsNode = (WorksetIterationPlanNode) node;
+					AbstractJobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
+					TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+					int inputIndex = headConfig.getDriverStrategy().getNumInputs();
+					headConfig.setIterationHeadSolutionSetInputIndex(inputIndex);
+					translateChannel(wsNode.getInitialSolutionSetInput(), inputIndex, headVertex, headConfig, false);
+				}
+				
+				return;
+			}
+                       
+			final AbstractJobVertex targetVertex = this.vertices.get(node);
+			
+			// --------- Main Path: Translation of channels ----------
+			// 
+			// There are two paths of translation: One for chained tasks (or merged tasks in general),
+			// which do not have their own task vertex. The other for tasks that have their own vertex,
+			// or are the primary task in a vertex (to which the others are chained).
+			
+			// check whether this node has its own task, or is merged with another one
+			if (targetVertex == null) {
+				// the node's task is merged with another task. it is either chained, or a merged head vertex
+				// from an iteration
+				final TaskInChain chainedTask;
+				if ((chainedTask = this.chainedTasks.get(node)) != null) {
+					// Chained Task. Sanity check first...
+					final Iterator<Channel> inConns = node.getInputs().iterator();
+					if (!inConns.hasNext()) {
+						throw new CompilerException("Bug: Found chained task with no input.");
+					}
+					final Channel inConn = inConns.next();
+					
+					if (inConns.hasNext()) {
+						throw new CompilerException("Bug: Found a chained task with more than one input!");
+					}
+					if (inConn.getLocalStrategy() != null && inConn.getLocalStrategy() != LocalStrategy.NONE) {
+						throw new CompilerException("Bug: Found a chained task with an input local strategy.");
+					}
+					if (inConn.getShipStrategy() != null && inConn.getShipStrategy() != ShipStrategyType.FORWARD) {
+						throw new CompilerException("Bug: Found a chained task with an input ship strategy other than FORWARD.");
+					}
+	
+					AbstractJobVertex container = chainedTask.getContainingVertex();
+					
+					if (container == null) {
+						final PlanNode sourceNode = inConn.getSource();
+						container = this.vertices.get(sourceNode);
+						if (container == null) {
+							// the predecessor is itself chained
+							container = this.chainedTasks.get(sourceNode).getContainingVertex();
+							if (container == null) {
+								throw new IllegalStateException("Bug: Chained task predecessor has not been assigned its containing vertex.");
+							}
+						} else {
+							// the predecessor is a proper task job vertex and this is the first chained task. add a forward connection entry.
+							new TaskConfig(container.getConfiguration()).addOutputShipStrategy(ShipStrategyType.FORWARD);
+						}
+						chainedTask.setContainingVertex(container);
+					}
+					
+					// add info about the input serializer type
+					chainedTask.getTaskConfig().setInputSerializer(inConn.getSerializer(), 0);
+					
+					// update the name of the container task
+					String containerTaskName = container.getName();
+					if (containerTaskName.startsWith("CHAIN ")) {
+						container.setName(containerTaskName + " -> " + chainedTask.getTaskName());
+					} else {
+						container.setName("CHAIN " + containerTaskName + " -> " + chainedTask.getTaskName());
+					}
+					
+					this.chainedTasksInSequence.add(chainedTask);
+					return;
+				}
+				else if (node instanceof BulkPartialSolutionPlanNode ||
+						node instanceof WorksetPlanNode)
+				{
+					// merged iteration head task. the task that the head is merged with will take care of it
+					return;
+				} else {
+					throw new CompilerException("Bug: Unrecognized merged task vertex.");
+				}
+			}
+                       
+			// -------- Here, we translate non-chained tasks -------------
+			
+			if (this.currentIteration != null) {
+				AbstractJobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
+				// the head may still be null if we descend into the static parts first
+				if (head != null) {
+					targetVertex.setStrictlyCoLocatedWith(head);
+				}
+			}
+			
+			// create the config that will contain all the descriptions of the inputs
+			final TaskConfig targetVertexConfig = new TaskConfig(targetVertex.getConfiguration());
+			
+			// get the inputs. if this node is the head of an iteration, we obtain the inputs from the
+			// enclosing iteration node, because the inputs are the initial inputs to the iteration.
+			final Iterator<Channel> inConns;
+			if (node instanceof BulkPartialSolutionPlanNode) {
+				inConns = ((BulkPartialSolutionPlanNode) node).getContainingIterationNode().getInputs().iterator();
+				// because the partial solution has its own vertex, it has only one (logical) input.
+				// note this in the task configuration
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
+			} else if (node instanceof WorksetPlanNode) {
+				WorksetPlanNode wspn = (WorksetPlanNode) node;
+				// the input that is the initial workset
+				inConns = Collections.singleton(wspn.getContainingIterationNode().getInput2()).iterator();
+				
+				// because we have a stand-alone (non-merged) workset iteration head, the initial workset will
+				// be input 0 and the solution set will be input 1
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
+				targetVertexConfig.setIterationHeadSolutionSetInputIndex(1);
+			} else {
+				inConns = node.getInputs().iterator();
+			}
+			if (!inConns.hasNext()) {
+				throw new CompilerException("Bug: Found a non-source task with no input.");
+			}
+			
+			int inputIndex = 0;
+			while (inConns.hasNext()) {
+				Channel input = inConns.next();
+				inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
+			}
+			// broadcast variables
+			int broadcastInputIndex = 0;
+			for (NamedChannel broadcastInput : node.getBroadcastInputs()) {
+				int broadcastInputIndexDelta = translateChannel(broadcastInput, broadcastInputIndex, targetVertex, targetVertexConfig, true);
+				targetVertexConfig.setBroadcastInputName(broadcastInput.getName(), broadcastInputIndex);
+				targetVertexConfig.setBroadcastInputSerializer(broadcastInput.getSerializer(), broadcastInputIndex);
+				broadcastInputIndex += broadcastInputIndexDelta;
+			}
+		} catch (Exception e) {
+			throw new CompilerException(
+				"An error occurred while translating the optimized plan to a JobGraph: " + e.getMessage(), e);
+		}
+	}
+       
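+	/**
+	 * Connects one logical input (which may expand to several physical channels, e.g. for unions
+	 * or merged iteration heads) to the target vertex and records the connection in the target's
+	 * {@link TaskConfig}. Returns the number of input gates consumed: 0 for solution set accesses,
+	 * which become local index lookups rather than vertex connections, and 1 otherwise.
+	 */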
+	private int translateChannel(Channel input, int inputIndex, AbstractJobVertex targetVertex,
+			TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception
+	{
+		final PlanNode inputPlanNode = input.getSource();
+		final Iterator<Channel> allInChannels;
+		
+		if (inputPlanNode instanceof NAryUnionPlanNode) {
+			allInChannels = ((NAryUnionPlanNode) inputPlanNode).getListOfInputs().iterator();
+		}
+		else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
+			if (this.vertices.get(inputPlanNode) == null) {
+				// merged iteration head
+				final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) inputPlanNode;
+				final BulkIterationPlanNode iterationNode = pspn.getContainingIterationNode();
+				
+				// check if the iteration's input is a union
+				if (iterationNode.getInput().getSource() instanceof NAryUnionPlanNode) {
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput().getSource()).getInputs().iterator();
+				} else {
+					allInChannels = Collections.singletonList(iterationNode.getInput()).iterator();
+				}
+				
+				// also, set the index of the gate with the partial solution
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
+			} else {
+				// standalone iteration head
+				allInChannels = Collections.singletonList(input).iterator();
+			}
+		} else if (inputPlanNode instanceof WorksetPlanNode) {
+			if (this.vertices.get(inputPlanNode) == null) {
+				// merged iteration head
+				final WorksetPlanNode wspn = (WorksetPlanNode) inputPlanNode;
+				final WorksetIterationPlanNode iterationNode = wspn.getContainingIterationNode();
+				
+				// check if the iteration's input is a union
+				if (iterationNode.getInput2().getSource() instanceof NAryUnionPlanNode) {
+					allInChannels = ((NAryUnionPlanNode) iterationNode.getInput2().getSource()).getInputs().iterator();
+				} else {
+					allInChannels = Collections.singletonList(iterationNode.getInput2()).iterator();
+				}
+				
+				// also, set the index of the gate with the initial workset
+				targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
+			} else {
+				// standalone iteration head
+				allInChannels = Collections.singletonList(input).iterator();
+			}
+		} else if (inputPlanNode instanceof SolutionSetPlanNode) {
+			// for now, skip connections with the solution set node, as this is a local index access
+			// (later to be parameterized here) rather than a vertex connection
+			return 0;
+		} else {
+			allInChannels = Collections.singletonList(input).iterator();
+		}
+		
+		// check that the type serializer is consistent
+		TypeSerializerFactory<?> typeSerFact = null;
+		
+		// accounting for channels on the dynamic path
+		int numChannelsTotal = 0;
+		int numChannelsDynamicPath = 0;
+		int numDynamicSenderTasksTotal = 0;
+		
+		// expand the channel to all the union channels, in case there is a union operator at its source
+		while (allInChannels.hasNext()) {
+			final Channel inConn = allInChannels.next();
+			
+			// sanity check the common serializer
+			if (typeSerFact == null) {
+				typeSerFact = inConn.getSerializer();
+			} else if (!typeSerFact.equals(inConn.getSerializer())) {
+				throw new CompilerException("Conflicting types in union operator.");
+			}
+			
+			final PlanNode sourceNode = inConn.getSource();
+			AbstractJobVertex sourceVertex = this.vertices.get(sourceNode);
+			TaskConfig sourceVertexConfig;
+
+			if (sourceVertex == null) {
+				// this predecessor is chained to another task or an iteration
+				final TaskInChain chainedTask;
+				final IterationDescriptor iteration;
+				if ((chainedTask = this.chainedTasks.get(sourceNode)) != null) {
+					// push chained task
+					if (chainedTask.getContainingVertex() == null) {
+						throw new IllegalStateException("Bug: Chained task has not been assigned its containing vertex when connecting.");
+					}
+					sourceVertex = chainedTask.getContainingVertex();
+					sourceVertexConfig = chainedTask.getTaskConfig();
+				} else if ((iteration = this.iterations.get(sourceNode)) != null) {
+					// the predecessor is an iteration
+					sourceVertex = iteration.getHeadTask();
+					sourceVertexConfig = iteration.getHeadFinalResultConfig();
+				} else {
+					throw new CompilerException("Bug: Could not resolve source node for a channel.");
+				}
+			} else {
+				// the predecessor is its own vertex
+				sourceVertexConfig = new TaskConfig(sourceVertex.getConfiguration());
+			}
+			DistributionPattern pattern = connectJobVertices(
+				inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
+			
+			// accounting on channels and senders
+			numChannelsTotal++;
+			if (inConn.isOnDynamicPath()) {
+				numChannelsDynamicPath++;
+				numDynamicSenderTasksTotal += getNumberOfSendersPerReceiver(pattern,
+					sourceVertex.getParallelism(), targetVertex.getParallelism());
+			}
+		}
+		
+		// for the iterations, check that the number of dynamic channels is the same as the number
+		// of channels for this logical input. this condition is violated at the moment, if there
+		// is a union between nodes on the static and nodes on the dynamic path
+		if (numChannelsDynamicPath > 0 && numChannelsTotal != numChannelsDynamicPath) {
+			throw new CompilerException("Error: It is currently not supported to union between dynamic and static path in an iteration.");
+		}
+		if (numDynamicSenderTasksTotal > 0) {
+			if (isBroadcast) {
+				targetVertexConfig.setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
+			} else {
+				targetVertexConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
+			}
+		}
+		
+		// the local strategy is added only once. in the non-union case that is the actual edge,
+		// in the union case it is the edge between the union and the target node
+		addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
+		return 1;
+	}
+       
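+	/**
+	 * Computes how many sender subtasks feed each receiver subtask for the given distribution
+	 * pattern. The sum over all dynamic-path channels is used above to configure after how many
+	 * events an iterative input gate is interrupted.
+	 */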
+	private int getNumberOfSendersPerReceiver(DistributionPattern pattern, int numSenders, int numReceivers) {
+		if (pattern == DistributionPattern.ALL_TO_ALL) {
+			return numSenders;
+		} else if (pattern == DistributionPattern.POINTWISE) {
+			if (numSenders != numReceivers) {
+				if (numReceivers == 1) {
+					return numSenders;
+				}
+				else if (numSenders == 1) {
+					return 1;
+				}
+				else {
+					throw new CompilerException("Error: A changing degree of parallelism is currently " +
+							"not supported between tasks within an iteration.");
+				}
+			} else {
+				return 1;
+			}
+		} else {
+			throw new CompilerException("Unknown distribution pattern for channels: " + pattern);
+		}
+	}
+       
+	// ------------------------------------------------------------------------
+	// Methods for creating individual vertices
+	// ------------------------------------------------------------------------
+       
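+	/**
+	 * Creates the vertex for a single-input operator, or registers the task as a chained task
+	 * (without its own vertex, returning null) when the driver strategy and the input channel
+	 * permit chaining.
+	 */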
+	private AbstractJobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+		
+		// check whether chaining is possible
+		boolean chaining = false;
+		{
+			Channel inConn = node.getInput();
+			PlanNode pred = inConn.getSource();
+			chaining = ds.getPushChainDriverClass() != null &&
+					!(pred instanceof NAryUnionPlanNode) &&	// first op after union is stand-alone, because union is merged
+					!(pred instanceof BulkPartialSolutionPlanNode) &&	// partial solution merges anyways
+					!(pred instanceof WorksetPlanNode) &&	// workset merges anyways
+					!(pred instanceof IterationPlanNode) &&	// cannot chain with iteration heads currently
+					inConn.getShipStrategy() == ShipStrategyType.FORWARD &&
+					inConn.getLocalStrategy() == LocalStrategy.NONE &&
+					pred.getOutgoingChannels().size() == 1 &&
+					node.getParallelism() == pred.getParallelism() &&
+					node.getBroadcastInputs().isEmpty();
+			
+			// cannot chain the nodes that produce the next workset or the next solution set, if they are
+			// not in a tail
+			if (this.currentIteration != null && this.currentIteration instanceof WorksetIterationPlanNode &&
+					node.getOutgoingChannels().size() > 0)
+			{
+				WorksetIterationPlanNode wspn = (WorksetIterationPlanNode) this.currentIteration;
+				if (wspn.getSolutionSetDeltaPlanNode() == pred || wspn.getNextWorkSetPlanNode() == pred) {
+					chaining = false;
+				}
+			}
+			// cannot chain the nodes that produce the next workset in a bulk iteration if a termination criterion follows
+			if (this.currentIteration != null && this.currentIteration instanceof BulkIterationPlanNode)
+			{
+				BulkIterationPlanNode bipn = (BulkIterationPlanNode) this.currentIteration;
+				if (node == bipn.getRootOfTerminationCriterion() && bipn.getRootOfStepFunction() == pred) {
+					chaining = false;
+				} else if (node.getOutgoingChannels().size() > 0 && (bipn.getRootOfStepFunction() == pred ||
+						bipn.getRootOfTerminationCriterion() == pred)) {
+					chaining = false;
+				}
+			}
+		}
+		
+		final AbstractJobVertex vertex;
+		final TaskConfig config;
+		
+		if (chaining) {
+			vertex = null;
+			config = new TaskConfig(new Configuration());
+			this.chainedTasks.put(node, new TaskInChain(ds.getPushChainDriverClass(), config, taskName));
+		} else {
+			// create task vertex
+			vertex = new AbstractJobVertex(taskName);
+			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+			
+			config = new TaskConfig(vertex.getConfiguration());
+			config.setDriver(ds.getDriverClass());
+		}
+		
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+		
+		// set the driver strategy
+		config.setDriverStrategy(ds);
+		for (int i = 0; i < ds.getNumRequiredComparators(); i++) {
+			config.setDriverComparator(node.getComparator(i), i);
+		}
+		// assign memory, file-handles, etc.
+		assignDriverResources(node, config);
+		return vertex;
+	}
+
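+	/**
+	 * Creates the vertex for a two-input operator. Dual-input tasks always get their own vertex;
+	 * they are never chained.
+	 */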
+	private AbstractJobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
+		final String taskName = node.getNodeName();
+		final DriverStrategy ds = node.getDriverStrategy();
+		final AbstractJobVertex vertex = new AbstractJobVertex(taskName);
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+		vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+		
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+		
+		// set the driver strategy
+		config.setDriver(ds.getDriverClass());
+		config.setDriverStrategy(ds);
+		if (node.getComparator1() != null) {
+			config.setDriverComparator(node.getComparator1(), 0);
+		}
+		if (node.getComparator2() != null) {
+			config.setDriverComparator(node.getComparator2(), 1);
+		}
+		if (node.getPairComparator() != null) {
+			config.setDriverPairComparator(node.getPairComparator());
+		}
+		
+		// assign memory, file-handles, etc.
+		assignDriverResources(node, config);
+		return vertex;
+	}
+
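+	/**
+	 * Creates the vertex for a data source, wiring the input format's user code and the output
+	 * serializer into the task configuration.
+	 */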
+	private InputFormatVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
+		final InputFormatVertex vertex = new InputFormatVertex(node.getNodeName());
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+
+		vertex.setInvokableClass(DataSourceTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
+
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		config.setOutputSerializer(node.getSerializer());
+		return vertex;
+	}
+
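+	/**
+	 * Creates the vertex for a data sink, wiring the output format's user code into the task
+	 * configuration.
+	 */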
+	private AbstractJobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
+		final OutputFormatVertex vertex = new OutputFormatVertex(node.getNodeName());
+		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
+
+		vertex.setInvokableClass(DataSinkTask.class);
+		vertex.setFormatDescription(getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper()));
+		
+		// set user code
+		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
+		config.setStubParameters(node.getProgramOperator().getParameters());
+
+		return vertex;
+	}
+       
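+	/**
+	 * Creates (or adopts) the head vertex for a bulk iteration. If the partial solution can be
+	 * merged with its single successor, that successor's vertex is turned into the iteration head;
+	 * otherwise a stand-alone head vertex with a no-op driver is created. Returns the newly created
+	 * vertex, or null if the head was merged.
+	 */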
+       private AbstractJobVertex 
createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
+               // get the bulk iteration that corresponds to this partial 
solution node
+               final BulkIterationPlanNode iteration = 
pspn.getContainingIterationNode();
+               
+               // check whether we need an individual vertex for the partial 
solution, or whether we
+               // attach ourselves to the vertex of the parent node. We can 
combine the head with a node of 
+               // the step function, if
+               // 1) There is one parent that the partial solution connects to 
via a forward pattern and no
+               //    local strategy
+               // 2) DOP and the number of subtasks per instance does not 
change
+               // 3) That successor is not a union
+               // 4) That successor is not itself the last node of the step 
function
+               // 5) There is no local strategy on the edge for the initial 
partial solution, as
+               //    this translates to a local strategy that would only be 
executed in the first iteration
+               
+               final boolean merge;
+               if (mergeIterationAuxTasks && pspn.getOutgoingChannels().size() == 1) {
+                       final Channel c = pspn.getOutgoingChannels().get(0);
+                       final PlanNode successor = c.getTarget();
+                       merge = c.getShipStrategy() == ShipStrategyType.FORWARD &&
+                                       c.getLocalStrategy() == LocalStrategy.NONE &&
+                                       c.getTempMode() == TempMode.NONE &&
+                                       successor.getParallelism() == pspn.getParallelism() &&
+                                       !(successor instanceof NAryUnionPlanNode) &&
+                                       successor != iteration.getRootOfStepFunction() &&
+                                       iteration.getInput().getLocalStrategy() == LocalStrategy.NONE;
+               } else {
+                       merge = false;
+               }
+               
+               // create or adopt the head vertex
+               final AbstractJobVertex toReturn;
+               final AbstractJobVertex headVertex;
+               final TaskConfig headConfig;
+               if (merge) {
+                       final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
+                       headVertex = (AbstractJobVertex) this.vertices.get(successor);
+                       
+                       if (headVertex == null) {
+                               throw new CompilerException(
+                                       "Bug: Trying to merge the iteration head with its successor, but the successor has not been created.");
+                       }
+                       
+                       // reset the vertex type to iteration head
+                       headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headConfig = new TaskConfig(headVertex.getConfiguration());
+                       toReturn = null;
+               } else {
+                       // instantiate the head vertex and give it a no-op driver as the driver strategy.
+                       // everything else happens in the post visit, after the input (the initial partial solution)
+                       // is connected.
+                       headVertex = new AbstractJobVertex("PartialSolution ("+iteration.getNodeName()+")");
+                       headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headConfig = new TaskConfig(headVertex.getConfiguration());
+                       headConfig.setDriver(NoOpDriver.class);
+                       toReturn = headVertex;
+               }
+               
+               // register the head vertex and its config with the iteration descriptor
+               IterationDescriptor descr = this.iterations.get(iteration);
+               if (descr == null) {
+                       throw new CompilerException("Bug: Iteration descriptor was not created when translating the iteration node.");
+               }
+               descr.setHeadTask(headVertex, headConfig);
+               
+               return toReturn;
+       }
+       
+       private AbstractJobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
+               // get the workset iteration that corresponds to this workset node
+               final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode();
+               
+               // check whether we need an individual vertex for the workset, or whether we
+               // attach ourselves to the vertex of the parent node. We can combine the head with a node of
+               // the step function, if
+               // 1) There is one parent that the workset connects to via a forward pattern and no
+               //    local strategy
+               // 2) The DOP and the number of subtasks per instance do not change
+               // 3) That successor is not a union
+               // 4) That successor is not itself the last node of the step function
+               // 5) There is no local strategy on the edge for the initial workset, as
+               //    this translates to a local strategy that would only be executed in the first superstep
+               
+               final boolean merge;
+               if (mergeIterationAuxTasks && wspn.getOutgoingChannels().size() == 1) {
+                       final Channel c = wspn.getOutgoingChannels().get(0);
+                       final PlanNode successor = c.getTarget();
+                       merge = c.getShipStrategy() == ShipStrategyType.FORWARD &&
+                                       c.getLocalStrategy() == LocalStrategy.NONE &&
+                                       c.getTempMode() == TempMode.NONE &&
+                                       successor.getParallelism() == wspn.getParallelism() &&
+                                       !(successor instanceof NAryUnionPlanNode) &&
+                                       successor != iteration.getNextWorkSetPlanNode() &&
+                                       iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE;
+               } else {
+                       merge = false;
+               }
+               
+               // create or adopt the head vertex
+               final AbstractJobVertex toReturn;
+               final AbstractJobVertex headVertex;
+               final TaskConfig headConfig;
+               if (merge) {
+                       final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
+                       headVertex = (AbstractJobVertex) this.vertices.get(successor);
+                       
+                       if (headVertex == null) {
+                               throw new CompilerException(
+                                       "Bug: Trying to merge the iteration head with its successor, but the successor has not been created.");
+                       }
+                       
+                       // reset the vertex type to iteration head
+                       headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headConfig = new TaskConfig(headVertex.getConfiguration());
+                       toReturn = null;
+               } else {
+                       // instantiate the head vertex and give it a no-op driver as the driver strategy.
+                       // everything else happens in the post visit, after the input (the initial workset)
+                       // is connected.
+                       headVertex = new AbstractJobVertex("IterationHead("+iteration.getNodeName()+")");
+                       headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headConfig = new TaskConfig(headVertex.getConfiguration());
+                       headConfig.setDriver(NoOpDriver.class);
+                       toReturn = headVertex;
+               }
+               
+               headConfig.setSolutionSetUnmanaged(iteration.getIterationNode().getIterationContract().isSolutionSetUnManaged());
+               
+               // register the head vertex and its config with the iteration descriptor
+               IterationDescriptor descr = this.iterations.get(iteration);
+               if (descr == null) {
+                       throw new CompilerException("Bug: Iteration descriptor was not created when translating the iteration node.");
+               }
+               descr.setHeadTask(headVertex, headConfig);
+               
+               return toReturn;
+       }
+       
+       private void assignDriverResources(PlanNode node, TaskConfig config) {
+               final double relativeMem = node.getRelativeMemoryPerSubTask();
+               if (relativeMem > 0) {
+                       config.setRelativeMemoryDriver(relativeMem);
+                       config.setFilehandlesDriver(this.defaultMaxFan);
+                       config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
+               }
+       }
+       
+       private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
+               if (c.getRelativeMemoryLocalStrategy() > 0) {
+                       config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy());
+                       config.setFilehandlesInput(inputNum, this.defaultMaxFan);
+                       config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       // Connecting Vertices
+       // ------------------------------------------------------------------------
+
+       /**
+        * NOTE: The channels for the global and local strategies are different if we connect a union. The global strategy
+        * channel is then the channel into the union node, the local strategy channel the one from the union to the
+        * actual target operator.
+        *
+        * @param channel The channel describing the connection between the two plan nodes.
+        * @param inputNumber The number of the target task's input gate that this channel connects to.
+        * @param sourceVertex The vertex producing the data.
+        * @param sourceConfig The task config of the source vertex.
+        * @param targetVertex The vertex consuming the data.
+        * @param targetConfig The task config of the target vertex.
+        * @param isBroadcast Flag indicating whether this connection distributes a broadcast variable.
+        * @return The distribution pattern of the created connection.
+        * @throws CompilerException Thrown if the channel describes an inconsistent or unknown strategy.
+        */
+       private DistributionPattern connectJobVertices(Channel channel, int inputNumber,
+                       final AbstractJobVertex sourceVertex, final TaskConfig sourceConfig,
+                       final AbstractJobVertex targetVertex, final TaskConfig targetConfig, boolean isBroadcast)
+       throws CompilerException
+       {
+               // ------------ connect the vertices to the job graph --------------
+               final DistributionPattern distributionPattern;
+
+               switch (channel.getShipStrategy()) {
+                       case FORWARD:
+                               distributionPattern = DistributionPattern.POINTWISE;
+                               break;
+                       case PARTITION_RANDOM:
+                       case BROADCAST:
+                       case PARTITION_HASH:
+                       case PARTITION_CUSTOM:
+                       case PARTITION_RANGE:
+                       case PARTITION_FORCED_REBALANCE:
+                               distributionPattern = DistributionPattern.ALL_TO_ALL;
+                               break;
+                       default:
+                               throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
+               }
+
+               final ResultPartitionType resultType;
+
+               switch (channel.getDataExchangeMode()) {
+
+                       case PIPELINED:
+                               resultType = ResultPartitionType.PIPELINED;
+                               break;
+
+                       case BATCH:
+                               // BLOCKING results are currently not supported in closed loop iterations
+                               //
+                               // See https://issues.apache.org/jira/browse/FLINK-1713 for details
+                               resultType = channel.getSource().isOnDynamicPath()
+                                               ? ResultPartitionType.PIPELINED
+                                               : ResultPartitionType.BLOCKING;
+                               break;
+
+                       case PIPELINE_WITH_BATCH_FALLBACK:
+                               throw new UnsupportedOperationException("Data exchange mode " +
+                                               channel.getDataExchangeMode() + " currently not supported.");
+
+                       default:
+                               throw new UnsupportedOperationException("Unknown data exchange mode.");
+
+               }
+
+               targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
+
+               // -------------- configure the source task's ship strategy in the task config --------------
+               final int outputIndex = sourceConfig.getNumOutputs();
+               sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
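+               // the serializer is the same for all of a task's outputs, so it only needs to be set with the first output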
+               if (outputIndex == 0) {
+                       sourceConfig.setOutputSerializer(channel.getSerializer());
+               }
+               if (channel.getShipStrategyComparator() != null) {
+                       sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex);
+               }
+               
+               if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
+                       
+                       final DataDistribution dataDistribution = channel.getDataDistribution();
+                       if (dataDistribution != null) {
+                               sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
+                       } else {
+                               throw new RuntimeException("Range partitioning requires a data distribution.");
+                               // TODO: inject code and configuration for automatic histogram generation
+                       }
+               }
+               
+               if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) {
+                       if (channel.getPartitioner() != null) {
+                               sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex);
+                       } else {
+                               throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set.");
+                       }
+               }
+               
+               // ---------------- configure the receiver -------------------
+               if (isBroadcast) {
+                       targetConfig.addBroadcastInputToGroup(inputNumber);
+               } else {
+                       targetConfig.addInputToGroup(inputNumber);
+               }
+               return distributionPattern;
+       }
+       
+       private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) {
+               // serializer
+               if (isBroadcastChannel) {
+                       config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
+                       
+                       if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
+                               throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel.");
+                       } else {
+                               return;
+                       }
+               } else {
+                       config.setInputSerializer(channel.getSerializer(), inputNum);
+               }
+               
+               // local strategy
+               if (channel.getLocalStrategy() != LocalStrategy.NONE) {
+                       config.setInputLocalStrategy(inputNum, channel.getLocalStrategy());
+                       if (channel.getLocalStrategyComparator() != null) {
+                               config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
+                       }
+               }
+               
+               assignLocalStrategyResources(channel, config, inputNum);
+               
+               // materialization / caching
+               if (channel.getTempMode() != null) {
+                       final TempMode tm = channel.getTempMode();
+
+                       boolean needsMemory = false;
+                       // Don't add a pipeline breaker if the data exchange is already blocking.
+                       if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
+                               config.setInputAsynchronouslyMaterialized(inputNum, true);
+                               needsMemory = true;
+                       }
+                       if (tm.isCached()) {
+                               config.setInputCached(inputNum, true);
+                               needsMemory = true;
+                       }
+                       
+                       if (needsMemory) {
+                               // sanity check (tm cannot be null here, it was dereferenced above)
+                               if (tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) {
+                                       throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
+                               }
+                               config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
+                       }
+               }
+       }
+       
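+       /**
+        * Finalizes a bulk iteration: wires the head's final outputs and the sync gate, creates the
+        * synchronization task, configures the iteration tail(s), and registers the aggregators and
+        * the convergence criterion, if any.
+        */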
+       private void finalizeBulkIteration(IterationDescriptor descr) {
+               
+               final BulkIterationPlanNode bulkNode = (BulkIterationPlanNode) descr.getIterationNode();
+               final AbstractJobVertex headVertex = descr.getHeadTask();
+               final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+               final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
+               
+               // ------------ finalize the head config with the final outputs and the sync gate ------------
+               final int numStepFunctionOuts = headConfig.getNumOutputs();
+               final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+               
+               if (numStepFunctionOuts == 0) {
+                       throw new CompilerException("The iteration has no operation inside the step function.");
+               }
+               
+               headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
+               headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
+               final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask();
+               if (relativeMemForBackChannel <= 0) {
+                       throw new CompilerException("Bug: No memory has been assigned to the iteration back channel.");
+               }
+               headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);
+               
+               // --------------------------- create the sync task ---------------------------
+               final AbstractJobVertex sync = new AbstractJobVertex("Sync(" + bulkNode.getNodeName() + ")");
+               sync.setInvokableClass(IterationSynchronizationSinkTask.class);
+               sync.setParallelism(1);
+               this.auxVertices.add(sync);
+               
+               final TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
+               syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
+
+               // set the number of iterations and the convergence criterion for the sync
+               final int maxNumIterations = bulkNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
+               if (maxNumIterations < 1) {
+                       throw new CompilerException("Cannot create bulk iteration with unspecified maximum number of iterations.");
+               }
+               syncConfig.setNumberOfIterations(maxNumIterations);
+               
+               // connect the sync task
+               sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+               
+               // ----------------------------- create the iteration tail ------------------------------
+               
+               final PlanNode rootOfTerminationCriterion = bulkNode.getRootOfTerminationCriterion();
+               final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
+               final TaskConfig tailConfig;
+               
+               AbstractJobVertex rootOfStepFunctionVertex = (AbstractJobVertex) this.vertices.get(rootOfStepFunction);
+               if (rootOfStepFunctionVertex == null) {
+                       // last op is chained
+                       final TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
+                       if (taskInChain == null) {
+                               throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
+                       }
+                       rootOfStepFunctionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+
+                       // the fake channel is statically typed to pact record. No data is sent over this channel anyway.
+                       tailConfig = taskInChain.getTaskConfig();
+               } else {
+                       tailConfig = new TaskConfig(rootOfStepFunctionVertex.getConfiguration());
+               }
+               
+               tailConfig.setIsWorksetUpdate();
+               
+               // No following termination criterion
+               if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
+                       
+                       rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
+                       
+                       tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
+               }
+               
+               
+               // create the fake output task for termination criterion, if needed
+               final TaskConfig tailConfigOfTerminationCriterion;
+               // If we have a termination criterion and it is not an intermediate node
+               if (rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
+                       AbstractJobVertex rootOfTerminationCriterionVertex = (AbstractJobVertex) this.vertices.get(rootOfTerminationCriterion);
+                       
+                       
+                       if (rootOfTerminationCriterionVertex == null) {
+                               // last op is chained
+                               final TaskInChain taskInChain = this.chainedTasks.get(rootOfTerminationCriterion);
+                               if (taskInChain == null) {
+                                       throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
+                               }
+                               rootOfTerminationCriterionVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+
+                               // the fake channel is statically typed to pact record. No data is sent over this channel anyway.
+                               tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
+                       } else {
+                               tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
+                       }
+                       
+                       rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
+                       // hack: reuse the solution set update code path for the termination criterion tail
+                       tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
+                       tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
+                       
+                       // tell the head that it needs to wait for the solution set updates
+                       headConfig.setWaitForSolutionSetUpdate();
+               }
+               
+               // ------------------- register the aggregators -------------------
+               AggregatorRegistry aggs = bulkNode.getIterationNode().getIterationContract().getAggregators();
+               Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
+               
+               headConfig.addIterationAggregators(allAggregators);
+               syncConfig.addIterationAggregators(allAggregators);
+               
+               String convAggName = aggs.getConvergenceCriterionAggregatorName();
+               ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
+               
+               if (convCriterion != null || convAggName != null) {
+                       if (convCriterion == null) {
+                               throw new CompilerException("Error: Convergence criterion aggregator name set, but criterion is null.");
+                       }
+                       if (convAggName == null) {
+                               throw new CompilerException("Error: Convergence criterion set, but aggregator name is null.");
+                       }
+                       
+                       syncConfig.setConvergenceCriterion(convAggName, convCriterion);
+               }
+       }
+       
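+       /**
+        * Finalizes a workset iteration: assigns back channel and solution set memory to the head,
+        * creates the synchronization task, configures the workset and solution set delta tails, and
+        * registers the built-in workset-empty convergence criterion.
+        */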
+       private void finalizeWorksetIteration(IterationDescriptor descr) {
+               final WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode) descr.getIterationNode();
+               final AbstractJobVertex headVertex = descr.getHeadTask();
+               final TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
+               final TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();
+               
+               // ------------ finalize the head config with the final outputs and the sync gate ------------
+               {
+                       final int numStepFunctionOuts = headConfig.getNumOutputs();
+                       final int numFinalOuts = headFinalOutputConfig.getNumOutputs();
+                       
+                       if (numStepFunctionOuts == 0) {
+                               throw new CompilerException("The workset iteration has no operation on the workset inside the step function.");
+                       }
+                       
+                       headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
+                       headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
+                       final double relativeMemory = iterNode.getRelativeMemoryPerSubTask();
+                       if (relativeMemory <= 0) {
+                               throw new CompilerException("Bug: No memory has been assigned to the workset iteration.");
+                       }
+                       
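+                       // split the assigned memory evenly between the back channel and the solution set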
+                       headConfig.setIsWorksetIteration();
+                       headConfig.setRelativeBackChannelMemory(relativeMemory / 2);
+                       headConfig.setRelativeSolutionSetMemory(relativeMemory / 2);
+                       
+                       // set the solution set serializer and comparator
+                       headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer());
+                       headConfig.setSolutionSetComparator(iterNode.getSolutionSetComparator());
+               }
+               
+               // --------------------------- create the sync task ---------------------------
+               final TaskConfig syncConfig;
+               {
+                       final AbstractJobVertex sync = new AbstractJobVertex("Sync (" + iterNode.getNodeName() + ")");
+                       sync.setInvokableClass(IterationSynchronizationSinkTask.class);
+                       sync.setParallelism(1);
+                       this.auxVertices.add(sync);
+                       
+                       syncConfig = new TaskConfig(sync.getConfiguration());
+                       syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
+       
+                       // set the number of iterations and the convergence criterion for the sync
+                       final int maxNumIterations = iterNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
+                       if (maxNumIterations < 1) {
+                               throw new CompilerException("Cannot create workset iteration with unspecified maximum number of iterations.");
+                       }
+                       syncConfig.setNumberOfIterations(maxNumIterations);
+                       
+                       // connect the sync task
+                       sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE);
+               }
+               
+               // ----------------------------- create the iteration tails -----------------------------
+               // ----------------------- for next workset and solution set delta -----------------------
+
+               {
+                       // we have three possible cases:
+                       // 1) Two tails, one for workset update, one for solution set update
+                       // 2) One tail for workset update, solution set update happens in an intermediate task
+                       // 3) One tail for solution set update, workset update happens in an intermediate task
+                       
+                       final PlanNode nextWorksetNode = iterNode.getNextWorkSetPlanNode();
+                       final PlanNode solutionDeltaNode = iterNode.getSolutionSetDeltaPlanNode();
+                       
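+                       // the solution set delta needs a dedicated tail unless the update is immediate
+                       // and the workset update already provides the tail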
+                       final boolean hasWorksetTail = nextWorksetNode.getOutgoingChannels().isEmpty();
+                       final boolean hasSolutionSetTail = (!iterNode.isImmediateSolutionSetUpdate()) || (!hasWorksetTail);
+                       
+                       {
+                               // get the vertex for the workset update
+                               final TaskConfig worksetTailConfig;
+                               AbstractJobVertex nextWorksetVertex = (AbstractJobVertex) this.vertices.get(nextWorksetNode);
+                               if (nextWorksetVertex == null) {
+                                       // nextWorksetVertex is chained
+                                       TaskInChain taskInChain = this.chainedTasks.get(nextWorksetNode);
+                                       if (taskInChain == null) {
+                                               throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
+                                       }
+                                       nextWorksetVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+                                       worksetTailConfig = taskInChain.getTaskConfig();
+                               } else {
+                                       worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
+                               }
+                               
+                               // mark the node to perform workset updates
+                               worksetTailConfig.setIsWorksetIteration();
+                               worksetTailConfig.setIsWorksetUpdate();
+                               
+                               if (hasWorksetTail) {
+                                       nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
+                                       
+                                       worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
+                               }
+                       }
+                       {
+                               final TaskConfig solutionDeltaConfig;
+                               AbstractJobVertex solutionDeltaVertex = (AbstractJobVertex) this.vertices.get(solutionDeltaNode);
+                               if (solutionDeltaVertex == null) {
+                                       // last op is chained
+                                       TaskInChain taskInChain = this.chainedTasks.get(solutionDeltaNode);
+                                       if (taskInChain == null) {
+                                               throw new CompilerException("Bug: Solution set delta not found as vertex or chained task.");
+                                       }
+                                       solutionDeltaVertex = (AbstractJobVertex) taskInChain.getContainingVertex();
+                                       solutionDeltaConfig = taskInChain.getTaskConfig();
+                               } else {
+                                       solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
+                               }
+                               
+                               solutionDeltaConfig.setIsWorksetIteration();
+                               solutionDeltaConfig.setIsSolutionSetUpdate();
+                               
+                               if (hasSolutionSetTail) {
+                                       solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
+                                       
+                                       solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
+                                       
+                                       // tell the head that it needs to wait for the solution set updates
+                                       headConfig.setWaitForSolutionSetUpdate();
+                               }
+                               else {
+                                       // no tail, intermediate update. Must be an immediate update.
+                                       if (!iterNode.isImmediateSolutionSetUpdate()) {
+                                               throw new CompilerException("A solution set update without a dedicated tail is not set to perform immediate updates.");
+                                       }
+                                       solutionDeltaConfig.setIsSolutionSetUpdateWithoutReprobe();
+                               }
+                       }
+               }
+               
+               // ------------------- register the aggregators -------------------
+               AggregatorRegistry aggs = iterNode.getIterationNode().getIterationContract().getAggregators();
+               Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
+               
+               for (AggregatorWithName<?> agg : allAggregators) {
+                       if (agg.getName().equals(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME)) {
+                               throw new CompilerException("User-defined aggregator used the same name as the built-in workset " +
+                                               "termination check aggregator: " + WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
+                       }
+               }
+               
+               headConfig.addIterationAggregators(allAggregators);
+               syncConfig.addIterationAggregators(allAggregators);
+               
+               String convAggName = aggs.getConvergenceCriterionAggregatorName();
+               ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
+               
+               if (convCriterion != null || convAggName != null) {
+                       throw new CompilerException("Error: Cannot use a custom convergence criterion with a workset iteration. " +
+                                       "Workset iterations have an implicit convergence criterion: they terminate when the workset is empty.");
+               }
+               
+               headConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
+               syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
+               syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new WorksetEmptyConvergenceCriterion());
+       }
+       
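+       /**
+        * Returns a description of the user code: the string representation of the instantiated user code
+        * object if available, otherwise the class name; null if neither can be accessed.
+        */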
+       private static String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
+               try {
+                       if (wrapper.hasObject()) {
+                               try {
+                                       return wrapper.getUserCodeObject().toString();
+                               }
+                               catch (Throwable t) {
+                                       return wrapper.getUserCodeClass().getName();
+                               }
+                       }
+                       else {
+                               return wrapper.getUserCodeClass().getName();
+                       }
+               }
+               catch (Throwable t) {
+                       return null;
+               }
+       }
+
+       // -------------------------------------------------------------------------------------
+       // Descriptors for tasks / configurations that are chained or merged with other tasks
+       // -------------------------------------------------------------------------------------
+       
+       /**
+        * Utility class that describes a task in a sequence of chained tasks. Chained tasks are tasks that run
+        * together in one thread.
+        */
+       private static final class TaskInChain {
+               
+               private final Class<? extends ChainedDriver<?, ?>> chainedTask;
+               
+               private final TaskConfig taskConfig;
+               
+               private final String taskName;
+               
+               private AbstractJobVertex containingVertex;
+
+               TaskInChain(Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig, String taskName) {
+                       this.chainedTask = chainedTask;
+                       this.taskConfig = taskConfig;
+                       this.taskName = taskName;
+               }
+               
+               public Class<? extends ChainedDriver<?, ?>> getChainedTask() {
+                       return this.chainedTask;
+               }
+               
+               public TaskConfig getTaskConfig() {
+                       return this.taskConfig;
+               }
+               
+               public String getTaskName() {
+                       return this.taskName;
+               }
+               
+               public AbstractJobVertex getContainingVertex() {
+                       return this.containingVertex;
+               }
+               
+               public void setContainingVertex(AbstractJobVertex containingVertex) {
+                       this.containingVertex = containingVertex;
+               }
+       }
+       
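+       /**
+        * Utility class that holds the head task, head config, and final output config for one iteration,
+        * together with the iteration's plan node and ID.
+        */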
+       private static final class IterationDescriptor {
+               
+               private final IterationPlanNode iterationNode;
+               
+               private AbstractJobVertex headTask;
+               
+               private TaskConfig headConfig;
+               
+               private TaskConfig headFinalResultConfig;
+               
+               private final int id;
+
+               public IterationDescriptor(IterationPlanNode iterationNode, int id) {
+                       this.iterationNode = iterationNode;
+                       this.id = id;
+               }
+               
+               public IterationPlanNode getIterationNode() {
+                       return iterationNode;
+               }
+               
+               public void setHeadTask(AbstractJobVertex headTask, TaskConfig headConfig) {
+                       this.headTask = headTask;
+                       this.headFinalResultConfig = new TaskConfig(new Configuration());
+                       
+                       // check if we already had a configuration, for example if the solution set was already set
+                       if (this.headConfig != null) {
+                               headConfig.getConfiguration().addAll(this.headConfig.getConfiguration());
+                       }
+                       
+                       this.headConfig = headConfig;
+               }
+               
+               public AbstractJobVertex getHeadTask() {
+                       return headTask;
+               }
+               
+               public TaskConfig getHeadFinalResultConfig() {
+                       return headFinalResultConfig;
+               }
+               
+               public int getId() {
+                       return this.id;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java
new file mode 100644
index 0000000..f2b736c
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/AbstractSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.postpass;
+
+import java.util.Map;
+
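+/**
+ * Base class for schemas that map field positions to type information and track how many
+ * input connections contributed to the schema.
+ */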
+public abstract class AbstractSchema<X> implements Iterable<Map.Entry<Integer, X>> {
+
+       private int numConnectionsThatContributed;
+       
+       
+       public int getNumConnectionsThatContributed() {
+               return this.numConnectionsThatContributed;
+       }
+       
+       public void increaseNumConnectionsThatContributed() {
+               this.numConnectionsThatContributed++;
+       }
+       
+       public abstract void addType(int pos, X type) throws ConflictingFieldTypeInfoException;
+       
+       public abstract X getType(int field);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java
new file mode 100644
index 0000000..56d914c
--- /dev/null
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/ConflictingFieldTypeInfoException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.postpass;
+
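+/**
+ * Exception indicating that conflicting type information was attached to the same field
+ * during the schema post pass.
+ */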
+public final class ConflictingFieldTypeInfoException extends Exception {
+
+       private static final long serialVersionUID = 3991352502693288321L;
+
+       private final int fieldNumber;
+       
+       private final Object previousType, newType;
+
+       
+       public ConflictingFieldTypeInfoException(int fieldNumber, Object previousType, Object newType) {
+               super("Conflicting type info for field " + fieldNumber + ": Old='" + previousType + "', new='" + newType + "'.");
+               this.fieldNumber = fieldNumber;
+               this.previousType = previousType;
+               this.newType = newType;
+       }
+       
+       
+       public int getFieldNumber() {
+               return fieldNumber;
+       }
+
+       public Object getPreviousType() {
+               return this.previousType;
+       }
+
+       public Object getNewType() {
+               return this.newType;
+       }
+}
