http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
index 3cf081f..d6b9444 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
index f6885c5..3af64fc 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReduceAllTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.optimizer.util.DummyInputFormat;
 import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityReduce;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 230cc6b..25643a4 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index 1fe16bb..3a24ce1 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.util.Visitor;
 import org.junit.Assert;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
index 40b54e0..65e5025 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SortPartialReuseTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
index 92b4fc5..1001626 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionBetweenDynamicAndStaticPathTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
index 5d15ed8..2e52565 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionPropertyPropagationTest.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Visitor;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index 1e4124c..e81e0ec 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
index 80c0bda..321ca5a 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationCornerCasesTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
index 6e7c0a3..27f367f 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/WorksetIterationsRecordApiCompilerTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.optimizer.util.DummyOutputFormat;
 import org.apache.flink.optimizer.util.IdentityMap;
 import org.apache.flink.optimizer.util.IdentityReduce;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.types.LongValue;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
index 0273659..b4e95fb 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/BinaryCustomPartitioningCompatibilityTest.java
@@ -26,13 +26,13 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
index 08f7388..d52181d 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CoGroupCustomPartitioningTest.java
@@ -29,13 +29,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
index 9fd676f..5758c86 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -28,12 +28,12 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
index f865a9f..00fd587 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/CustomPartitioningTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityPartitionerMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
index 360487b..0408ca9 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingKeySelectorTranslationTest.java
@@ -30,12 +30,12 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
index 8cd4809..74e5c8c 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingPojoTranslationTest.java
@@ -26,12 +26,12 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
index 779b8e5..72fb81b 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/GroupingTupleTranslationTest.java
@@ -29,12 +29,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
index eae40cf..8eedee1 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/custompartition/JoinCustomPartitioningTest.java
@@ -30,13 +30,13 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
index cb4bd78..5f69336 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeClosedBranchingTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -35,6 +34,7 @@ import 
org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.optimizer.testfunctions.IdentityFlatMapper;
 import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
 import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
index 5175d8c..13ec51a 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeForwardTest.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
index 6b2691a..99f8c81 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataexchange/DataExchangeModeOpenBranchingTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
index b359e6b..ab83dba 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DeltaIterationDependenciesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.fail;
 
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 import org.apache.flink.api.common.Plan;
@@ -28,7 +29,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
 
 
 @SuppressWarnings({"serial", "unchecked"})

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
index 2f9b32f..96758b1 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/DistinctAndGroupingOptimizerTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.optimizer.java;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
index c0e2fa7..1bd4b8a 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/GroupReduceCompilationTest.java
@@ -24,12 +24,12 @@ import 
org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
index 57d2d54..796d4ab 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/IterationCompilerTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
 import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -36,6 +35,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
index 0a62132..14d863d 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/JoinTranslationTest.java
@@ -27,10 +27,10 @@ import 
org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Visitor;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
index cd63b72..3f18e62 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/OpenIterationTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.testfunctions.IdentityMapper;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
index 8bb9a76..95ee4de 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/PartitionOperatorTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index e1b18f9..4197abb 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
index f1c2233..46eb48a 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/WorksetIterationsJavaApiCompilerTest.java
@@ -33,11 +33,11 @@ import 
org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
index e7807c9..fb7a80f 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupOnConflictingPartitioningsTest.java
@@ -25,10 +25,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings({"serial", "unchecked"})

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
index 9171cc7..8a4786f 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinOnConflictingPartitioningsTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.CompilerTestBase;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 
 @SuppressWarnings({"serial", "unchecked"})

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java
new file mode 100644
index 0000000..35c50d3
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/CompilerTestBase.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Visitor;
+import org.junit.Before;
+
+/**
+ * Base class for Optimizer tests. Offers utility methods to trigger 
optimization
+ * of a program and to fetch the nodes in an optimizer plan that correspond
+ * the the node in the program plan.
+ */
+public abstract class CompilerTestBase implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       protected static final String IN_FILE = OperatingSystem.isWindows() ? 
"file:/c:/" : "file:///dev/random";
+
+       protected static final String OUT_FILE = OperatingSystem.isWindows() ? 
"file:/c:/" : "file:///dev/null";
+
+       protected static final int DEFAULT_PARALLELISM = 8;
+
+       protected static final String DEFAULT_PARALLELISM_STRING = 
String.valueOf(DEFAULT_PARALLELISM);
+
+       private static final String CACHE_KEY = "cachekey";
+
+       // 
------------------------------------------------------------------------
+
+       protected transient DataStatistics dataStats;
+
+       protected transient Optimizer withStatsCompiler;
+
+       protected transient Optimizer noStatsCompiler;
+
+       private transient int statCounter;
+
+       // 
------------------------------------------------------------------------
+
+       @Before
+       public void setup() {
+               Configuration flinkConf = new Configuration();
+               this.dataStats = new DataStatistics();
+               this.withStatsCompiler = new Optimizer(this.dataStats, new 
DefaultCostEstimator(), flinkConf);
+               
this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
+
+               this.noStatsCompiler = new Optimizer(null, new 
DefaultCostEstimator(), flinkConf);
+               this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public OptimizedPlan compileWithStats(Plan p) {
+               return this.withStatsCompiler.compile(p);
+       }
+
+       public OptimizedPlan compileNoStats(Plan p) {
+               return this.noStatsCompiler.compile(p);
+       }
+
+       public static OperatorResolver getContractResolver(Plan plan) {
+               return new OperatorResolver(plan);
+       }
+
+       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, 
long size, float recordWidth) {
+               setSourceStatistics(source, new 
FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
+       }
+
+       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, 
FileBaseStatistics stats) {
+               final String key = CACHE_KEY + this.statCounter++;
+               this.dataStats.cacheBaseStatistics(stats, key);
+               source.setStatisticsKey(key);
+       }
+
+
+       public static OptimizerPlanNodeResolver 
getOptimizerPlanNodeResolver(OptimizedPlan plan) {
+               return new OptimizerPlanNodeResolver(plan);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public static final class OptimizerPlanNodeResolver {
+
+               private final Map<String, ArrayList<PlanNode>> map;
+
+               public OptimizerPlanNodeResolver(OptimizedPlan p) {
+                       HashMap<String, ArrayList<PlanNode>> map = new 
HashMap<String, ArrayList<PlanNode>>();
+
+                       for (PlanNode n : p.getAllNodes()) {
+                               Operator<?> c = 
n.getOriginalOptimizerNode().getOperator();
+                               String name = c.getName();
+
+                               ArrayList<PlanNode> list = map.get(name);
+                               if (list == null) {
+                                       list = new ArrayList<PlanNode>(2);
+                                       map.put(name, list);
+                               }
+
+                               // check whether this node is a child of a node 
with the same contract (aka combiner)
+                               boolean shouldAdd = true;
+                               for (Iterator<PlanNode> iter = list.iterator(); 
iter.hasNext();) {
+                                       PlanNode in = iter.next();
+                                       if 
(in.getOriginalOptimizerNode().getOperator() == c) {
+                                               // is this the child or is our 
node the child
+                                               if (in instanceof 
SingleInputPlanNode && n instanceof SingleInputPlanNode) {
+                                                       SingleInputPlanNode 
thisNode = (SingleInputPlanNode) n;
+                                                       SingleInputPlanNode 
otherNode = (SingleInputPlanNode) in;
+
+                                                       if 
(thisNode.getPredecessor() == otherNode) {
+                                                               // other node 
is child, remove it
+                                                               iter.remove();
+                                                       } else if 
(otherNode.getPredecessor() == thisNode) {
+                                                               shouldAdd = 
false;
+                                                       }
+                                               } else {
+                                                       throw new 
RuntimeException("Unrecodnized case in test.");
+                                               }
+                                       }
+                               }
+
+                               if (shouldAdd) {
+                                       list.add(n);
+                               }
+                       }
+
+                       this.map = map;
+               }
+
+
+               @SuppressWarnings("unchecked")
+               public <T extends PlanNode> T getNode(String name) {
+                       List<PlanNode> nodes = this.map.get(name);
+                       if (nodes == null || nodes.isEmpty()) {
+                               throw new RuntimeException("No node found with 
the given name.");
+                       } else if (nodes.size() != 1) {
+                               throw new RuntimeException("Multiple nodes 
found with the given name.");
+                       } else {
+                               return (T) nodes.get(0);
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               public <T extends PlanNode> T getNode(String name, Class<? 
extends Function> stubClass) {
+                       List<PlanNode> nodes = this.map.get(name);
+                       if (nodes == null || nodes.isEmpty()) {
+                               throw new RuntimeException("No node found with 
the given name and stub class.");
+                       } else {
+                               PlanNode found = null;
+                               for (PlanNode node : nodes) {
+                                       if (node.getClass() == stubClass) {
+                                               if (found == null) {
+                                                       found = node;
+                                               } else {
+                                                       throw new 
RuntimeException("Multiple nodes found with the given name and stub class.");
+                                               }
+                                       }
+                               }
+                               if (found == null) {
+                                       throw new RuntimeException("No node 
found with the given name and stub class.");
+                               } else {
+                                       return (T) found;
+                               }
+                       }
+               }
+
+               public List<PlanNode> getNodes(String name) {
+                       List<PlanNode> nodes = this.map.get(name);
+                       if (nodes == null || nodes.isEmpty()) {
+                               throw new RuntimeException("No node found with 
the given name.");
+                       } else {
+                               return new ArrayList<PlanNode>(nodes);
+                       }
+               }
+       }
+
+       /**
+        * Collects all DataSources of a plan to add statistics
+        *
+        */
+       public static class SourceCollectorVisitor implements 
Visitor<Operator<?>> {
+
+               protected final List<GenericDataSourceBase<?, ?>> sources = new 
ArrayList<GenericDataSourceBase<?, ?>>(4);
+
+               @Override
+               public boolean preVisit(Operator<?> visitable) {
+
+                       if(visitable instanceof GenericDataSourceBase) {
+                               sources.add((GenericDataSourceBase<?, ?>) 
visitable);
+                       }
+                       else if(visitable instanceof BulkIterationBase) {
+                               ((BulkIterationBase<?>) 
visitable).getNextPartialSolution().accept(this);
+                       }
+
+                       return true;
+               }
+
+               @Override
+               public void postVisit(Operator<?> visitable) {}
+
+               public List<GenericDataSourceBase<?, ?>> getSources() {
+                       return this.sources;
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
new file mode 100644
index 0000000..920b713
--- /dev/null
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/util/OperatorResolver.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.java.record.operators.BulkIteration;
+import org.apache.flink.api.java.record.operators.DeltaIteration;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Utility to get operator instances from plans via name.
+ */
+@SuppressWarnings("deprecation")
+public class OperatorResolver implements Visitor<Operator<?>> {
+       
+       private final Map<String, List<Operator<?>>> map;
+       private Set<Operator<?>> seen;
+       
+       public OperatorResolver(Plan p) {
+               this.map = new HashMap<String, List<Operator<?>>>();
+               this.seen = new HashSet<Operator<?>>();
+               
+               p.accept(this);
+               this.seen = null;
+       }
+       
+       
+       @SuppressWarnings("unchecked")
+       public <T extends Operator<?>> T getNode(String name) {
+               List<Operator<?>> nodes = this.map.get(name);
+               if (nodes == null || nodes.isEmpty()) {
+                       throw new RuntimeException("No nodes found with the 
given name.");
+               } else if (nodes.size() != 1) {
+                       throw new RuntimeException("Multiple nodes found with 
the given name.");
+               } else {
+                       return (T) nodes.get(0);
+               }
+       }
+       
+       @SuppressWarnings("unchecked")
+       public <T extends Operator<?>> T getNode(String name, Class<? extends 
RichFunction> stubClass) {
+               List<Operator<?>> nodes = this.map.get(name);
+               if (nodes == null || nodes.isEmpty()) {
+                       throw new RuntimeException("No node found with the 
given name and stub class.");
+               } else {
+                       Operator<?> found = null;
+                       for (Operator<?> node : nodes) {
+                               if (node.getClass() == stubClass) {
+                                       if (found == null) {
+                                               found = node;
+                                       } else {
+                                               throw new 
RuntimeException("Multiple nodes found with the given name and stub class.");
+                                       }
+                               }
+                       }
+                       if (found == null) {
+                               throw new RuntimeException("No node found with 
the given name and stub class.");
+                       } else {
+                               return (T) found;
+                       }
+               }
+       }
+       
+       public List<Operator<?>> getNodes(String name) {
+               List<Operator<?>> nodes = this.map.get(name);
+               if (nodes == null || nodes.isEmpty()) {
+                       throw new RuntimeException("No node found with the 
given name.");
+               } else {
+                       return new ArrayList<Operator<?>>(nodes);
+               }
+       }
+
+       @Override
+       public boolean preVisit(Operator<?> visitable) {
+               if (this.seen.add(visitable)) {
+                       // add to  the map
+                       final String name = visitable.getName();
+                       List<Operator<?>> list = this.map.get(name);
+                       if (list == null) {
+                               list = new ArrayList<Operator<?>>(2);
+                               this.map.put(name, list);
+                       }
+                       list.add(visitable);
+                       
+                       // recurse into bulk iterations
+                       if (visitable instanceof BulkIteration) {
+                               ((BulkIteration) 
visitable).getNextPartialSolution().accept(this);
+                       } else if (visitable instanceof DeltaIteration) {
+                               ((DeltaIteration) 
visitable).getSolutionSetDelta().accept(this);
+                               ((DeltaIteration) 
visitable).getNextWorkset().accept(this);
+                       }
+                       
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       @Override
+       public void postVisit(Operator<?> visitable) {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 78b99ad..4f3b4b2 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -59,7 +59,7 @@ under the License.
                                <artifactId>maven-archetype-plugin</artifactId>
                                <version>2.2</version><!--$NO-MVN-MAN-VER$-->
                                <configuration>
-                                       <skip>${skipTests}</skip>
+                                       <skip>true</skip>
                                </configuration>
                        </plugin>
                        <!-- deactivate the shade plugin for the quickstart 
archetypes -->

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
index fd91d65..f345d6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index cb799c4..1c76e08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -32,7 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import static 
org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 78e9707..626d21f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index b2f11db..50b1f24 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -29,7 +29,7 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.security.MessageDigest;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 5db5ef6..69687da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.slf4j.Logger;
 
 import java.io.EOFException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
index 9f72db0..842870d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobCancellationException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * An exception which is thrown by the JobClient if a job is aborted as a 
result of a user

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
index 99c3f89..56ccef5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobExecutionException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This exception is the base exception for all exceptions that denote any 
failure during

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
index d2c81a5..c33ed8f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
index 3d672a5..3cb0b9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This exception denotes an error while submitting a job to the JobManager

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
index 10ef601..18c8c31 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobTimeoutException.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.client;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * An exception which is thrown by the JobClient if the job manager is no 
longer reachable.

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index f7518bd..5d96903 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 556bb11..503a0b9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index c6270b2..3fb7493 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index d394e2d..848f619 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -35,7 +35,7 @@ import java.util.TimerTask;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 532107f..66bda45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 63d85b4..52a8048 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import java.io.File;
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d0615b3..5ce89b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ad72d13..acbc17a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 8dd341f..1b91089 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index b838aa4..3449100 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.IOUtils;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index e27b7ea..324629f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -27,7 +27,7 @@ import java.util.Set;
 
 import akka.actor.ActorRef;
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
 import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index eab4344..7bd70a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroupAssignment;
 
 import java.util.HashSet;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index c89b7f5..56be9d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 08d6441..bf8464c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.util.AbstractID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d490784..55b89b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnFailure;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -35,7 +36,6 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 92e27d3..88020a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
index 0ea3a1c..6e84b4c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionConsumableNotifier.java
@@ -18,7 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+
+import org.apache.flink.api.common.JobID;
 
 public interface ResultPartitionConsumableNotifier {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f8f22b0..d43533b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java
deleted file mode 100644
index 7c8d365..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobID.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobgraph;
-
-import javax.xml.bind.DatatypeConverter;
-
-import org.apache.flink.util.AbstractID;
-
-import java.nio.ByteBuffer;
-
-public final class JobID extends AbstractID {
-
-       private static final long serialVersionUID = 1L;
-       
-       public JobID() {
-               super();
-       }
-
-       public JobID(long lowerPart, long upperPart) {
-               super(lowerPart, upperPart);
-       }
-
-       public JobID(byte[] bytes) {
-               super(bytes);
-       }
-
-       public static JobID generate() {
-               return new JobID();
-       }
-
-       public static JobID fromByteArray(byte[] bytes) {
-               return new JobID(bytes);
-       }
-
-       public static JobID fromByteBuffer(ByteBuffer buf) {
-               long lower = buf.getLong();
-               long upper = buf.getLong();
-               return new JobID(lower, upper);
-       }
-
-       public static JobID fromHexString(String hexString) {
-               return new JobID(DatatypeConverter.parseHexBinary(hexString));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
index 3130354..5bf8ebe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/accumulators/AccumulatorManager.java
@@ -24,7 +24,7 @@ import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This class manages the accumulators for different jobs. Either the jobs are

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index ffb0dd4..008be65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -56,7 +56,7 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.util.EnvironmentInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
index 8f16e4f..4c4a20e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/JobManagerProfiler.java
@@ -20,7 +20,7 @@
 package org.apache.flink.runtime.profiling;
 
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * This interface must be implemented by profiling components

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
index 85dedb7..d5ce137 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/EnvironmentThreadSet.java
@@ -25,7 +25,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.profiling.impl.types.InternalExecutionVertexThreadProfilingData;
 

Reply via email to