This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new a1f9502  [FLINK-15312][tests] Remove PlanExposingEnvironment
a1f9502 is described below

commit a1f950209e68221bb1d4bc25b61c501308bf5a9a
Author: tison <[email protected]>
AuthorDate: Thu Dec 19 20:15:41 2019 +0800

    [FLINK-15312][tests] Remove PlanExposingEnvironment
---
 .../optimizer/examples/KMeansSingleStepTest.java   |  26 +--
 .../examples/RelationalQueryCompilerTest.java      |  33 ++--
 .../iterations/ConnectedComponentsCoGroupTest.java |  23 +--
 .../optimizer/jsonplan/DumpCompiledPlanTest.java   | 179 +++++++--------------
 .../optimizer/jsonplan/PreviewPlanDumpTest.java    | 179 +++++++--------------
 .../apache/flink/test/planning/LargePlanTest.java  |  13 +-
 .../flink/test/util/PlanExposingEnvironment.java   |  59 -------
 7 files changed, 147 insertions(+), 365 deletions(-)
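
Note on the change (not part of the diff itself): the removed helper worked by
installing itself as the context ExecutionEnvironment and aborting execute();
the updated tests instead return env.createProgramPlan(...) from the example
methods, or package the example programs and extract the Pipeline directly.
A minimal sketch of the latter pattern, mirroring the new
verifyOptimizedPlan/verifyPlanDump helpers below (the WordCount entry point and
the IN_FILE/OUT_FILE arguments from CompilerTestBase are illustrative):

    // Package the example program and extract its Pipeline, instead of
    // installing a plan-capturing ExecutionEnvironment as the context.
    PackagedProgram program = PackagedProgram
            .newBuilder()
            .setEntryPointClassName(WordCount.class.getName())
            .setArguments("--input", IN_FILE, "--output", OUT_FILE)
            .build();

    // Parallelism 1 is enough for plan extraction; a DataSet program
    // compiles to a Plan, which the optimizer tests then inspect.
    Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, 1);
    Plan plan = (Plan) pipeline;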

diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
index d5b103f..5e9a4da 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -40,7 +39,6 @@ import org.apache.flink.optimizer.util.OperatorResolver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.test.util.PlanExposingEnvironment;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -70,7 +68,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
        private final FieldList set0 = new FieldList(0);
 
        @Test
-       public void testCompileKMeansSingleStepWithStats() {
+       public void testCompileKMeansSingleStepWithStats() throws Exception {
 
                Plan p = getKMeansPlan();
                p.setExecutionConfig(new ExecutionConfig());
@@ -86,8 +84,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
        }
 
        @Test
-       public void testCompileKMeansSingleStepWithOutStats() {
-
+       public void testCompileKMeansSingleStepWithOutStats() throws Exception {
                Plan p = getKMeansPlan();
                p.setExecutionConfig(new ExecutionConfig());
                OptimizedPlan plan = compileNoStats(p);
@@ -141,22 +138,11 @@ public class KMeansSingleStepTest extends CompilerTestBase {
                assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
        }
 
-       public static Plan getKMeansPlan() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       kmeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("KMeans failed with an exception");
-               }
-               return env.getPlan();
+       public static Plan getKMeansPlan() throws Exception {
+               return kmeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
        }
 
-       public static void kmeans(String[] args) throws Exception {
+       public static Plan kmeans(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
                DataSet<Point> points = env.readCsvFile(args[0])
@@ -191,7 +177,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 
                recomputeClusterCenter.project(0, 1).writeAsCsv(args[2], "\n", " ").name(SINK);
 
-               env.execute("KMeans Example");
+               return env.createProgramPlan("KMeans Example");
        }
 
        /**
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
index 84684a9..a35b520 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -45,7 +44,6 @@ import org.apache.flink.optimizer.util.OperatorResolver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.test.util.PlanExposingEnvironment;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -108,7 +106,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         * indicate this to be the smaller one.
         */
        @Test
-       public void testQueryAnyValidPlan() {
+       public void testQueryAnyValidPlan() throws Exception {
                testQueryGeneric(1024 * 1024 * 1024L, 8 * 1024 * 1024 * 1024L, 0.05f, 0.05f, true, true, true, false, true);
        }
 
@@ -116,7 +114,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         * Verifies that the plan compiles in the presence of empty size=0 estimates.
         */
        @Test
-       public void testQueryWithSizeZeroInputs() {
+       public void testQueryWithSizeZeroInputs() throws Exception {
                testQueryGeneric(0, 0, 0.1f, 0.5f, true, true, true, false, true);
        }
 
@@ -124,7 +122,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         * Statistics that push towards a broadcast join.
         */
        @Test
-       public void testQueryWithStatsForBroadcastHash() {
+       public void testQueryWithStatsForBroadcastHash() throws Exception {
                testQueryGeneric(1024L * 1024 * 1024 * 1024, 1024L * 1024 * 1024 * 1024, 0.01f, 0.05f, true, false, true, false, false);
        }
 
@@ -132,7 +130,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         * Statistics that push towards a broadcast join.
         */
        @Test
-       public void testQueryWithStatsForRepartitionAny() {
+       public void testQueryWithStatsForRepartitionAny() throws Exception {
                testQueryGeneric(100L * 1024 * 1024 * 1024 * 1024, 100L * 1024 * 1024 * 1024 * 1024, 0.1f, 0.5f, false, true, true, true, true);
        }
 
@@ -141,7 +139,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
         * re-exploiting the sorted order is cheaper.
         */
        @Test
-       public void testQueryWithStatsForRepartitionMerge() {
+       public void testQueryWithStatsForRepartitionMerge() throws Exception {
                Plan p = getTPCH3Plan();
                p.setExecutionConfig(defaultExecutionConfig);
                // set compiler hints
@@ -156,7 +154,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
        private void testQueryGeneric(long orderSize, long lineItemSize,
                        float ordersFilterFactor, float joinFilterFactor,
                        boolean broadcastOkay, boolean partitionedOkay,
-                       boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) {
+                       boolean hashJoinFirstOkay, boolean hashJoinSecondOkay, boolean mergeJoinOkay) throws Exception {
                Plan p = getTPCH3Plan();
                p.setExecutionConfig(defaultExecutionConfig);
                testQueryGeneric(p, orderSize, lineItemSize, ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay, hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
@@ -347,22 +345,11 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
                }
        }
 
-       public static Plan getTPCH3Plan() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       tcph3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("tcph3 failed with an exception");
-               }
-               return env.getPlan();
+       public static Plan getTPCH3Plan() throws Exception {
+               return tcph3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE});
        }
 
-       public static void tcph3(String[] args) throws Exception {
+       public static Plan tcph3(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(Integer.parseInt(args[0]));
 
@@ -388,7 +375,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 
                aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);
 
-               env.execute();
+               return env.createProgramPlan();
        }
 
        @ForwardedFields("f0; f4->f1")
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
index 3809b2b..33c1916 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -44,7 +43,6 @@ import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.test.util.PlanExposingEnvironment;
 import org.apache.flink.util.Collector;
 
 import org.junit.Assert;
@@ -71,7 +69,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
        private final FieldList set0 = new FieldList(0);
 
        @Test
-       public void testWorksetConnectedComponents() {
+       public void testWorksetConnectedComponents() throws Exception {
                Plan plan = getConnectedComponentsCoGroupPlan();
                plan.setExecutionConfig(new ExecutionConfig());
                OptimizedPlan optPlan = compileNoStats(plan);
@@ -145,22 +143,11 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
                jgg.compileJobGraph(optPlan);
        }
 
-       public static Plan getConnectedComponentsCoGroupPlan() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       connectedComponentsWithCoGroup(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
-               } catch (ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("connectedComponentsWithCoGroup failed with an exception");
-               }
-               return env.getPlan();
+       public static Plan getConnectedComponentsCoGroupPlan() throws Exception {
+               return connectedComponentsWithCoGroup(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
        }
 
-       public static void connectedComponentsWithCoGroup(String[] args) throws Exception {
+       public static Plan connectedComponentsWithCoGroup(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(Integer.parseInt(args[0]));
 
@@ -183,7 +170,7 @@ public class ConnectedComponentsCoGroupTest extends CompilerTestBase {
 
                iteration.closeWith(minAndUpdate, minAndUpdate).writeAsCsv(args[3]).name(SINK);
 
-               env.execute();
+               return env.createProgramPlan();
        }
 
        private static class DummyMapFunction implements FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 8602a3d..0f31481 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.test.optimizer.jsonplan;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.PageRank;
@@ -30,153 +31,91 @@ import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.util.PlanExposingEnvironment;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 
-import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * The tests in this class simply invokes the JSON dump code for the optimized plan.
  */
 public class DumpCompiledPlanTest extends CompilerTestBase {
 
        @Test
-       public void dumpWordCount() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       WordCount.main(new String[] {
-                                       "--input", IN_FILE,
-                                       "--output", OUT_FILE});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("WordCount failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpWordCount() throws Exception {
+               verifyOptimizedPlan(WordCount.class,
+                       "--input", IN_FILE,
+                       "--output", OUT_FILE);
        }
 
        @Test
-       public void dumpTPCH3() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       TPCHQuery3.main(new String[] {
-                                       "--lineitem", IN_FILE,
-                                       "--customer", IN_FILE,
-                                       "--orders", OUT_FILE,
-                                       "--output", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("TPCH3 failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpTPCH3() throws Exception {
+               verifyOptimizedPlan(TPCHQuery3.class,
+                       "--lineitem", IN_FILE,
+                       "--customer", IN_FILE,
+                       "--orders", OUT_FILE,
+                       "--output", "123");
        }
 
        @Test
-       public void dumpIterativeKMeans() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       KMeans.main(new String[] {
-                               "--points ", IN_FILE,
-                               "--centroids ", IN_FILE,
-                               "--output ", OUT_FILE,
-                               "--iterations", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("KMeans failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpIterativeKMeans() throws Exception {
+               verifyOptimizedPlan(KMeans.class,
+                       "--points ", IN_FILE,
+                       "--centroids ", IN_FILE,
+                       "--output ", OUT_FILE,
+                       "--iterations", "123");
        }
 
        @Test
-       public void dumpWebLogAnalysis() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       WebLogAnalysis.main(new String[] {
-                                       "--documents", IN_FILE,
-                                       "--ranks", IN_FILE,
-                                       "--visits", OUT_FILE,
-                                       "--output", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("WebLogAnalysis failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpWebLogAnalysis() throws Exception {
+               verifyOptimizedPlan(WebLogAnalysis.class,
+                       "--documents", IN_FILE,
+                       "--ranks", IN_FILE,
+                       "--visits", OUT_FILE,
+                       "--output", "123");
        }
 
        @Test
-       public void dumpBulkIterationKMeans() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       ConnectedComponents.main(new String[] {
-                                       "--vertices", IN_FILE,
-                                       "--edges", IN_FILE,
-                                       "--output", OUT_FILE,
-                                       "--iterations", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("ConnectedComponents failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpBulkIterationKMeans() throws Exception {
+               verifyOptimizedPlan(ConnectedComponents.class,
+                       "--vertices", IN_FILE,
+                       "--edges", IN_FILE,
+                       "--output", OUT_FILE,
+                       "--iterations", "123");
        }
 
        @Test
-       public void dumpPageRank() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       PageRank.main(new String[]{
-                                       "--pages", IN_FILE,
-                                       "--links", IN_FILE,
-                                       "--output", OUT_FILE,
-                                       "--numPages", "10",
-                                       "--iterations", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("PageRank failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpPageRank() throws Exception {
+               verifyOptimizedPlan(PageRank.class,
+                       "--pages", IN_FILE,
+                       "--links", IN_FILE,
+                       "--output", OUT_FILE,
+                       "--numPages", "10",
+                       "--iterations", "123");
        }
 
-       private void dump(Plan p) {
-               p.setExecutionConfig(new ExecutionConfig());
-               try {
-                       OptimizedPlan op = compileNoStats(p);
-                       PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
-                       String json = dumper.getOptimizerPlanAsJSON(op);
-                       JsonParser parser = new JsonFactory().createJsonParser(json);
-                       while (parser.nextToken() != null) {}
-               } catch (JsonParseException e) {
-                       e.printStackTrace();
-                       Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An error occurred in the test: " + e.getMessage());
+       private void verifyOptimizedPlan(Class<?> entrypoint, String... args) throws Exception {
+               final PackagedProgram program = PackagedProgram
+                       .newBuilder()
+                       .setEntryPointClassName(entrypoint.getName())
+                       .setArguments(args)
+                       .build();
+
+               final Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, 1);
+
+               assertTrue(pipeline instanceof Plan);
+
+               final Plan plan = (Plan) pipeline;
+
+               final OptimizedPlan op = compileNoStats(plan);
+               final PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+               final String json = dumper.getOptimizerPlanAsJSON(op);
+               try (JsonParser parser = new JsonFactory().createParser(json)) {
+                       while (parser.nextToken() != null) {
+                       }
                }
        }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index f80ee25..0591e41 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -19,166 +19,107 @@
 package org.apache.flink.test.optimizer.jsonplan;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.PageRank;
 import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.util.PlanExposingEnvironment;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
 
+import static org.junit.Assert.assertTrue;
+
 /**
  * The tests in this class simply invokes the JSON dump code for the original plan.
  */
 public class PreviewPlanDumpTest extends CompilerTestBase {
 
        @Test
-       public void dumpWordCount() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       WordCount.main(new String[] {
-                                       "--input", IN_FILE,
-                                       "--output", OUT_FILE});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("WordCount failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpWordCount() throws Exception {
+               verifyPlanDump(WordCount.class,
+                       "--input", IN_FILE,
+                       "--output", OUT_FILE);
        }
 
        @Test
-       public void dumpTPCH3() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       TPCHQuery3.main(new String[] {
-                                       "--lineitem", IN_FILE,
-                                       "--customer", IN_FILE,
-                                       "--orders", OUT_FILE,
-                                       "--output", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("TPCH3 failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpTPCH3() throws Exception {
+               verifyPlanDump(TPCHQuery3.class,
+                       "--lineitem", IN_FILE,
+                       "--customer", IN_FILE,
+                       "--orders", OUT_FILE,
+                       "--output", "123");
        }
 
        @Test
-       public void dumpIterativeKMeans() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       KMeans.main(new String[] {
-                               "--points ", IN_FILE,
-                               "--centroids ", IN_FILE,
-                               "--output ", OUT_FILE,
-                               "--iterations", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("KMeans failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpIterativeKMeans() throws Exception {
+               verifyPlanDump(KMeans.class,
+                       "--points ", IN_FILE,
+                       "--centroids ", IN_FILE,
+                       "--output ", OUT_FILE,
+                       "--iterations", "123");
        }
 
        @Test
-       public void dumpWebLogAnalysis() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] {
-                                       "--documents", IN_FILE,
-                                       "--ranks", IN_FILE,
-                                       "--visits", OUT_FILE,
-                                       "--output", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("WebLogAnalysis failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpWebLogAnalysis() throws Exception {
+               verifyPlanDump(WebLogAnalysis.class,
+                       "--documents", IN_FILE,
+                       "--ranks", IN_FILE,
+                       "--visits", OUT_FILE,
+                       "--output", "123");
        }
 
        @Test
-       public void dumpBulkIterationKMeans() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       ConnectedComponents.main(new String[] {
-                                       "--vertices", IN_FILE,
-                                       "--edges", IN_FILE,
-                                       "--output", OUT_FILE,
-                                       "--iterations", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("ConnectedComponents failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpBulkIterationKMeans() throws Exception {
+               verifyPlanDump(ConnectedComponents.class,
+                       "--vertices", IN_FILE,
+                       "--edges", IN_FILE,
+                       "--output", OUT_FILE,
+                       "--iterations", "123");
        }
 
        @Test
-       public void dumpPageRank() {
-               // prepare the test environment
-               PlanExposingEnvironment env = new PlanExposingEnvironment();
-               env.setAsContext();
-               try {
-                       // --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
-                       PageRank.main(new String[]{
-                                       "--pages", IN_FILE,
-                                       "--links", IN_FILE,
-                                       "--output", OUT_FILE,
-                                       "--numPages", "10",
-                                       "--iterations", "123"});
-               } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
-                       // all good.
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("PageRank failed with an exception");
-               }
-               dump(env.getPlan());
+       public void dumpPageRank() throws Exception {
+               verifyPlanDump(PageRank.class,
+                       "--pages", IN_FILE,
+                       "--links", IN_FILE,
+                       "--output", OUT_FILE,
+                       "--numPages", "10",
+                       "--iterations", "123");
        }
 
-       private void dump(Plan p) {
-               try {
-                       List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(p);
-                       PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
-                       String json = dumper.getPactPlanAsJSON(sinks);
-                       try (JsonParser parser = new JsonFactory().createParser(json)) {
-                               while (parser.nextToken() != null) {}
+       private static void verifyPlanDump(Class<?> entrypoint, String... args) throws Exception {
+               final PackagedProgram program = PackagedProgram
+                       .newBuilder()
+                       .setEntryPointClassName(entrypoint.getName())
+                       .setArguments(args)
+                       .build();
+
+               final Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, 1);
+
+               assertTrue(pipeline instanceof Plan);
+
+               final Plan plan = (Plan) pipeline;
+
+               final List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan);
+               final PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
+               final String json = dumper.getPactPlanAsJSON(sinks);
+
+               try (JsonParser parser = new JsonFactory().createParser(json)) {
+                       while (parser.nextToken() != null) {
                        }
-               } catch (JsonParseException e) {
-                       e.printStackTrace();
-                       Assert.fail("JSON Generator produced malformatted output: " + e.getMessage());
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail("An error occurred in the test: " + e.getMessage());
                }
        }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
index b5d9bbb..18a19fe 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
-import org.apache.flink.test.util.PlanExposingEnvironment;
 
 import org.junit.Test;
 
@@ -34,12 +32,14 @@ import org.junit.Test;
  */
 public class LargePlanTest {
 
-       @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
+       @Test(timeout = 30_000)
        public void testPlanningOfLargePlan() throws Exception {
-               runProgram(new PlanExposingEnvironment(), 10, 20);
+               runProgram(10, 20);
        }
 
-       private static void runProgram(ExecutionEnvironment env, int depth, int width) throws Exception {
+       private static void runProgram(int depth, int width) throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
                DataSet<String> input = env.fromElements("a", "b", "c");
                DataSet<String> stats = null;
 
@@ -48,7 +48,8 @@ public class LargePlanTest {
                }
 
                stats.output(new DiscardingOutputFormat<>());
-               env.execute("depth " + depth + " width " + width);
+
+               env.createProgramPlan("depth " + depth + " width " + width);
        }
 
        private static DataSet<String> analyze(DataSet<String> input, DataSet<String> stats, int branches) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/PlanExposingEnvironment.java b/flink-tests/src/test/java/org/apache/flink/test/util/PlanExposingEnvironment.java
deleted file mode 100644
index 9488732..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/util/PlanExposingEnvironment.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
-
-/**
- * Environment to extract the pre-optimized plan.
- */
-public final class PlanExposingEnvironment extends ExecutionEnvironment {
-
-       private Plan plan;
-
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               this.plan = createProgramPlan(jobName);
-
-               // do not go on with anything now!
-               throw new OptimizerPlanEnvironment.ProgramAbortException();
-       }
-
-       public void setAsContext() {
-               ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
-                       @Override
-                       public ExecutionEnvironment createExecutionEnvironment() {
-                               return PlanExposingEnvironment.this;
-                       }
-               };
-               initializeContextEnvironment(factory);
-       }
-
-       public void unsetAsContext() {
-               resetContextEnvironment();
-       }
-
-       public Plan getPlan() {
-               return plan;
-       }
-}
