http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index c6a872c..988e903 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -55,14 +55,14 @@ import java.util.List;
  * @param <IN> The data type consumed by the combiner.
  * @param <OUT> The data type produced by the combiner.
  */
-public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
+public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);
 
 	/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
 	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 
-	private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
+	private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
 
 	private InMemorySorter<IN> sorter;
 
@@ -87,7 +87,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
+	public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 59fb603..a03e42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -40,11 +40,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
+public class GroupReduceDriver<IT, OT> implements Driver<GroupReduceFunction<IT, OT>, OT> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(GroupReduceDriver.class);
 
-	private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
+	private TaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
 	
 	private MutableObjectIterator<IT> input;
 
@@ -59,7 +59,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> context) {
+	public void setup(TaskContext<GroupReduceFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index 811f00c..7a9c8e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -46,11 +46,11 @@ import org.slf4j.LoggerFactory;
  * 
  * @see org.apache.flink.api.common.functions.FlatJoinFunction
  */
-public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 	
 	protected static final Logger LOG = LoggerFactory.getLogger(JoinDriver.class);
 	
-	protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+	protected TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private volatile JoinTaskIterator<IT1, IT2, OT> joinIterator; // the iterator that does the actual join 
 	
@@ -59,7 +59,7 @@ public class JoinDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT1
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+	public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index fe926cb..51f9197 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -27,15 +27,15 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
+import org.apache.flink.runtime.iterative.task.AbstractIterativeTask;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 	
-	private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+	private TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private CompactingHashTable<IT1> hashTable;
 	
@@ -55,7 +55,7 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+	public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -99,8 +99,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 		final TypeComparator<IT1> solutionSetComparator;
 		
 		// grab a handle to the hash table from the iteration broker
-		if (taskContext instanceof AbstractIterativePactTask) {
-			AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
+		if (taskContext instanceof AbstractIterativeTask) {
+			AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext;
 			String identifier = iterativeTaskContext.brokerKey();
 			
 			Object table = SolutionSetBroker.instance().get(identifier);
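The hunk above (and the matching one in JoinWithSolutionSetSecondDriver below) keeps the same pattern after the rename: the driver checks whether its TaskContext is an AbstractIterativeTask, derives the broker key, and looks the shared solution-set hash table up in the SolutionSetBroker. A condensed sketch of that lookup with the new class names; the helper method, the cast handling, and the error message are illustrative assumptions, not code from this commit:

    // Sketch only (assumed helper, not part of the commit): obtaining the solution-set
    // hash table for the first input via the iteration broker, using the renamed classes.
    private void grabSolutionSetHashTable() {
        if (taskContext instanceof AbstractIterativeTask) {
            AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext;
            String identifier = iterativeTaskContext.brokerKey();

            Object table = SolutionSetBroker.instance().get(identifier);
            if (table instanceof CompactingHashTable) {
                @SuppressWarnings("unchecked")
                CompactingHashTable<IT1> solutionSet = (CompactingHashTable<IT1>) table;
                this.hashTable = solutionSet;
            }
            else {
                // assumed fallback for illustration only
                throw new RuntimeException("The broker did not hand out a compacting hash table for key " + identifier);
            }
        }
    }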

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index 20079fc..e1fad47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -27,15 +27,15 @@ import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
+import org.apache.flink.runtime.iterative.task.AbstractIterativeTask;
 import org.apache.flink.runtime.operators.hash.CompactingHashTable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
 	
-	private PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
+	private TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
 	
 	private CompactingHashTable<IT2> hashTable;
 	
@@ -55,7 +55,7 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
-	public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
+	public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 	}
@@ -99,8 +99,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 		final TypeComparator<IT2> solutionSetComparator;
 		
 		// grab a handle to the hash table from the iteration broker
-		if (taskContext instanceof AbstractIterativePactTask) {
-			AbstractIterativePactTask<?, ?> iterativeTaskContext = (AbstractIterativePactTask<?, ?>) taskContext;
+		if (taskContext instanceof AbstractIterativeTask) {
+			AbstractIterativeTask<?, ?> iterativeTaskContext = (AbstractIterativeTask<?, ?>) taskContext;
 			String identifier = iterativeTaskContext.brokerKey();
 			Object table = SolutionSetBroker.instance().get(identifier);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index d861cbd..eefe8e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -36,9 +36,9 @@ import org.apache.flink.util.MutableObjectIterator;
  * @param <IT> The mapper's input data type.
  * @param <OT> The mapper's output data type.
  */
-public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
+public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {
 	
-	private PactTaskContext<MapFunction<IT, OT>, OT> taskContext;
+	private TaskContext<MapFunction<IT, OT>, OT> taskContext;
 	
 	private volatile boolean running;
 
@@ -46,7 +46,7 @@ public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
 	
 	
 	@Override
-	public void setup(PactTaskContext<MapFunction<IT, OT>, OT> context) {
+	public void setup(TaskContext<MapFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 		this.running = true;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
index eaab904..8792ef1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -41,16 +41,16 @@ import org.slf4j.LoggerFactory;
  * @param <IT> The mapper's input data type.
  * @param <OT> The mapper's output data type.
  */
-public class MapPartitionDriver<IT, OT> implements PactDriver<MapPartitionFunction<IT, OT>, OT> {
+public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MapPartitionDriver.class);
 
-	private PactTaskContext<MapPartitionFunction<IT, OT>, OT> taskContext;
+	private TaskContext<MapPartitionFunction<IT, OT>, OT> taskContext;
 
 	private boolean objectReuseEnabled = false;
 
 	@Override
-	public void setup(PactTaskContext<MapPartitionFunction<IT, OT>, OT> context) {
+	public void setup(TaskContext<MapPartitionFunction<IT, OT>, OT> context) {
 		this.taskContext = context;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index 1fb4813..fcd2716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -32,18 +32,18 @@ import org.slf4j.LoggerFactory;
  * 
  * @param <T> The data type.
  */
-public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> {
+public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MapPartitionDriver.class);
 
-	private PactTaskContext<AbstractRichFunction, T> taskContext;
+	private TaskContext<AbstractRichFunction, T> taskContext;
 	
 	private volatile boolean running;
 
 	private boolean objectReuseEnabled = false;
 
 	@Override
-	public void setup(PactTaskContext<AbstractRichFunction, T> context) {
+	public void setup(TaskContext<AbstractRichFunction, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
deleted file mode 100644
index 288f7ca..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * The interface to be implemented by all drivers that run alone (or as the primary driver) in a task.
- * A driver implements the actual code to perform a batch operation, like <i>map()</i>,
- * <i>reduce()</i>, <i>join()</i>, or <i>coGroup()</i>.
- *
- * @see PactTaskContext
- * 
- * @param <S> The type of stub driven by this driver.
- * @param <OT> The data type of the records produced by this driver.
- */
-public interface PactDriver<S extends Function, OT> {
-	
-	void setup(PactTaskContext<S, OT> context);
-	
-	/**
-	 * Gets the number of inputs that the task has.
-	 * 
-	 * @return The number of inputs.
-	 */
-	int getNumberOfInputs();
-	
-	/**
-	 * Gets the number of comparators required for this driver.
-	 * 
-	 * @return The number of comparators required for this driver.
-	 */
-	int getNumberOfDriverComparators();
-	
-	/**
-	 * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return
-	 * <code>MapFunction.class</code>.   
-	 * 
-	 * @return The class of the stub type run by the task.
-	 */
-	Class<S> getStubType();
-	
-	/**
-	 * This method is called before the user code is opened. An exception thrown by this method
-	 * signals failure of the task.
-	 * 
-	 * @throws Exception Exceptions may be forwarded and signal task failure.
-	 */
-	void prepare() throws Exception;
-	
-	/**
-	 * The main operation method of the task. It should call the user code with the data subsets until
-	 * the input is depleted.
-	 * 
-	 * @throws Exception Any exception thrown by this method signals task failure. Because exceptions in the user
-	 *                   code typically signal situations where this instance in unable to proceed, exceptions
-	 *                   from the user code should be forwarded.
-	 */
-	void run() throws Exception;
-	
-	/**
-	 * This method is invoked in any case (clean termination and exception) at the end of the tasks operation.
-	 * 
-	 * @throws Exception Exceptions may be forwarded.
-	 */
-	void cleanup() throws Exception;
-	
-	/**
-	 * This method is invoked when the driver must aborted in mid processing. It is invoked asynchronously by a different thread.
-	 * 
-	 * @throws Exception Exceptions may be forwarded.
-	 */
-	void cancel() throws Exception;
-}
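The interface removed above survives this commit under the name Driver, with the same methods and the context parameter renamed to TaskContext. For orientation, a minimal driver written against the renamed interface could look like the sketch below; the EchoDriver class and its pass-through behavior are assumptions for illustration, not part of the commit.

    package org.apache.flink.runtime.operators;

    import org.apache.flink.api.common.functions.AbstractRichFunction;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.util.MutableObjectIterator;

    /**
     * Hypothetical example (not in the commit): a pass-through driver implemented against
     * the renamed Driver/TaskContext interfaces, mirroring the lifecycle documented above.
     */
    public class EchoDriver<T> implements Driver<AbstractRichFunction, T> {

        private TaskContext<AbstractRichFunction, T> taskContext;

        private volatile boolean running;

        @Override
        public void setup(TaskContext<AbstractRichFunction, T> context) {
            this.taskContext = context;
            this.running = true;
        }

        @Override
        public int getNumberOfInputs() {
            return 1;              // one logical input, no broadcast inputs
        }

        @Override
        public int getNumberOfDriverComparators() {
            return 0;              // no sorting or grouping, so no comparators needed
        }

        @Override
        public Class<AbstractRichFunction> getStubType() {
            return AbstractRichFunction.class;   // this driver runs no user code of its own
        }

        @Override
        public void prepare() {
            // nothing to prepare; a real driver would acquire memory, sorters, or hash tables here
        }

        @Override
        public void run() throws Exception {
            // forward every record from input 0 to the output collector until the input is depleted
            final MutableObjectIterator<T> input = this.taskContext.getInput(0);
            final TypeSerializer<T> serializer = this.taskContext.<T>getInputSerializer(0).getSerializer();

            T record = serializer.createInstance();
            while (this.running && (record = input.next(record)) != null) {
                this.taskContext.getOutputCollector().collect(record);
            }
        }

        @Override
        public void cleanup() {
            // no resources to release in this sketch
        }

        @Override
        public void cancel() {
            this.running = false;  // run() checks this flag and stops forwarding
        }
    }

The lifecycle is the one documented in the interface above: the owning task calls setup(), then prepare(), run(), and cleanup(), while cancel() may be invoked asynchronously from another thread.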

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
deleted file mode 100644
index baeda3a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * The task context gives a driver (e.g., {@link MapDriver}, or {@link JoinDriver}) access to
- * the runtime components and configuration that they can use to fulfil their task.
- *
- * @param <S> The UDF type.
- * @param <OT> The produced data type.
- *
- * @see PactDriver
- */
-public interface PactTaskContext<S, OT> {
-	
-	TaskConfig getTaskConfig();
-	
-	TaskManagerRuntimeInfo getTaskManagerInfo();
-
-	ClassLoader getUserCodeClassLoader();
-	
-	MemoryManager getMemoryManager();
-	
-	IOManager getIOManager();
-
-	<X> MutableObjectIterator<X> getInput(int index);
-	
-	<X> TypeSerializerFactory<X> getInputSerializer(int index);
-	
-	<X> TypeComparator<X> getDriverComparator(int index);
-	
-	S getStub();
-
-	ExecutionConfig getExecutionConfig();
-
-	Collector<OT> getOutputCollector();
-	
-	AbstractInvokable getOwningNepheleTask();
-	
-	String formatLogString(String message);
-}
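TaskContext, the renamed form of the interface deleted above, is what a driver queries for its inputs, serializers, comparators, and runtime services. A typical prepare() implementation might wire itself up roughly as follows; this fragment is illustrative only, the field names are assumptions, and the surrounding driver class is assumed to declare those fields.

    // Illustrative fragment, not from this commit: a grouping driver's prepare() pulling its
    // runtime pieces from the renamed TaskContext. Field declarations are assumed to exist.
    @Override
    public void prepare() throws Exception {
        // input side: the raw record stream, its serializer, and the grouping comparator
        this.input = this.taskContext.getInput(0);
        this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
        this.comparator = this.taskContext.getDriverComparator(0);

        // object reuse is a job-wide setting carried in the ExecutionConfig (assumed accessor)
        this.objectReuseEnabled = this.taskContext.getExecutionConfig().isObjectReuseEnabled();

        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Driver prepared, object reuse " +
                    (this.objectReuseEnabled ? "enabled" : "disabled") + "."));
        }
    }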

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index f990156..c77e746 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @param <T> The data type consumed and produced by the combiner.
  */
-public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T> {
+public class ReduceCombineDriver<T> implements Driver<ReduceFunction<T>, T> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class);
 
@@ -53,7 +53,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
 	
 	
-	private PactTaskContext<ReduceFunction<T>, T> taskContext;
+	private TaskContext<ReduceFunction<T>, T> taskContext;
 
 	private TypeSerializer<T> serializer;
 
@@ -77,7 +77,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
+	public void setup(TaskContext<ReduceFunction<T>, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 8d15ef2..395beab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -39,11 +39,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.common.functions.ReduceFunction
  */
-public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
+public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 	
 	private static final Logger LOG = LoggerFactory.getLogger(ReduceDriver.class);
 
-	private PactTaskContext<ReduceFunction<T>, T> taskContext;
+	private TaskContext<ReduceFunction<T>, T> taskContext;
 	
 	private MutableObjectIterator<T> input;
 
@@ -58,7 +58,7 @@ public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
+	public void setup(TaskContext<ReduceFunction<T>, T> context) {
 		this.taskContext = context;
 		this.running = true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
deleted file mode 100644
index 89963af..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ /dev/null
@@ -1,1499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.functions.util.FunctionUtils;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.broadcast.BroadcastVariableMaterialization;
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.chaining.ChainedDriver;
-import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
-import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
-import org.apache.flink.runtime.operators.shipping.OutputCollector;
-import org.apache.flink.runtime.operators.shipping.OutputEmitter;
-import org.apache.flink.runtime.operators.shipping.RecordOutputCollector;
-import org.apache.flink.runtime.operators.shipping.RecordOutputEmitter;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
-import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The base class for all tasks. Encapsulated common behavior and implements the main life-cycle
- * of the user code.
- */
-public class RegularPactTask<S extends Function, OT> extends AbstractInvokable implements PactTaskContext<S, OT> {
-
-	protected static final Logger LOG = LoggerFactory.getLogger(RegularPactTask.class);
-	
-	// --------------------------------------------------------------------------------------------
-
-       /**
-        * The driver that invokes the user code (the stub implementation). The 
central driver in this task
-        * (further drivers may be chained behind this driver).
-        */
-       protected volatile PactDriver<S, OT> driver;
-
-       /**
-        * The instantiated user code of this task's main operator (driver). 
May be null if the operator has no udf.
-        */
-       protected S stub;
-
-       /**
-        * The udf's runtime context.
-        */
-       protected DistributedRuntimeUDFContext runtimeUdfContext;
-
-       /**
-        * The collector that forwards the user code's results. May forward to 
a channel or to chained drivers within
-        * this task.
-        */
-       protected Collector<OT> output;
-
-       /**
-        * The output writers for the data that this task forwards to the next 
task. The latest driver (the central, if no chained
-        * drivers exist, otherwise the last chained driver) produces its 
output to these writers.
-        */
-       protected List<RecordWriter<?>> eventualOutputs;
-
-       /**
-        * The input readers of this task.
-        */
-       protected MutableReader<?>[] inputReaders;
-
-       /**
-        * The input readers for the configured broadcast variables for this 
task.
-        */
-       protected MutableReader<?>[] broadcastInputReaders;
-       
-       /**
-        * The inputs reader, wrapped in an iterator. Prior to the local 
strategies, etc...
-        */
-       protected MutableObjectIterator<?>[] inputIterators;
-
-       /**
-        * The indices of the iterative inputs. Empty, if the task is not 
iterative. 
-        */
-       protected int[] iterativeInputs;
-       
-       /**
-        * The indices of the iterative broadcast inputs. Empty, if non of the 
inputs is iteratve. 
-        */
-       protected int[] iterativeBroadcastInputs;
-       
-       /**
-        * The local strategies that are applied on the inputs.
-        */
-       protected volatile CloseableInputProvider<?>[] localStrategies;
-
-       /**
-        * The optional temp barriers on the inputs for dead-lock breaking. Are
-        * optionally resettable.
-        */
-       protected volatile TempBarrier<?>[] tempBarriers;
-
-       /**
-        * The resettable inputs in the case where no temp barrier is needed.
-        */
-       protected volatile SpillingResettableMutableObjectIterator<?>[] 
resettableInputs;
-
-       /**
-        * The inputs to the operator. Return the readers' data after the 
application of the local strategy
-        * and the temp-table barrier.
-        */
-       protected MutableObjectIterator<?>[] inputs;
-
-       /**
-        * The serializers for the input data type.
-        */
-       protected TypeSerializerFactory<?>[] inputSerializers;
-
-       /**
-        * The serializers for the broadcast input data types.
-        */
-       protected TypeSerializerFactory<?>[] broadcastInputSerializers;
-
-       /**
-        * The comparators for the central driver.
-        */
-       protected TypeComparator<?>[] inputComparators;
-
-       /**
-        * The task configuration with the setup parameters.
-        */
-       protected TaskConfig config;
-
-       /**
-        * A list of chained drivers, if there are any.
-        */
-       protected ArrayList<ChainedDriver<?, ?>> chainedTasks;
-
-       /**
-        * Certain inputs may be excluded from resetting. For example, the 
initial partial solution
-        * in an iteration head must not be reseted (it is read through the 
back channel), when all
-        * others are reseted.
-        */
-       private boolean[] excludeFromReset;
-
-       /**
-        * Flag indicating for each input whether it is cached and can be 
reseted.
-        */
-       private boolean[] inputIsCached;
-
-       /**
-        * flag indicating for each input whether it must be asynchronously 
materialized.
-        */
-       private boolean[] inputIsAsyncMaterialized;
-
-       /**
-        * The amount of memory per input that is dedicated to the 
materialization.
-        */
-       private int[] materializationMemory;
-
-       /**
-        * The flag that tags the task as still running. Checked periodically 
to abort processing.
-        */
-       protected volatile boolean running = true;
-
-       /**
-        * The accumulator map used in the RuntimeContext.
-        */
-       protected Map<String, Accumulator<?,?>> accumulatorMap;
-
-	// --------------------------------------------------------------------------------------------
-	//                                  Task Interface
-	// --------------------------------------------------------------------------------------------
-
-
-	/**
-	 * Initialization method. Runs in the execution graph setup phase in the JobManager
-	 * and as a setup method on the TaskManager.
-	 */
-	@Override
-	public void registerInputOutput() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(formatLogString("Start registering input and output."));
-		}
-
-		// obtain task configuration (including stub parameters)
-		Configuration taskConf = getTaskConfiguration();
-		this.config = new TaskConfig(taskConf);
-
-		// now get the operator class which drives the operation
-		final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
-		this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
-
-		// initialize the readers.
-		// this does not yet trigger any stream consuming or processing.
-		initInputReaders();
-		initBroadcastInputReaders();
-
-		// initialize the writers.
-		initOutputs();
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(formatLogString("Finished registering input and output."));
-		}
-	}
-
-
-       /**
-        * The main work method.
-        */
-       @Override
-       public void invoke() throws Exception {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug(formatLogString("Start task code."));
-               }
-
-               Environment env = getEnvironment();
-
-		this.runtimeUdfContext = createRuntimeContext(env.getTaskName());
-
-		// whatever happens in this scope, make sure that the local strategies are cleaned up!
-		// note that the initialization of the local strategies is in the try-finally block as well,
-		// so that the thread that creates them catches its own errors that may happen in that process.
-		// this is especially important, since there may be asynchronous closes (such as through canceling).
-		try {
-			// initialize the remaining data structures on the input and trigger the local processing
-			// the local processing includes building the dams / caches
-			try {
-				int numInputs = driver.getNumberOfInputs();
-				int numComparators = driver.getNumberOfDriverComparators();
-				int numBroadcastInputs = this.config.getNumBroadcastInputs();
-				
-				initInputsSerializersAndComparators(numInputs, numComparators);
-				initBroadcastInputsSerializers(numBroadcastInputs);
-				
-				// set the iterative status for inputs and broadcast inputs
-				{
-					List<Integer> iterativeInputs = new ArrayList<Integer>();
-					
-					for (int i = 0; i < numInputs; i++) {
-						final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeGate(i);
-			
-						if (numberOfEventsUntilInterrupt < 0) {
-							throw new IllegalArgumentException();
-						}
-						else if (numberOfEventsUntilInterrupt > 0) {
-							this.inputReaders[i].setIterativeReader();
-							iterativeInputs.add(i);
-				
-							if (LOG.isDebugEnabled()) {
-								LOG.debug(formatLogString("Input [" + i + "] reads in supersteps with [" +
-										+ numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
-							}
-						}
-					}
-					this.iterativeInputs = asArray(iterativeInputs);
-				}
-				
-				{
-					List<Integer> iterativeBcInputs = new ArrayList<Integer>();
-					
-					for (int i = 0; i < numBroadcastInputs; i++) {
-						final int numberOfEventsUntilInterrupt = getTaskConfig().getNumberOfEventsUntilInterruptInIterativeBroadcastGate(i);
-						
-						if (numberOfEventsUntilInterrupt < 0) {
-							throw new IllegalArgumentException();
-						}
-						else if (numberOfEventsUntilInterrupt > 0) {
-							this.broadcastInputReaders[i].setIterativeReader();
-							iterativeBcInputs.add(i);
-				
-							if (LOG.isDebugEnabled()) {
-								LOG.debug(formatLogString("Broadcast input [" + i + "] reads in supersteps with [" +
-										+ numberOfEventsUntilInterrupt + "] event(s) till next superstep."));
-							}
-						}
-					}
-					this.iterativeBroadcastInputs = asArray(iterativeBcInputs);
-				}
-				
-				initLocalStrategies(numInputs);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Initializing the input processing failed" +
-						(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
-			}
-
-			if (!this.running) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug(formatLogString("Task cancelled before task code was started."));
-				}
-				return;
-			}
-
-			// pre main-function initialization
-			initialize();
-
-			// read the broadcast variables. they will be released in the finally clause 
-			for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
-				final String name = this.config.getBroadcastInputName(i);
-				readAndSetBroadcastInput(i, name, this.runtimeUdfContext, 1 /* superstep one for the start */);
-			}
-
-			// the work goes here
-			run();
-		}
-		finally {
-			// clean up in any case!
-			closeLocalStrategiesAndCaches();
-
-			clearReaders(inputReaders);
-			clearWriters(eventualOutputs);
-
-		}
-
-		if (this.running) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(formatLogString("Finished task code."));
-			}
-		} else {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(formatLogString("Task code cancelled."));
-			}
-		}
-	}
-
-       @Override
-       public void cancel() throws Exception {
-               this.running = false;
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug(formatLogString("Cancelling task code"));
-               }
-
-               try {
-                       if (this.driver != null) {
-                               this.driver.cancel();
-                       }
-               } finally {
-                       closeLocalStrategiesAndCaches();
-               }
-       }
-
-	// --------------------------------------------------------------------------------------------
-	//                                  Main Work Methods
-	// --------------------------------------------------------------------------------------------
-
-	protected void initialize() throws Exception {
-		// create the operator
-		try {
-			this.driver.setup(this);
-		}
-		catch (Throwable t) {
-			throw new Exception("The driver setup for '" + this.getEnvironment().getTaskName() +
-				"' , caused an error: " + t.getMessage(), t);
-		}
-		
-		// instantiate the UDF
-		try {
-			final Class<? super S> userCodeFunctionType = this.driver.getStubType();
-			// if the class is null, the driver has no user code
-			if (userCodeFunctionType != null) {
-				this.stub = initStub(userCodeFunctionType);
-			}
-		} catch (Exception e) {
-			throw new RuntimeException("Initializing the UDF" +
-					(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
-		}
-	}
-       
-	protected <X> void readAndSetBroadcastInput(int inputNum, String bcVarName, DistributedRuntimeUDFContext context, int superstep) throws IOException {
-		
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(formatLogString("Setting broadcast variable '" + bcVarName + "'" + 
-				(superstep > 1 ? ", superstep " + superstep : "")));
-		}
-		
-		@SuppressWarnings("unchecked")
-		final TypeSerializerFactory<X> serializerFactory =  (TypeSerializerFactory<X>) this.broadcastInputSerializers[inputNum];
-		
-		final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
-
-		BroadcastVariableMaterialization<X, ?> variable = getEnvironment().getBroadcastVariableManager().materializeBroadcastVariable(bcVarName, superstep, this, reader, serializerFactory);
-		context.setBroadcastVariable(bcVarName, variable);
-	}
-	
-	protected void releaseBroadcastVariables(String bcVarName, int superstep, DistributedRuntimeUDFContext context) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(formatLogString("Releasing broadcast variable '" + bcVarName + "'" + 
-				(superstep > 1 ? ", superstep " + superstep : "")));
-		}
-		
-		getEnvironment().getBroadcastVariableManager().releaseReference(bcVarName, superstep, this);
-		context.clearBroadcastVariable(bcVarName);
-	}
-       
-
-	protected void run() throws Exception {
-		// ---------------------------- Now, the actual processing starts ------------------------
-		// check for asynchronous canceling
-		if (!this.running) {
-			return;
-		}
-
-		boolean stubOpen = false;
-
-		try {
-			// run the data preparation
-			try {
-				this.driver.prepare();
-			}
-			catch (Throwable t) {
-				// if the preparation caused an error, clean up
-				// errors during clean-up are swallowed, because we have already a root exception
-				throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskName() +
-					"' , caused an error: " + t.getMessage(), t);
-			}
-
-			// check for canceling
-			if (!this.running) {
-				return;
-			}
-
-			// start all chained tasks
-			RegularPactTask.openChainedTasks(this.chainedTasks, this);
-
-			// open stub implementation
-			if (this.stub != null) {
-				try {
-					Configuration stubConfig = this.config.getStubParameters();
-					FunctionUtils.openFunction(this.stub, stubConfig);
-					stubOpen = true;
-				}
-				catch (Throwable t) {
-					throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
-				}
-			}
-
-			// run the user code
-			this.driver.run();
-
-			// close. We close here such that a regular close throwing an exception marks a task as failed.
-			if (this.running && this.stub != null) {
-				FunctionUtils.closeFunction(this.stub);
-				stubOpen = false;
-			}
-
-			this.output.close();
-
-			// close all chained tasks letting them report failure
-			RegularPactTask.closeChainedTasks(this.chainedTasks, this);
-		}
-		catch (Exception ex) {
-			// close the input, but do not report any exceptions, since we already have another root cause
-			if (stubOpen) {
-				try {
-					FunctionUtils.closeFunction(this.stub);
-				}
-				catch (Throwable t) {
-					// do nothing
-				}
-			}
-			
-			// if resettable driver invoke teardown
-			if (this.driver instanceof ResettablePactDriver) {
-				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
-				try {
-					resDriver.teardown();
-				} catch (Throwable t) {
-					throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
-				}
-			}
-
-			RegularPactTask.cancelChainedTasks(this.chainedTasks);
-
-			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
-
-			if (ex instanceof CancelTaskException) {
-				// forward canceling exception
-				throw ex;
-			}
-			else if (this.running) {
-				// throw only if task was not cancelled. in the case of canceling, exceptions are expected 
-				RegularPactTask.logAndThrowException(ex, this);
-			}
-		}
-		finally {
-			this.driver.cleanup();
-		}
-	}
-
-       protected void closeLocalStrategiesAndCaches() {
-               
-               // make sure that all broadcast variable references held by 
this task are released
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug(formatLogString("Releasing all broadcast 
variables."));
-               }
-               
-               
getEnvironment().getBroadcastVariableManager().releaseAllReferencesFromTask(this);
-               if (runtimeUdfContext != null) {
-                       runtimeUdfContext.clearAllBroadcastVariables();
-               }
-               
-               // clean all local strategies and caches/pipeline breakers. 
-               
-               if (this.localStrategies != null) {
-                       for (int i = 0; i < this.localStrategies.length; i++) {
-                               if (this.localStrategies[i] != null) {
-                                       try {
-                                               this.localStrategies[i].close();
-                                       } catch (Throwable t) {
-                                               LOG.error("Error closing local 
strategy for input " + i, t);
-                                       }
-                               }
-                       }
-               }
-               if (this.tempBarriers != null) {
-                       for (int i = 0; i < this.tempBarriers.length; i++) {
-                               if (this.tempBarriers[i] != null) {
-                                       try {
-                                               this.tempBarriers[i].close();
-                                       } catch (Throwable t) {
-                                               LOG.error("Error closing temp 
barrier for input " + i, t);
-                                       }
-                               }
-                       }
-               }
-               if (this.resettableInputs != null) {
-                       for (int i = 0; i < this.resettableInputs.length; i++) {
-                               if (this.resettableInputs[i] != null) {
-                                       try {
-                                               
this.resettableInputs[i].close();
-                                       } catch (Throwable t) {
-                                               LOG.error("Error closing cache 
for input " + i, t);
-                                       }
-                               }
-                       }
-               }
-       }
-
-	// --------------------------------------------------------------------------------------------
-	//                                 Task Setup and Teardown
-	// --------------------------------------------------------------------------------------------
-
-       /**
-        * @return the last output collector in the collector chain
-        */
-       @SuppressWarnings("unchecked")
-       protected Collector<OT> getLastOutputCollector() {
-               int numChained = this.chainedTasks.size();
-               return (numChained == 0) ? output : (Collector<OT>) 
chainedTasks.get(numChained - 1).getOutputCollector();
-       }
-
-       /**
-        * Sets the last output {@link Collector} of the collector chain of 
this {@link RegularPactTask}.
-        * <p>
-        * In case of chained tasks, the output collector of the last {@link 
ChainedDriver} is set. Otherwise it is the
-        * single collector of the {@link RegularPactTask}.
-        *
-        * @param newOutputCollector new output collector to set as last 
collector
-        */
-       protected void setLastOutputCollector(Collector<OT> newOutputCollector) 
{
-               int numChained = this.chainedTasks.size();
-
-               if (numChained == 0) {
-                       output = newOutputCollector;
-                       return;
-               }
-
-               chainedTasks.get(numChained - 
1).setOutputCollector(newOutputCollector);
-       }
-
-       public TaskConfig getLastTasksConfig() {
-               int numChained = this.chainedTasks.size();
-               return (numChained == 0) ? config : chainedTasks.get(numChained 
- 1).getTaskConfig();
-       }
-
-       protected S initStub(Class<? super S> stubSuperClass) throws Exception {
-               try {
-                       ClassLoader userCodeClassLoader = 
getUserCodeClassLoader();
-                       S stub = 
config.<S>getStubWrapper(userCodeClassLoader).getUserCodeObject(stubSuperClass, 
userCodeClassLoader);
-                       // check if the class is a subclass, if the check is 
required
-                       if (stubSuperClass != null && 
!stubSuperClass.isAssignableFrom(stub.getClass())) {
-                               throw new RuntimeException("The class '" + 
stub.getClass().getName() + "' is not a subclass of '" + 
-                                               stubSuperClass.getName() + "' 
as is required.");
-                       }
-                       FunctionUtils.setFunctionRuntimeContext(stub, 
this.runtimeUdfContext);
-                       return stub;
-               }
-               catch (ClassCastException ccex) {
-                       throw new Exception("The stub class is not a proper 
subclass of " + stubSuperClass.getName(), ccex);
-               }
-       }
-
-       /**
-        * Creates the record readers for the number of inputs as defined by 
{@link #getNumTaskInputs()}.
-        *
-        * This method requires that the task configuration, the driver, and 
the user-code class loader are set.
-        */
-       protected void initInputReaders() throws Exception {
-               final int numInputs = getNumTaskInputs();
-               final MutableReader<?>[] inputReaders = new 
MutableReader<?>[numInputs];
-
-               int currentReaderOffset = 0;
-
-               AccumulatorRegistry registry = 
getEnvironment().getAccumulatorRegistry();
-               AccumulatorRegistry.Reporter reporter = 
registry.getReadWriteReporter();
-
-               for (int i = 0; i < numInputs; i++) {
-                       //  ---------------- create the input readers 
---------------------
-                       // in case where a logical input unions multiple 
physical inputs, create a union reader
-                       final int groupSize = this.config.getGroupSize(i);
-
-                       if (groupSize == 1) {
-                               // non-union case
-                               inputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
-                       } else if (groupSize > 1){
-                               // union case
-                               InputGate[] readers = new InputGate[groupSize];
-                               for (int j = 0; j < groupSize; ++j) {
-                                       readers[j] = 
getEnvironment().getInputGate(currentReaderOffset + j);
-                               }
-                               inputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
-                       } else {
-                               throw new Exception("Illegal input group size 
in task configuration: " + groupSize);
-                       }
-
-                       inputReaders[i].setReporter(reporter);
-
-                       currentReaderOffset += groupSize;
-               }
-               this.inputReaders = inputReaders;
-
-               // final sanity check
-               if (currentReaderOffset != this.config.getNumInputs()) {
-                       throw new Exception("Illegal configuration: Number of 
input gates and group sizes are not consistent.");
-               }
-       }
-
-       /**
-        * Creates the record readers for the extra broadcast inputs as 
configured by {@link TaskConfig#getNumBroadcastInputs()}.
-        *
-        * This method requires that the task configuration, the driver, and 
the user-code class loader are set.
-        */
-       protected void initBroadcastInputReaders() throws Exception {
-               final int numBroadcastInputs = 
this.config.getNumBroadcastInputs();
-               final MutableReader<?>[] broadcastInputReaders = new 
MutableReader<?>[numBroadcastInputs];
-
-               int currentReaderOffset = config.getNumInputs();
-
-               for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
-                       //  ---------------- create the input readers 
---------------------
-                       // in case where a logical input unions multiple 
physical inputs, create a union reader
-                       final int groupSize = 
this.config.getBroadcastGroupSize(i);
-                       if (groupSize == 1) {
-                               // non-union case
-                               broadcastInputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(getEnvironment().getInputGate(currentReaderOffset));
-                       } else if (groupSize > 1){
-                               // union case
-                               InputGate[] readers = new InputGate[groupSize];
-                               for (int j = 0; j < groupSize; ++j) {
-                                       readers[j] = 
getEnvironment().getInputGate(currentReaderOffset + j);
-                               }
-                               broadcastInputReaders[i] = new 
MutableRecordReader<IOReadableWritable>(new UnionInputGate(readers));
-                       } else {
-                               throw new Exception("Illegal input group size 
in task configuration: " + groupSize);
-                       }
-
-                       currentReaderOffset += groupSize;
-               }
-               this.broadcastInputReaders = broadcastInputReaders;
-       }
-       
-       /**
-        * Creates all the serializers and comparators.
-        */
-       protected void initInputsSerializersAndComparators(int numInputs, int 
numComparators) throws Exception {
-               this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
-               this.inputComparators = numComparators > 0 ? new 
TypeComparator<?>[numComparators] : null;
-               this.inputIterators = new MutableObjectIterator<?>[numInputs];
-
-               ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-               
-               for (int i = 0; i < numInputs; i++) {
-                       
-                       final TypeSerializerFactory<?> serializerFactory = 
this.config.getInputSerializer(i, userCodeClassLoader);
-                       this.inputSerializers[i] = serializerFactory;
-                       
-                       this.inputIterators[i] = 
createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
-               }
-               
-               //  ---------------- create the driver's comparators 
---------------------
-               for (int i = 0; i < numComparators; i++) {
-                       
-                       if (this.inputComparators != null) {
-                               final TypeComparatorFactory<?> 
comparatorFactory = this.config.getDriverComparator(i, userCodeClassLoader);
-                               this.inputComparators[i] = 
comparatorFactory.createComparator();
-                       }
-               }
-       }
-       
-       /**
-        * Creates all the serializers and iterators for the broadcast inputs.
-        */
-       protected void initBroadcastInputsSerializers(int numBroadcastInputs) 
throws Exception {
-               this.broadcastInputSerializers = new 
TypeSerializerFactory<?>[numBroadcastInputs];
-
-               ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-
-               for (int i = 0; i < numBroadcastInputs; i++) {
-                       //  ---------------- create the serializer first 
---------------------
-                       final TypeSerializerFactory<?> serializerFactory = 
this.config.getBroadcastInputSerializer(i, userCodeClassLoader);
-                       this.broadcastInputSerializers[i] = serializerFactory;
-               }
-       }
-
-       /**
-        *
-        * NOTE: This method must be invoked after the invocation of {@code 
#initInputReaders()} and
-        * {@code #initInputsSerializersAndComparators(int, int)}!
-        *
-        * @param numInputs
-        */
-       protected void initLocalStrategies(int numInputs) throws Exception {
-
-               final MemoryManager memMan = getMemoryManager();
-               final IOManager ioMan = getIOManager();
-
-               this.localStrategies = new CloseableInputProvider<?>[numInputs];
-               this.inputs = new MutableObjectIterator<?>[numInputs];
-               this.excludeFromReset = new boolean[numInputs];
-               this.inputIsCached = new boolean[numInputs];
-               this.inputIsAsyncMaterialized = new boolean[numInputs];
-               this.materializationMemory = new int[numInputs];
-
-               // set up the local strategies first, such that they can work 
before any temp barrier is created
-               for (int i = 0; i < numInputs; i++) {
-                       initInputLocalStrategy(i);
-               }
-
-               // we do another loop over the inputs, because we want to 
instantiate all
-               // sorters, etc before requesting the first input (as this call 
may block)
-
-               // we have two types of materialized inputs, and both are 
replayable (can act as a cache)
-               // The first variant materializes in a different thread and 
hence
-               // acts as a pipeline breaker. this one should only be there, 
if a pipeline breaker is needed.
-               // the second variant spills to the side and will not read 
unless the result is also consumed
-               // in a pipelined fashion.
-               this.resettableInputs = new 
SpillingResettableMutableObjectIterator<?>[numInputs];
-               this.tempBarriers = new TempBarrier<?>[numInputs];
-
-               for (int i = 0; i < numInputs; i++) {
-                       final int memoryPages;
-                       final boolean async = 
this.config.isInputAsynchronouslyMaterialized(i);
-                       final boolean cached =  this.config.isInputCached(i);
-
-                       this.inputIsAsyncMaterialized[i] = async;
-                       this.inputIsCached[i] = cached;
-
-                       if (async || cached) {
-                               memoryPages = 
memMan.computeNumberOfPages(this.config.getRelativeInputMaterializationMemory(i));
-                               if (memoryPages <= 0) {
-                                       throw new Exception("Input marked as 
materialized/cached, but no memory for materialization provided.");
-                               }
-                               this.materializationMemory[i] = memoryPages;
-                       } else {
-                               memoryPages = 0;
-                       }
-
-                       if (async) {
-                               @SuppressWarnings({ "unchecked", "rawtypes" })
-                               TempBarrier<?> barrier = new TempBarrier(this, 
getInput(i), this.inputSerializers[i], memMan, ioMan, memoryPages);
-                               barrier.startReading();
-                               this.tempBarriers[i] = barrier;
-                               this.inputs[i] = null;
-                       } else if (cached) {
-                               @SuppressWarnings({ "unchecked", "rawtypes" })
-                               SpillingResettableMutableObjectIterator<?> iter 
= new SpillingResettableMutableObjectIterator(
-                                       getInput(i), 
this.inputSerializers[i].getSerializer(), getMemoryManager(), getIOManager(), 
memoryPages, this);
-                               this.resettableInputs[i] = iter;
-                               this.inputs[i] = iter;
-                       }
-               }
-       }
-
-       protected void resetAllInputs() throws Exception {
-
-               // first we need to make sure that caches consume remaining data
-               // NOTE: we need to do this before closing the local strategies
-               for (int i = 0; i < this.inputs.length; i++) {
-
-                       if (this.inputIsCached[i] && this.resettableInputs[i] 
!= null) {
-                               
this.resettableInputs[i].consumeAndCacheRemainingData();
-                       }
-               }
-
-               // close all local-strategies. they will either get 
re-initialized, or we have
-               // read them now and their data is cached
-               for (int i = 0; i < this.localStrategies.length; i++) {
-                       if (this.localStrategies[i] != null) {
-                               this.localStrategies[i].close();
-                               this.localStrategies[i] = null;
-                       }
-               }
-
-               final MemoryManager memMan = getMemoryManager();
-               final IOManager ioMan = getIOManager();
-
-               // reset the caches, or re-run the input local strategy
-               for (int i = 0; i < this.inputs.length; i++) {
-                       if (this.excludeFromReset[i]) {
-                               if (this.tempBarriers[i] != null) {
-                                       this.tempBarriers[i].close();
-                                       this.tempBarriers[i] = null;
-                               } else if (this.resettableInputs[i] != null) {
-                                       this.resettableInputs[i].close();
-                                       this.resettableInputs[i] = null;
-                               }
-                       } else {
-                               // make sure the input is not available 
directly, but is lazily fetched again
-                               this.inputs[i] = null;
-
-                               if (this.inputIsCached[i]) {
-                                       if (this.tempBarriers[i] != null) {
-                                               this.inputs[i] = 
this.tempBarriers[i].getIterator();
-                                       } else if (this.resettableInputs[i] != 
null) {
-                                               
this.resettableInputs[i].reset();
-                                               this.inputs[i] = 
this.resettableInputs[i];
-                                       } else {
-                                               throw new 
RuntimeException("Found a resettable input, but no temp barrier and no 
resettable iterator.");
-                                       }
-                               } else {
-                                       // close the async barrier if there is 
one
-                                       if (this.tempBarriers[i] != null) {
-                                               this.tempBarriers[i].close();
-                                       }
-
-                                       // recreate the local strategy
-                                       initInputLocalStrategy(i);
-
-                                       if (this.inputIsAsyncMaterialized[i]) {
-                                               final int pages = 
this.materializationMemory[i];
-                                               @SuppressWarnings({ 
"unchecked", "rawtypes" })
-                                               TempBarrier<?> barrier = new 
TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, pages);
-                                               barrier.startReading();
-                                               this.tempBarriers[i] = barrier;
-                                               this.inputs[i] = null;
-                                       }
-                               }
-                       }
-               }
-       }
-
-       protected void excludeFromReset(int inputNum) {
-               this.excludeFromReset[inputNum] = true;
-       }
-
-       private void initInputLocalStrategy(int inputNum) throws Exception {
-               // check if there is already a strategy
-               if (this.localStrategies[inputNum] != null) {
-                       throw new IllegalStateException();
-               }
-
-               // now set up the local strategy
-               final LocalStrategy localStrategy = 
this.config.getInputLocalStrategy(inputNum);
-               if (localStrategy != null) {
-                       switch (localStrategy) {
-                       case NONE:
-                               // the input is as it is
-                               this.inputs[inputNum] = 
this.inputIterators[inputNum];
-                               break;
-                       case SORT:
-                               @SuppressWarnings({ "rawtypes", "unchecked" })
-                               UnilateralSortMerger<?> sorter = new 
UnilateralSortMerger(getMemoryManager(), getIOManager(),
-                                       this.inputIterators[inputNum], this, 
this.inputSerializers[inputNum], getLocalStrategyComparator(inputNum),
-                                       
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                       
this.config.getSpillingThresholdInput(inputNum), 
this.getExecutionConfig().isObjectReuseEnabled());
-                               // set the input to null such that it will be 
lazily fetched from the input strategy
-                               this.inputs[inputNum] = null;
-                               this.localStrategies[inputNum] = sorter;
-                               break;
-                       case COMBININGSORT:
-                               // sanity check this special case!
-                               // this still breaks a bit of the abstraction!
-                               // we should have nested configurations for the 
local strategies to solve that
-                               if (inputNum != 0) {
-                                       throw new 
IllegalStateException("Performing combining sort outside a (group)reduce 
task!");
-                               }
-
-                               // instantiate ourselves a combiner. we should 
not use the stub, because the sort and the
-                               // subsequent (group)reduce would otherwise 
share it multi-threaded
-                               final Class<S> userCodeFunctionType = 
this.driver.getStubType();
-                               if (userCodeFunctionType == null) {
-                                       throw new 
IllegalStateException("Performing combining sort outside a reduce task!");
-                               }
-                               final S localStub;
-                               try {
-                                       localStub = 
initStub(userCodeFunctionType);
-                               } catch (Exception e) {
-                                       throw new 
RuntimeException("Initializing the user code and the configuration failed" +
-                                                       (e.getMessage() == null 
? "." : ": " + e.getMessage()), e);
-                               }
-                               
-                               if (!(localStub instanceof 
GroupCombineFunction)) {
-                                       throw new 
IllegalStateException("Performing combining sort outside a reduce task!");
-                               }
-
-                               @SuppressWarnings({ "rawtypes", "unchecked" })
-                               CombiningUnilateralSortMerger<?> cSorter = new 
CombiningUnilateralSortMerger(
-                                       (GroupCombineFunction) localStub, 
getMemoryManager(), getIOManager(), this.inputIterators[inputNum],
-                                       this, this.inputSerializers[inputNum], 
getLocalStrategyComparator(inputNum),
-                                       
this.config.getRelativeMemoryInput(inputNum), 
this.config.getFilehandlesInput(inputNum),
-                                       
this.config.getSpillingThresholdInput(inputNum), 
this.getExecutionConfig().isObjectReuseEnabled());
-                               
cSorter.setUdfConfiguration(this.config.getStubParameters());
-
-                               // set the input to null such that it will be 
lazily fetched from the input strategy
-                               this.inputs[inputNum] = null;
-                               this.localStrategies[inputNum] = cSorter;
-                               break;
-                       default:
-                               throw new Exception("Unrecognized local 
strategy provided: " + localStrategy.name());
-                       }
-               } else {
-                       // no local strategy in the config
-                       this.inputs[inputNum] = this.inputIterators[inputNum];
-               }
-       }
-
-       private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) 
throws Exception {
-               TypeComparatorFactory<T> compFact = 
this.config.getInputComparator(inputNum, getUserCodeClassLoader());
-               if (compFact == null) {
-                       throw new Exception("Missing comparator factory for 
local strategy on input " + inputNum);
-               }
-               return compFact.createComparator();
-       }
-       
-       protected MutableObjectIterator<?> createInputIterator(MutableReader<?> 
inputReader, TypeSerializerFactory<?> serializerFactory) {
-               @SuppressWarnings("unchecked")
-               MutableReader<DeserializationDelegate<?>> reader = 
(MutableReader<DeserializationDelegate<?>>) inputReader;
-               @SuppressWarnings({ "unchecked", "rawtypes" })
-               final MutableObjectIterator<?> iter = new 
ReaderIterator(reader, serializerFactory.getSerializer());
-               return iter;
-       }
-
-       protected int getNumTaskInputs() {
-               return this.driver.getNumberOfInputs();
-       }
-
-       /**
-        * Creates a writer for each output. Creates an OutputCollector which 
forwards its input to all writers.
-        * The output collector applies the configured shipping strategies for 
each writer.
-        */
-       protected void initOutputs() throws Exception {
-               this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
-               this.eventualOutputs = new ArrayList<RecordWriter<?>>();
-
-               ClassLoader userCodeClassLoader = getUserCodeClassLoader();
-
-               AccumulatorRegistry accumulatorRegistry = 
getEnvironment().getAccumulatorRegistry();
-               AccumulatorRegistry.Reporter reporter = 
accumulatorRegistry.getReadWriteReporter();
-
-               this.accumulatorMap = accumulatorRegistry.getUserMap();
-
-               this.output = initOutputs(this, userCodeClassLoader, 
this.config, this.chainedTasks, this.eventualOutputs,
-                               this.getExecutionConfig(), reporter, 
this.accumulatorMap);
-       }
-
-       public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
-               Environment env = getEnvironment();
-
-               return new DistributedRuntimeUDFContext(taskName, 
env.getNumberOfSubtasks(),
-                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(),
-                               env.getDistributedCacheEntries(), 
this.accumulatorMap);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //                                   Task Context Signature
-       // 
-------------------------------------------------------------------------------------------
-
-       @Override
-       public TaskConfig getTaskConfig() {
-               return this.config;
-       }
-
-       @Override
-       public TaskManagerRuntimeInfo getTaskManagerInfo() {
-               return getEnvironment().getTaskManagerInfo();
-       }
-
-       @Override
-       public MemoryManager getMemoryManager() {
-               return getEnvironment().getMemoryManager();
-       }
-
-       @Override
-       public IOManager getIOManager() {
-               return getEnvironment().getIOManager();
-       }
-
-       @Override
-       public S getStub() {
-               return this.stub;
-       }
-
-       @Override
-       public Collector<OT> getOutputCollector() {
-               return this.output;
-       }
-
-       @Override
-       public AbstractInvokable getOwningNepheleTask() {
-               return this;
-       }
-
-       @Override
-       public String formatLogString(String message) {
-               return constructLogString(message, 
getEnvironment().getTaskName(), this);
-       }
-
-       @Override
-       public <X> MutableObjectIterator<X> getInput(int index) {
-               if (index < 0 || index > this.driver.getNumberOfInputs()) {
-                       throw new IndexOutOfBoundsException();
-               }
-
-               // check for lazy assignment from input strategies
-               if (this.inputs[index] != null) {
-                       @SuppressWarnings("unchecked")
-                       MutableObjectIterator<X> in = 
(MutableObjectIterator<X>) this.inputs[index];
-                       return in;
-               } else {
-                       final MutableObjectIterator<X> in;
-                       try {
-                               if (this.tempBarriers[index] != null) {
-                                       @SuppressWarnings("unchecked")
-                                       MutableObjectIterator<X> iter = 
(MutableObjectIterator<X>) this.tempBarriers[index].getIterator();
-                                       in = iter;
-                               } else if (this.localStrategies[index] != null) 
{
-                                       @SuppressWarnings("unchecked")
-                                       MutableObjectIterator<X> iter = 
(MutableObjectIterator<X>) this.localStrategies[index].getIterator();
-                                       in = iter;
-                               } else {
-                                       throw new RuntimeException("Bug: null 
input iterator, null temp barrier, and null local strategy.");
-                               }
-                               this.inputs[index] = in;
-                               return in;
-                       } catch (InterruptedException iex) {
-                               throw new RuntimeException("Interrupted while 
waiting for input " + index + " to become available.");
-                       } catch (IOException ioex) {
-                               throw new RuntimeException("An I/O Exception 
occurred while obtaining input " + index + ".");
-                       }
-               }
-       }
-
-
-       @Override
-       public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
-               if (index < 0 || index >= this.driver.getNumberOfInputs()) {
-                       throw new IndexOutOfBoundsException();
-               }
-
-               @SuppressWarnings("unchecked")
-               final TypeSerializerFactory<X> serializerFactory = 
(TypeSerializerFactory<X>) this.inputSerializers[index];
-               return serializerFactory;
-       }
-
-
-       @Override
-       public <X> TypeComparator<X> getDriverComparator(int index) {
-               if (this.inputComparators == null) {
-                       throw new IllegalStateException("Comparators have not 
been created!");
-               }
-               else if (index < 0 || index >= 
this.driver.getNumberOfDriverComparators()) {
-                       throw new IndexOutOfBoundsException();
-               }
-
-               @SuppressWarnings("unchecked")
-               final TypeComparator<X> comparator = (TypeComparator<X>) 
this.inputComparators[index];
-               return comparator;
-       }
-
-       // 
============================================================================================
-       //                                     Static Utilities
-       //
-       //            Utilities are consolidated here to ensure a uniform way 
of running,
-       //                   logging, exception handling, and error messages.
-       // 
============================================================================================
-
-       // 
--------------------------------------------------------------------------------------------
-       //                                       Logging
-       // 
--------------------------------------------------------------------------------------------
-       /**
-        * Utility function that composes a string for logging purposes. The 
string includes the given message,
-        * the given name of the task and the index in its subtask group as 
well as the number of instances
-        * that exist in its subtask group.
-        *
-        * @param message The main message for the log.
-        * @param taskName The name of the task.
-        * @param parent The nephele task that contains the code producing the 
message.
-        *
-        * @return The string for logging.
-        */
-       public static String constructLogString(String message, String 
taskName, AbstractInvokable parent) {
-               return message + ":  " + taskName + " (" + 
(parent.getEnvironment().getIndexInSubtaskGroup() + 1) +
-                               '/' + 
parent.getEnvironment().getNumberOfSubtasks() + ')';
-       }
-
-       /**
-        * Prints an error message and throws the given exception. If the 
exception is of the type
-        * {@link ExceptionInChainedStubException} then the chain of contained 
exceptions is followed
-        * until an exception of a different type is found.
-        *
-        * @param ex The exception to be thrown.
-        * @param parent The parent task, whose information is included in the 
log message.
-        * @throws Exception Always thrown.
-        */
-       public static void logAndThrowException(Exception ex, AbstractInvokable 
parent) throws Exception {
-               String taskName;
-               if (ex instanceof ExceptionInChainedStubException) {
-                       do {
-                               ExceptionInChainedStubException cex = 
(ExceptionInChainedStubException) ex;
-                               taskName = cex.getTaskName();
-                               ex = cex.getWrappedException();
-                       } while (ex instanceof ExceptionInChainedStubException);
-               } else {
-                       taskName = parent.getEnvironment().getTaskName();
-               }
-
-               if (LOG.isErrorEnabled()) {
-                       LOG.error(constructLogString("Error in task code", 
taskName, parent), ex);
-               }
-
-               throw ex;
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //                             Result Shipping and Chained Tasks
-       // 
--------------------------------------------------------------------------------------------
-
-       /**
-        * Creates the {@link Collector} for the given task, as described by 
the given configuration. The
-        * output collector contains the writers that forward the data to the 
different tasks that the given task
-        * is connected to. Each writer applies the partitioning as described 
in the configuration.
-        *
-        * @param task The task that the output collector is created for.
-        * @param config The configuration describing the output shipping 
strategies.
-        * @param cl The classloader used to load user defined types.
-        * @param eventualOutputs The output writers that this task forwards to 
the next task for each output.
-        * @param outputOffset The offset to start to get the writers for the 
outputs
-        * @param numOutputs The number of outputs described in the 
configuration.
-        *
-        * @return The OutputCollector that data produced in this task is 
submitted to.
-        */
-       public static <T> Collector<T> getOutputCollector(AbstractInvokable 
task, TaskConfig config, ClassLoader cl,
-                       List<RecordWriter<?>> eventualOutputs, int 
outputOffset, int numOutputs, AccumulatorRegistry.Reporter reporter) throws 
Exception
-       {
-               if (numOutputs == 0) {
-                       return null;
-               }
-
-               // get the factory for the serializer
-               final TypeSerializerFactory<T> serializerFactory = 
config.getOutputSerializer(cl);
-
-               // special case the Record
-               if (serializerFactory.getDataType().equals(Record.class)) {
-                       final List<RecordWriter<Record>> writers = new 
ArrayList<RecordWriter<Record>>(numOutputs);
-
-                       // create a writer for each output
-                       for (int i = 0; i < numOutputs; i++) {
-                               // create the OutputEmitter from output ship 
strategy
-                               final ShipStrategyType strategy = 
config.getOutputShipStrategy(i);
-                               final TypeComparatorFactory<?> compFact = 
config.getOutputComparator(i, cl);
-                               final RecordOutputEmitter oe;
-                               if (compFact == null) {
-                                       oe = new RecordOutputEmitter(strategy);
-                               } else {
-                                       @SuppressWarnings("unchecked")
-                                       TypeComparator<Record> comparator = 
(TypeComparator<Record>) compFact.createComparator();
-                                       if 
(!comparator.supportsCompareAgainstReference()) {
-                                               throw new 
Exception("Incompatibe serializer-/comparator factories.");
-                                       }
-                                       final DataDistribution distribution = 
config.getOutputDataDistribution(i, cl);
-                                       final Partitioner<?> partitioner = 
config.getOutputPartitioner(i, cl);
-
-                                       oe = new RecordOutputEmitter(strategy, 
comparator, partitioner, distribution);
-                               }
-
-                               // setup accumulator counters
-                               final RecordWriter<Record> recordWriter = new 
RecordWriter<Record>(task.getEnvironment().getWriter(outputOffset + i), oe);
-                               recordWriter.setReporter(reporter);
-
-                               writers.add(recordWriter);
-                       }
-                       if (eventualOutputs != null) {
-                               eventualOutputs.addAll(writers);
-                       }
-
-                       @SuppressWarnings("unchecked")
-                       final Collector<T> outColl = (Collector<T>) new 
RecordOutputCollector(writers);
-                       return outColl;
-               }
-               else {
-                       // generic case
-                       final List<RecordWriter<SerializationDelegate<T>>> 
writers = new ArrayList<RecordWriter<SerializationDelegate<T>>>(numOutputs);
-
-                       // create a writer for each output
-                       for (int i = 0; i < numOutputs; i++)
-                       {
-                               // create the OutputEmitter from output ship 
strategy
-                               final ShipStrategyType strategy = 
config.getOutputShipStrategy(i);
-                               final TypeComparatorFactory<T> compFactory = 
config.getOutputComparator(i, cl);
-
-                               final ChannelSelector<SerializationDelegate<T>> 
oe;
-                               if (compFactory == null) {
-                                       oe = new OutputEmitter<T>(strategy);
-                               }
-                               else {
-                                       final DataDistribution dataDist = 
config.getOutputDataDistribution(i, cl);
-                                       final Partitioner<?> partitioner = 
config.getOutputPartitioner(i, cl);
-
-                                       final TypeComparator<T> comparator = 
compFactory.createComparator();
-                                       oe = new OutputEmitter<T>(strategy, 
comparator, partitioner, dataDist);
-                               }
-
-                               final RecordWriter<SerializationDelegate<T>> 
recordWriter =
-                                               new 
RecordWriter<SerializationDelegate<T>>(task.getEnvironment().getWriter(outputOffset
 + i), oe);
-
-                               // setup live accumulator counters
-                               recordWriter.setReporter(reporter);
-
-                               writers.add(recordWriter);
-                       }
-                       if (eventualOutputs != null) {
-                               eventualOutputs.addAll(writers);
-                       }
-                       return new OutputCollector<T>(writers, 
serializerFactory.getSerializer());
-               }
-       }
-
-       /**
-        * Creates a writer for each output. Creates an OutputCollector which 
forwards its input to all writers.
-        * The output collector applies the configured shipping strategy.
-        */
-       @SuppressWarnings("unchecked")
-       public static <T> Collector<T> initOutputs(AbstractInvokable 
nepheleTask, ClassLoader cl, TaskConfig config,
-                                                                               
List<ChainedDriver<?, ?>> chainedTasksTarget,
-                                                                               
List<RecordWriter<?>> eventualOutputs,
-                                                                               
ExecutionConfig executionConfig,
-                                                                               
AccumulatorRegistry.Reporter reporter,
-                                                                               
Map<String, Accumulator<?,?>> accumulatorMap)
-       throws Exception
-       {
-               final int numOutputs = config.getNumOutputs();
-
-               // check whether we got any chained tasks
-               final int numChained = config.getNumberOfChainedStubs();
-               if (numChained > 0) {
-                       // got chained stubs. that means that this one may only 
have a single forward connection
-                       if (numOutputs != 1 || config.getOutputShipStrategy(0) 
!= ShipStrategyType.FORWARD) {
-                               throw new RuntimeException("Plan Generation 
Bug: Found a chained stub that is not connected via an only forward 
connection.");
-                       }
-
-                       // instantiate each task
-                       @SuppressWarnings("rawtypes")
-                       Collector previous = null;
-                       for (int i = numChained - 1; i >= 0; --i)
-                       {
-                               // get the task first
-                               final ChainedDriver<?, ?> ct;
-                               try {
-                                       Class<? extends ChainedDriver<?, ?>> 
ctc = config.getChainedTask(i);
-                                       ct = ctc.newInstance();
-                               }
-                               catch (Exception ex) {
-                                       throw new RuntimeException("Could not 
instantiate chained task driver.", ex);
-                               }
-
-                               // get the configuration for the task
-                               final TaskConfig chainedStubConf = 
config.getChainedStubConfig(i);
-                               final String taskName = 
config.getChainedTaskName(i);
-
-                               if (i == numChained - 1) {
-                                       // last in chain, instantiate the 
output collector for this task
-                                       previous = 
getOutputCollector(nepheleTask, chainedStubConf, cl, eventualOutputs, 0, 
chainedStubConf.getNumOutputs(), reporter);
-                               }
-
-                               ct.setup(chainedStubConf, taskName, previous, 
nepheleTask, cl, executionConfig, accumulatorMap);
-                               chainedTasksTarget.add(0, ct);
-
-                               previous = ct;
-                       }
-                       // the collector of the first in the chain is the 
collector for the nephele task
-                       return (Collector<T>) previous;
-               }
-               // else
-
-               // instantiate the output collector the default way from this 
configuration
-               return getOutputCollector(nepheleTask , config, cl, 
eventualOutputs, 0, numOutputs, reporter);
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                  User Code LifeCycle
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Opens the given stub using its {@link 
org.apache.flink.api.common.functions.RichFunction#open(Configuration)} method. 
If the open call produces
-        * an exception, a new exception with a standard error message is 
created, using the encountered exception
-        * as its cause.
-        * 
-        * @param stub The user code instance to be opened.
-        * @param parameters The parameters supplied to the user code.
-        * 
-        * @throws Exception Thrown, if the user code's open method produces an 
exception.
-        */
-       public static void openUserCode(Function stub, Configuration 
parameters) throws Exception {
-               try {
-                       FunctionUtils.openFunction(stub, parameters);
-               } catch (Throwable t) {
-                       throw new Exception("The user defined 
'open(Configuration)' method in " + stub.getClass().toString() + " caused an 
exception: " + t.getMessage(), t);
-               }
-       }
-       
-       /**
-        * Closes the given stub using its {@link 
org.apache.flink.api.common.functions.RichFunction#close()} method. If the 
close call produces
-        * an exception, a new exception with a standard error message is 
created, using the encountered exception
-        * as its cause.
-        * 
-        * @param stub The user code instance to be closed.
-        * 
-        * @throws Exception Thrown, if the user code's close method produces 
an exception.
-        */
-       public static void closeUserCode(Function stub) throws Exception {
-               try {
-                       FunctionUtils.closeFunction(stub);
-               } catch (Throwable t) {
-                       throw new Exception("The user defined 'close()' method 
caused an exception: " + t.getMessage(), t);
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                               Chained Task LifeCycle
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Opens all chained tasks, in the order as they are stored in the 
array. The opening process
-        * creates a standardized log info message.
-        * 
-        * @param tasks The tasks to be opened.
-        * @param parent The parent task, used to obtain parameters to include 
in the log message.
-        * @throws Exception Thrown, if the opening encounters an exception.
-        */
-       public static void openChainedTasks(List<ChainedDriver<?, ?>> tasks, 
AbstractInvokable parent) throws Exception {
-               // start all chained tasks
-               for (int i = 0; i < tasks.size(); i++) {
-                       final ChainedDriver<?, ?> task = tasks.get(i);
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug(constructLogString("Start task code", 
task.getTaskName(), parent));
-                       }
-                       task.openTask();
-               }
-       }
-       
-       /**
-        * Closes all chained tasks, in the order as they are stored in the 
array. The closing process
-        * creates a standardized log info message.
-        * 
-        * @param tasks The tasks to be closed.
-        * @param parent The parent task, used to obtain parameters to include 
in the log message.
-        * @throws Exception Thrown, if the closing encounters an exception.
-        */
-       public static void closeChainedTasks(List<ChainedDriver<?, ?>> tasks, 
AbstractInvokable parent) throws Exception {
-               for (int i = 0; i < tasks.size(); i++) {
-                       final ChainedDriver<?, ?> task = tasks.get(i);
-                       task.closeTask();
-                       
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug(constructLogString("Finished task 
code", task.getTaskName(), parent));
-                       }
-               }
-       }
-       
-       /**
-        * Cancels all tasks via their {@link ChainedDriver#cancelTask()} 
method. Any occurring exception
-        * and error is suppressed, such that the canceling method of every 
task is invoked in all cases.
-        * 
-        * @param tasks The tasks to be canceled.
-        */
-       public static void cancelChainedTasks(List<ChainedDriver<?, ?>> tasks) {
-               for (int i = 0; i < tasks.size(); i++) {
-                       try {
-                               tasks.get(i).cancelTask();
-                       } catch (Throwable t) {
-                               // do nothing
-                       }
-               }
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //                                     Miscellaneous Utilities
-       // 
--------------------------------------------------------------------------------------------
-       
-       /**
-        * Instantiates a user code class from its definition in the task 
configuration.
-        * The class is instantiated without arguments using the null-ary 
constructor. Instantiation
-        * will fail if this constructor does not exist or is not public.
-        * 
-        * @param <T> The generic type of the user code class.
-        * @param config The task configuration containing the class 
description.
-        * @param cl The class loader to be used to load the class.
-        * @param superClass The super class that the user code class extends 
or implements, for type checking.
-        * 
-        * @return An instance of the user code class.
-        */
-       public static <T> T instantiateUserCode(TaskConfig config, ClassLoader 
cl, Class<? super T> superClass) {
-               try {
-                       T stub = 
config.<T>getStubWrapper(cl).getUserCodeObject(superClass, cl);
-                       // check if the class is a subclass, if the check is 
required
-                       if (superClass != null && 
!superClass.isAssignableFrom(stub.getClass())) {
-                               throw new RuntimeException("The class '" + 
stub.getClass().getName() + "' is not a subclass of '" + 
-                                               superClass.getName() + "' as is 
required.");
-                       }
-                       return stub;
-               }
-               catch (ClassCastException ccex) {
-                       throw new RuntimeException("The UDF class is not a 
proper subclass of " + superClass.getName(), ccex);
-               }
-       }
-       
-       private static int[] asArray(List<Integer> list) {
-               int[] a = new int[list.size()];
-               
-               int i = 0;
-               for (int val : list) {
-                       a[i++] = val;
-               }
-               return a;
-       }
-
-       public static void clearWriters(List<RecordWriter<?>> writers) {
-               for (RecordWriter<?> writer : writers) {
-                       writer.clearBuffers();
-               }
-       }
-
-       public static void clearReaders(MutableReader<?>[] readers) {
-               for (MutableReader<?> reader : readers) {
-                       reader.clearBuffers();
-               }
-       }
-}
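
For illustration, a minimal sketch (not part of this commit) of how the static user-code lifecycle helpers defined above, openUserCode and closeUserCode, are meant to be used around a driver run. The wrapper class and method names below are invented for the example, and RegularPactTask is referenced as it existed before this change; only the helper signatures are taken from the diff.

package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.configuration.Configuration;

// Hypothetical usage sketch of the removed class's static lifecycle helpers.
public final class UserCodeLifecycleSketch {

	private UserCodeLifecycleSketch() {}

	// Opens the stub with its parameters, runs a placeholder driver body,
	// and closes the stub even if the body fails.
	public static void runWithLifecycle(Function stub, Configuration parameters) throws Exception {
		RegularPactTask.openUserCode(stub, parameters);
		try {
			// ... drive the user function here ...
		} finally {
			RegularPactTask.closeUserCode(stub);
		}
	}
}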

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java
new file mode 100644
index 0000000..0ca7994
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettableDriver.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.functions.Function;
+
+
+/**
+ * This interface marks a {@code Driver} as resettable, meaning that it will reset part of its internal state but
+ * otherwise reuse existing data structures.
+ *
+ * @see Driver
+ * @see TaskContext
+ * 
+ * @param <S> The type of stub driven by this driver.
+ * @param <OT> The data type of the records produced by this driver.
+ */
+public interface ResettableDriver<S extends Function, OT> extends Driver<S, 
OT> {
+       
+       boolean isInputResettable(int inputNum);
+       
+       void initialize() throws Exception;
+       
+       void reset() throws Exception;
+       
+       void teardown() throws Exception;
+}
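
For illustration, a minimal sketch (not part of this commit) of a driver implementing the renamed ResettableDriver interface shown above. The class name, field, and comments are invented for the example; the class is declared abstract so that the inherited Driver methods can be omitted from the sketch.

package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.functions.Function;

// Hypothetical sketch: a driver with one cached input that can be reset
// between iterations while keeping its long-lived data structures.
public abstract class CachedInputDriverSketch<S extends Function, OT>
		implements ResettableDriver<S, OT> {

	// Illustrative structure that survives reset() calls (assumption).
	private Object reusableStructure;

	@Override
	public boolean isInputResettable(int inputNum) {
		// Only input 0 is treated as resettable in this sketch.
		return inputNum == 0;
	}

	@Override
	public void initialize() throws Exception {
		// One-time allocation, kept across resets.
		reusableStructure = new Object();
	}

	@Override
	public void reset() throws Exception {
		// Clear per-iteration state here, but keep the allocated structure.
	}

	@Override
	public void teardown() throws Exception {
		// Release the long-lived structure when the task finishes.
		reusableStructure = null;
	}
}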

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java
deleted file mode 100644
index 85cde1b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ResettablePactDriver.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.functions.Function;
-
-
-/**
- * This interface marks a {@code PactDriver} as resettable, meaning that it will reset part of its internal state but
- * otherwise reuse existing data structures.
- *
- * @see PactDriver
- * @see PactTaskContext
- * 
- * @param <S> The type of stub driven by this driver.
- * @param <OT> The data type of the records produced by this driver.
- */
-public interface ResettablePactDriver<S extends Function, OT> extends 
PactDriver<S, OT> {
-       
-       boolean isInputResettable(int inputNum);
-       
-       void initialize() throws Exception;
-       
-       void reset() throws Exception;
-       
-       void teardown() throws Exception;
-}
