Repository: flink
Updated Branches:
  refs/heads/master e494c2795 -> b08669abf


http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
new file mode 100644
index 0000000..fd5d238
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/TaskContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+
+/**
+ * The task context gives a driver (e.g., {@link MapDriver}, or {@link 
JoinDriver}) access to
+ * the runtime components and configuration that they can use to fulfil their 
task.
+ *
+ * @param <S> The UDF type.
+ * @param <OT> The produced data type.
+ *
+ * @see Driver
+ */
+public interface TaskContext<S, OT> {
+       
+       TaskConfig getTaskConfig();
+       
+       TaskManagerRuntimeInfo getTaskManagerInfo();
+
+       ClassLoader getUserCodeClassLoader();
+       
+       MemoryManager getMemoryManager();
+       
+       IOManager getIOManager();
+
+       <X> MutableObjectIterator<X> getInput(int index);
+       
+       <X> TypeSerializerFactory<X> getInputSerializer(int index);
+       
+       <X> TypeComparator<X> getDriverComparator(int index);
+       
+       S getStub();
+
+       ExecutionConfig getExecutionConfig();
+
+       Collector<OT> getOutputCollector();
+       
+       AbstractInvokable getOwningNepheleTask();
+       
+       String formatLogString(String message);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
index 098686c..4791761 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/UnionWithTempOperator.java
@@ -22,18 +22,18 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class UnionWithTempOperator<T> implements PactDriver<Function, T> {
+public class UnionWithTempOperator<T> implements Driver<Function, T> {
        
        private static final int CACHED_INPUT = 0;
        private static final int STREAMED_INPUT = 1;
        
-       private PactTaskContext<Function, T> taskContext;
+       private TaskContext<Function, T> taskContext;
        
        private volatile boolean running;
        
        
        @Override
-       public void setup(PactTaskContext<Function, T> context) {
+       public void setup(TaskContext<Function, T> context) {
                this.taskContext = context;
                this.running = true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
index 4641fce..46ee41b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedAllReduceDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +41,7 @@ public class ChainedAllReduceDriver<IT> extends 
ChainedDriver<IT, IT> {
        @Override
        public void setup(AbstractInvokable parent) {
                @SuppressWarnings("unchecked")
-               final ReduceFunction<IT> red = 
RegularPactTask.instantiateUserCode(this.config, userCodeClassLoader, 
ReduceFunction.class);
+               final ReduceFunction<IT> red = 
BatchTask.instantiateUserCode(this.config, userCodeClassLoader, 
ReduceFunction.class);
                this.reducer = red;
                FunctionUtils.setFunctionRuntimeContext(red, 
getUdfRuntimeContext());
 
@@ -56,12 +56,12 @@ public class ChainedAllReduceDriver<IT> extends 
ChainedDriver<IT, IT> {
        @Override
        public void openTask() throws Exception {
                Configuration stubConfig = this.config.getStubParameters();
-               RegularPactTask.openUserCode(this.reducer, stubConfig);
+               BatchTask.openUserCode(this.reducer, stubConfig);
        }
 
        @Override
        public void closeTask() throws Exception {
-               RegularPactTask.closeUserCode(this.reducer);
+               BatchTask.closeUserCode(this.reducer);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
index 482103c..8900ed7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.GenericCollectorMap;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 @SuppressWarnings("deprecation")
 public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
@@ -35,7 +35,7 @@ public class ChainedCollectorMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        public void setup(AbstractInvokable parent) {
                @SuppressWarnings("unchecked")
                final GenericCollectorMap<IT, OT> mapper =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, GenericCollectorMap.class);
+                       BatchTask.instantiateUserCode(this.config, 
userCodeClassLoader, GenericCollectorMap.class);
                this.mapper = mapper;
                mapper.setRuntimeContext(getUdfRuntimeContext());
        }
@@ -43,12 +43,12 @@ public class ChainedCollectorMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        @Override
        public void openTask() throws Exception {
                Configuration stubConfig = this.config.getStubParameters();
-               RegularPactTask.openUserCode(this.mapper, stubConfig);
+               BatchTask.openUserCode(this.mapper, stubConfig);
        }
 
        @Override
        public void closeTask() throws Exception {
-               RegularPactTask.closeUserCode(this.mapper);
+               BatchTask.closeUserCode(this.mapper);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index ea6cfe3..6edeb84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
@@ -32,7 +32,7 @@ import org.apache.flink.util.Collector;
 import java.util.Map;
 
 /**
- * The interface to be implemented by drivers that do not run in an own pact 
task context, but are chained to other
+ * The interface to be implemented by drivers that do not run in an own task 
context, but are chained to other
  * tasks.
  */
 public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
@@ -63,8 +63,8 @@ public abstract class ChainedDriver<IT, OT> implements 
Collector<IT> {
 
                Environment env = parent.getEnvironment();
 
-               if (parent instanceof RegularPactTask) {
-                       this.udfContext = ((RegularPactTask<?, ?>) 
parent).createRuntimeContext(taskName);
+               if (parent instanceof BatchTask) {
+                       this.udfContext = ((BatchTask<?, ?>) 
parent).createRuntimeContext(taskName);
                } else {
                        this.udfContext = new 
DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(),
                                        env.getIndexInSubtaskGroup(), 
userCodeClassLoader, parent.getExecutionConfig(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
index bc3b6a1..f51cb68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedFlatMapDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 public class ChainedFlatMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
@@ -36,7 +36,7 @@ public class ChainedFlatMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        public void setup(AbstractInvokable parent) {
                @SuppressWarnings("unchecked")
                final FlatMapFunction<IT, OT> mapper =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, FlatMapFunction.class);
+                       BatchTask.instantiateUserCode(this.config, 
userCodeClassLoader, FlatMapFunction.class);
                this.mapper = mapper;
                FunctionUtils.setFunctionRuntimeContext(mapper, 
getUdfRuntimeContext());
        }
@@ -44,12 +44,12 @@ public class ChainedFlatMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        @Override
        public void openTask() throws Exception {
                Configuration stubConfig = this.config.getStubParameters();
-               RegularPactTask.openUserCode(this.mapper, stubConfig);
+               BatchTask.openUserCode(this.mapper, stubConfig);
        }
 
        @Override
        public void closeTask() throws Exception {
-               RegularPactTask.closeUserCode(this.mapper);
+               BatchTask.closeUserCode(this.mapper);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
index db192df..9b888f2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
@@ -36,7 +36,7 @@ public class ChainedMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        public void setup(AbstractInvokable parent) {
                @SuppressWarnings("unchecked")
                final MapFunction<IT, OT> mapper =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, MapFunction.class);
+                       BatchTask.instantiateUserCode(this.config, 
userCodeClassLoader, MapFunction.class);
                this.mapper = mapper;
                FunctionUtils.setFunctionRuntimeContext(mapper, 
getUdfRuntimeContext());
        }
@@ -44,12 +44,12 @@ public class ChainedMapDriver<IT, OT> extends 
ChainedDriver<IT, OT> {
        @Override
        public void openTask() throws Exception {
                Configuration stubConfig = this.config.getStubParameters();
-               RegularPactTask.openUserCode(this.mapper, stubConfig);
+               BatchTask.openUserCode(this.mapper, stubConfig);
        }
 
        @Override
        public void closeTask() throws Exception {
-               RegularPactTask.closeUserCode(this.mapper);
+               BatchTask.closeUserCode(this.mapper);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
index cf0fc85..4a04fb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/GroupCombineChainedDriver.java
@@ -29,7 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
@@ -87,7 +87,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends 
ChainedDriver<IN, OUT> {
 
                @SuppressWarnings("unchecked")
                final GroupReduceFunction<IN, OUT> combiner =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, GroupReduceFunction.class);
+                       BatchTask.instantiateUserCode(this.config, 
userCodeClassLoader, GroupReduceFunction.class);
                this.reducer = combiner;
                FunctionUtils.setFunctionRuntimeContext(combiner, 
getUdfRuntimeContext());
        }
@@ -96,7 +96,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends 
ChainedDriver<IN, OUT> {
        public void openTask() throws Exception {
                // open the stub first
                final Configuration stubConfig = 
this.config.getStubParameters();
-               RegularPactTask.openUserCode(this.reducer, stubConfig);
+               BatchTask.openUserCode(this.reducer, stubConfig);
 
                // ----------------- Set up the sorter -------------------------
 
@@ -135,7 +135,7 @@ public class GroupCombineChainedDriver<IN, OUT> extends 
ChainedDriver<IN, OUT> {
                
this.parent.getEnvironment().getMemoryManager().release(this.memory);
 
                if (this.running) {
-                       RegularPactTask.closeUserCode(this.reducer);
+                       BatchTask.closeUserCode(this.reducer);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index da9698c..408abc2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
 import org.apache.flink.runtime.operators.sort.InMemorySorter;
 import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
@@ -87,7 +87,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends 
ChainedDriver<IN,
 
                @SuppressWarnings("unchecked")
                final GroupCombineFunction<IN, OUT> combiner =
-                       RegularPactTask.instantiateUserCode(this.config, 
userCodeClassLoader, GroupCombineFunction.class);
+                       BatchTask.instantiateUserCode(this.config, 
userCodeClassLoader, GroupCombineFunction.class);
                this.combiner = combiner;
                FunctionUtils.setFunctionRuntimeContext(combiner, 
getUdfRuntimeContext());
        }
@@ -96,7 +96,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> extends 
ChainedDriver<IN,
        public void openTask() throws Exception {
                // open the stub first
                final Configuration stubConfig = 
this.config.getStubParameters();
-               RegularPactTask.openUserCode(this.combiner, stubConfig);
+               BatchTask.openUserCode(this.combiner, stubConfig);
 
                // ----------------- Set up the sorter -------------------------
 
@@ -134,7 +134,7 @@ public class SynchronousChainedCombineDriver<IN, OUT> 
extends ChainedDriver<IN,
                
this.parent.getEnvironment().getMemoryManager().release(this.memory);
 
                if (this.running) {
-                       RegularPactTask.closeUserCode(this.combiner);
+                       BatchTask.closeUserCode(this.combiner);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 6c97097..0254c8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -48,7 +48,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.types.Value;
@@ -307,11 +307,11 @@ public class TaskConfig implements Serializable {
        //                                      Driver
        // 
--------------------------------------------------------------------------------------------
        
-       public void setDriver(@SuppressWarnings("rawtypes") Class<? extends 
PactDriver> driver) {
+       public void setDriver(@SuppressWarnings("rawtypes") Class<? extends 
Driver> driver) {
                this.config.setString(DRIVER_CLASS, driver.getName());
        }
        
-       public <S extends Function, OT> Class<? extends PactDriver<S, OT>> 
getDriver() {
+       public <S extends Function, OT> Class<? extends Driver<S, OT>> 
getDriver() {
                final String className = this.config.getString(DRIVER_CLASS, 
null);
                if (className == null) {
                        throw new CorruptConfigurationException("The pact 
driver class is missing.");
@@ -319,7 +319,7 @@ public class TaskConfig implements Serializable {
                
                try {
                        @SuppressWarnings("unchecked")
-                       final Class<PactDriver<S, OT>> pdClazz = 
(Class<PactDriver<S, OT>>) (Class<?>) PactDriver.class;
+                       final Class<Driver<S, OT>> pdClazz = (Class<Driver<S, 
OT>>) (Class<?>) Driver.class;
                        return Class.forName(className).asSubclass(pdClazz);
                } catch (ClassNotFoundException cnfex) {
                        throw new CorruptConfigurationException("The given 
driver class cannot be found.");

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 3a36fe8..58755f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
@@ -47,7 +47,7 @@ public class TaskDeploymentDescriptorTest {
                        final int currentNumberOfSubtasks = 1;
                        final Configuration jobConfiguration = new 
Configuration();
                        final Configuration taskConfiguration = new 
Configuration();
-                       final Class<? extends AbstractInvokable> invokableClass 
= RegularPactTask.class;
+                       final Class<? extends AbstractInvokable> invokableClass 
= BatchTask.class;
                        final List<ResultPartitionDeploymentDescriptor> 
producedResults = new ArrayList<ResultPartitionDeploymentDescriptor>(0);
                        final List<InputGateDeploymentDescriptor> inputGates = 
new ArrayList<InputGateDeploymentDescriptor>(0);
                        final List<BlobKey> requiredJars = new 
ArrayList<BlobKey>(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index e3fc852..bea7c22 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
@@ -69,10 +69,10 @@ public class ExecutionGraphDeploymentTest {
                        v3.setParallelism(10);
                        v4.setParallelism(10);
 
-                       v1.setInvokableClass(RegularPactTask.class);
-                       v2.setInvokableClass(RegularPactTask.class);
-                       v3.setInvokableClass(RegularPactTask.class);
-                       v4.setInvokableClass(RegularPactTask.class);
+                       v1.setInvokableClass(BatchTask.class);
+                       v2.setInvokableClass(BatchTask.class);
+                       v3.setInvokableClass(BatchTask.class);
+                       v4.setInvokableClass(BatchTask.class);
 
                        v2.connectNewDataSetAsInput(v1, 
DistributionPattern.ALL_TO_ALL);
                        v3.connectNewDataSetAsInput(v2, 
DistributionPattern.ALL_TO_ALL);
@@ -111,7 +111,7 @@ public class ExecutionGraphDeploymentTest {
                        assertEquals(jid2, descr.getVertexID());
                        assertEquals(3, descr.getIndexInSubtaskGroup());
                        assertEquals(10, descr.getNumberOfSubtasks());
-                       assertEquals(RegularPactTask.class.getName(), 
descr.getInvokableClassName());
+                       assertEquals(BatchTask.class.getName(), 
descr.getInvokableClassName());
                        assertEquals("v2", descr.getTaskName());
 
                        List<ResultPartitionDeploymentDescriptor> 
producedPartitions = descr.getProducedPartitions();
@@ -276,8 +276,8 @@ public class ExecutionGraphDeploymentTest {
                v1.setParallelism(dop1);
                v2.setParallelism(dop2);
 
-               v1.setInvokableClass(RegularPactTask.class);
-               v2.setInvokableClass(RegularPactTask.class);
+               v1.setInvokableClass(BatchTask.class);
+               v2.setInvokableClass(BatchTask.class);
 
                // execution graph that executes actions synchronously
                ExecutionGraph eg = new ExecutionGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 88a71c4..1f19699 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub;
 import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -102,8 +102,8 @@ public class ChainTaskTest extends TaskTestBase {
                        
                        // chained map+combine
                        {
-                               RegularPactTask<GenericCollectorMap<Record, 
Record>, Record> testTask = 
-                                                                               
        new RegularPactTask<GenericCollectorMap<Record, Record>, Record>();
+                               BatchTask<GenericCollectorMap<Record, Record>, 
Record> testTask =
+                                                                               
        new BatchTask<GenericCollectorMap<Record, Record>, Record>();
                                registerTask(testTask, 
CollectorMapDriver.class, MockMapStub.class);
                                
                                try {
@@ -163,8 +163,8 @@ public class ChainTaskTest extends TaskTestBase {
                        
                        // chained map+combine
                        {
-                               final 
RegularPactTask<GenericCollectorMap<Record, Record>, Record> testTask = 
-                                                                               
        new RegularPactTask<GenericCollectorMap<Record, Record>, Record>();
+                               final BatchTask<GenericCollectorMap<Record, 
Record>, Record> testTask =
+                                                                               
        new BatchTask<GenericCollectorMap<Record, Record>, Record>();
                                
                                super.registerTask(testTask, 
CollectorMapDriver.class, MockMapStub.class);
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 0a02f30..9be957a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -29,14 +29,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
+public class TestTaskContext<S, T> implements TaskContext<S, T> {
        
        private final AbstractInvokable owner = new DummyInvokable();
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 5136aea..7043a63 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -51,7 +51,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends 
TestLogger implements PactTaskContext<S, OUT> {
+public class BinaryOperatorTestBase<S extends Function, IN, OUT> extends 
TestLogger implements TaskContext<S, OUT> {
        
        protected static final int PAGE_SIZE = 32 * 1024;
        
@@ -81,7 +81,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLog
        
        private S stub;
        
-       private PactDriver<S, IN> driver;
+       private Driver<S, IN> driver;
        
        private volatile boolean running = true;
        
@@ -176,12 +176,12 @@ public class BinaryOperatorTestBase<S extends Function, 
IN, OUT> extends TestLog
        }
        
        @SuppressWarnings("rawtypes")
-       public void testDriver(PactDriver driver, Class stubClass) throws 
Exception {
+       public void testDriver(Driver driver, Class stubClass) throws Exception 
{
                testDriverInternal(driver, stubClass);
        }
        
        @SuppressWarnings({"unchecked", "rawtypes"})
-       public void testDriverInternal(PactDriver driver, Class stubClass) 
throws Exception {
+       public void testDriverInternal(Driver driver, Class stubClass) throws 
Exception {
                
                this.driver = driver;
                driver.setup(this);
@@ -232,8 +232,8 @@ public class BinaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLog
                        }
                        
                        // if resettable driver invoke tear down
-                       if (this.driver instanceof ResettablePactDriver) {
-                               final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
+                       if (this.driver instanceof ResettableDriver) {
+                               final ResettableDriver<?, ?> resDriver = 
(ResettableDriver<?, ?>) this.driver;
                                try {
                                        resDriver.teardown();
                                } catch (Throwable t) {
@@ -252,7 +252,7 @@ public class BinaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLog
        }
        
        @SuppressWarnings({"unchecked", "rawtypes"})
-       public void testResettableDriver(ResettablePactDriver driver, Class 
stubClass, int iterations) throws Exception {
+       public void testResettableDriver(ResettableDriver driver, Class 
stubClass, int iterations) throws Exception {
                driver.setup(this);
                
                for (int i = 0; i < iterations; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 116fdec..c442940 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -38,9 +39,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Record;
@@ -51,7 +51,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class DriverTestBase<S extends Function> extends TestLogger implements 
PactTaskContext<S, Record> {
+public class DriverTestBase<S extends Function> extends TestLogger implements 
TaskContext<S, Record> {
        
        protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
        
@@ -83,7 +83,7 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
        
        private S stub;
        
-       private PactDriver<S, Record> driver;
+       private Driver<S, Record> driver;
        
        private volatile boolean running = true;
 
@@ -168,12 +168,12 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
        }
 
        @SuppressWarnings("rawtypes")
-       public void testDriver(PactDriver driver, Class stubClass) throws 
Exception {
+       public void testDriver(Driver driver, Class stubClass) throws Exception 
{
                testDriverInternal(driver, stubClass);
        }
 
        @SuppressWarnings({"unchecked","rawtypes"})
-       public void testDriverInternal(PactDriver driver, Class stubClass) 
throws Exception {
+       public void testDriverInternal(Driver driver, Class stubClass) throws 
Exception {
 
                this.driver = driver;
                driver.setup(this);
@@ -226,8 +226,8 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
                        }
 
                        // if resettable driver invoke tear down
-                       if (this.driver instanceof ResettablePactDriver) {
-                               final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
+                       if (this.driver instanceof ResettableDriver) {
+                               final ResettableDriver<?, ?> resDriver = 
(ResettableDriver<?, ?>) this.driver;
                                try {
                                        resDriver.teardown();
                                } catch (Throwable t) {
@@ -247,7 +247,7 @@ public class DriverTestBase<S extends Function> extends 
TestLogger implements Pa
        }
 
        @SuppressWarnings({"unchecked","rawtypes"})
-       public void testResettableDriver(ResettablePactDriver driver, Class 
stubClass, int iterations) throws Exception {
+       public void testResettableDriver(ResettableDriver driver, Class 
stubClass, int iterations) throws Exception {
 
                driver.setup(this);
                

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 4662762..777bfc8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Record;
@@ -89,7 +89,7 @@ public abstract class TaskTestBase extends TestLogger {
        }
 
        public void registerTask(AbstractInvokable task, 
-                                                               
@SuppressWarnings("rawtypes") Class<? extends PactDriver> driver,
+                                                               
@SuppressWarnings("rawtypes") Class<? extends Driver> driver,
                                                                Class<? extends 
RichFunction> stubClass) {
                
                final TaskConfig config = new 
TaskConfig(this.mockEnv.getTaskConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index e2b2430..886c881 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.TaskContext;
+import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -51,7 +51,7 @@ import java.util.Collection;
 import java.util.List;
 
 @RunWith(Parameterized.class)
-public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends 
TestLogger implements PactTaskContext<S, OUT> {
+public class UnaryOperatorTestBase<S extends Function, IN, OUT> extends 
TestLogger implements TaskContext<S, OUT> {
        
        protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
        
@@ -85,7 +85,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLogg
        
        private S stub;
        
-       private PactDriver<S, OUT> driver;
+       private Driver<S, OUT> driver;
        
        private volatile boolean running;
 
@@ -170,12 +170,12 @@ public class UnaryOperatorTestBase<S extends Function, 
IN, OUT> extends TestLogg
        }
 
        @SuppressWarnings("rawtypes")
-       public void testDriver(PactDriver driver, Class stubClass) throws 
Exception {
+       public void testDriver(Driver driver, Class stubClass) throws Exception 
{
                testDriverInternal(driver, stubClass);
        }
 
        @SuppressWarnings({"unchecked","rawtypes"})
-       public void testDriverInternal(PactDriver driver, Class stubClass) 
throws Exception {
+       public void testDriverInternal(Driver driver, Class stubClass) throws 
Exception {
 
                this.driver = driver;
                driver.setup(this);
@@ -227,8 +227,8 @@ public class UnaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLogg
                        }
 
                        // if resettable driver invoke tear-down
-                       if (this.driver instanceof ResettablePactDriver) {
-                               final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
+                       if (this.driver instanceof ResettableDriver) {
+                               final ResettableDriver<?, ?> resDriver = 
(ResettableDriver<?, ?>) this.driver;
                                try {
                                        resDriver.teardown();
                                } catch (Throwable t) {
@@ -248,7 +248,7 @@ public class UnaryOperatorTestBase<S extends Function, IN, 
OUT> extends TestLogg
        }
 
        @SuppressWarnings({"unchecked","rawtypes"})
-       public void testResettableDriver(ResettablePactDriver driver, Class 
stubClass, int iterations) throws Exception {
+       public void testResettableDriver(ResettableDriver driver, Class 
stubClass, int iterations) throws Exception {
                driver.setup(this);
                
                for (int i = 0; i < iterations; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/logback-test.xml 
b/flink-runtime/src/test/resources/logback-test.xml
index 17f7020..1d64d46 100644
--- a/flink-runtime/src/test/resources/logback-test.xml
+++ b/flink-runtime/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
          throw error to test failing scenarios. Logging those would overflow 
the log. -->
          <!---->
     <logger name="org.apache.flink.runtime.operators.DataSinkTask" 
level="OFF"/>
-    <logger name="org.apache.flink.runtime.operators.RegularPactTask" 
level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
     <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-ml/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/resources/logback-test.xml 
b/flink-staging/flink-ml/src/test/resources/logback-test.xml
index 17f7020..1d64d46 100644
--- a/flink-staging/flink-ml/src/test/resources/logback-test.xml
+++ b/flink-staging/flink-ml/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
          throw error to test failing scenarios. Logging those would overflow 
the log. -->
          <!---->
     <logger name="org.apache.flink.runtime.operators.DataSinkTask" 
level="OFF"/>
-    <logger name="org.apache.flink.runtime.operators.RegularPactTask" 
level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
     <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
index b117bab..287129d 100644
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
+++ 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
 import org.apache.flink.tez.util.EncodingUtils;
 import org.apache.flink.util.InstantiationUtil;
@@ -93,8 +93,8 @@ public class RegularProcessor<S extends Function, OT> extends 
AbstractLogicalIOP
 
                this.inputs = inputs;
                this.outputs = outputs;
-               final Class<? extends PactDriver<S, OT>> driverClass = 
this.task.getTaskConfig().getDriver();
-               PactDriver<S,OT> driver = 
InstantiationUtil.instantiate(driverClass, PactDriver.class);
+               final Class<? extends Driver<S, OT>> driverClass = 
this.task.getTaskConfig().getDriver();
+               Driver<S,OT> driver = 
InstantiationUtil.instantiate(driverClass, Driver.class);
                this.numInputs = driver.getNumberOfInputs();
                this.numOutputs = outputs.size();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
index b7cbfb4..89e4642 100644
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
+++ 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezTask.java
@@ -36,8 +36,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -64,7 +64,7 @@ import java.util.Arrays;
 import java.util.List;
 
 
-public class TezTask<S extends Function,OT>  implements PactTaskContext<S, OT> 
{
+public class TezTask<S extends Function,OT>  implements TaskContext<S, OT> {
 
        protected static final Log LOG = LogFactory.getLog(TezTask.class);
 
@@ -74,7 +74,7 @@ public class TezTask<S extends Function,OT>  implements 
PactTaskContext<S, OT> {
         * The driver that invokes the user code (the stub implementation). The 
central driver in this task
         * (further drivers may be chained behind this driver).
         */
-       protected volatile PactDriver<S, OT> driver;
+       protected volatile Driver<S, OT> driver;
 
        /**
         * The instantiated user code of this task's main operator (driver). 
May be null if the operator has no udf.
@@ -150,8 +150,8 @@ public class TezTask<S extends Function,OT>  implements 
PactTaskContext<S, OT> {
 
        public TezTask(TezTaskConfig config, RuntimeUDFContext 
runtimeUdfContext, long availableMemory) {
                this.config = config;
-               final Class<? extends PactDriver<S, OT>> driverClass = 
this.config.getDriver();
-               this.driver = InstantiationUtil.instantiate(driverClass, 
PactDriver.class);
+               final Class<? extends Driver<S, OT>> driverClass = 
this.config.getDriver();
+               this.driver = InstantiationUtil.instantiate(driverClass, 
Driver.class);
                
                LOG.info("ClassLoader URLs: " + 
Arrays.toString(((URLClassLoader) this.userCodeClassLoader).getURLs()));
                
@@ -244,7 +244,7 @@ public class TezTask<S extends Function,OT>  implements 
PactTaskContext<S, OT> {
 
 
        // --------------------------------------------------------------------
-       // PactTaskContext interface
+       // TaskContext interface
        // --------------------------------------------------------------------
 
        @Override
@@ -356,7 +356,7 @@ public class TezTask<S extends Function,OT>  implements 
PactTaskContext<S, OT> {
 
 
        // --------------------------------------------------------------------
-       // Adapted from RegularPactTask
+       // Adapted from BatchTask
        // --------------------------------------------------------------------
 
        private void initInputLocalStrategy(int inputNum) throws Exception {
@@ -402,7 +402,7 @@ public class TezTask<S extends Function,OT>  implements 
PactTaskContext<S, OT> {
                                                localStub = 
initStub(userCodeFunctionType);
                                        } catch (Exception e) {
                                                throw new 
RuntimeException("Initializing the user code and the configuration failed" +
-                                                               e.getMessage() 
== null ? "." : ": " + e.getMessage(), e);
+                                                               (e.getMessage() 
== null ? "." : ": " + e.getMessage()), e);
                                        }
 
                                        if (!(localStub instanceof 
GroupCombineFunction)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-staging/flink-tez/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/test/resources/logback-test.xml 
b/flink-staging/flink-tez/src/test/resources/logback-test.xml
index 9c2e75f..48e4374 100644
--- a/flink-staging/flink-tez/src/test/resources/logback-test.xml
+++ b/flink-staging/flink-tez/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
         <appender-ref ref="STDOUT"/>
     </root>
 
-    <!--<logger name="org.apache.flink.runtime.operators.RegularPactTask" 
level="OFF"/>-->
+    <!--<logger name="org.apache.flink.runtime.operators.BatchTask" 
level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.client.JobClient" 
level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.taskmanager.Task" 
level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" 
level="OFF"/>-->

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-tests/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/logback-test.xml 
b/flink-tests/src/test/resources/logback-test.xml
index 9c2e75f..48e4374 100644
--- a/flink-tests/src/test/resources/logback-test.xml
+++ b/flink-tests/src/test/resources/logback-test.xml
@@ -27,7 +27,7 @@
         <appender-ref ref="STDOUT"/>
     </root>
 
-    <!--<logger name="org.apache.flink.runtime.operators.RegularPactTask" 
level="OFF"/>-->
+    <!--<logger name="org.apache.flink.runtime.operators.BatchTask" 
level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.client.JobClient" 
level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.taskmanager.Task" 
level="OFF"/>-->
     <!--<logger name="org.apache.flink.runtime.jobmanager.JobManager" 
level="OFF"/>-->

Reply via email to