This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new a1f9502 [FLINK-15312][tests] Remove PlanExposingEnvironment
a1f9502 is described below
commit a1f950209e68221bb1d4bc25b61c501308bf5a9a
Author: tison <[email protected]>
AuthorDate: Thu Dec 19 20:15:41 2019 +0800
[FLINK-15312][tests] Remove PlanExposingEnvironment
---
.../optimizer/examples/KMeansSingleStepTest.java | 26 +--
.../examples/RelationalQueryCompilerTest.java | 33 ++--
.../iterations/ConnectedComponentsCoGroupTest.java | 23 +--
.../optimizer/jsonplan/DumpCompiledPlanTest.java | 179 +++++++--------------
.../optimizer/jsonplan/PreviewPlanDumpTest.java | 179 +++++++--------------
.../apache/flink/test/planning/LargePlanTest.java | 13 +-
.../flink/test/util/PlanExposingEnvironment.java | 59 -------
7 files changed, 147 insertions(+), 365 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
index d5b103f..5e9a4da 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -40,7 +39,6 @@ import org.apache.flink.optimizer.util.OperatorResolver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.test.util.PlanExposingEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Assert;
@@ -70,7 +68,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
private final FieldList set0 = new FieldList(0);
@Test
- public void testCompileKMeansSingleStepWithStats() {
+ public void testCompileKMeansSingleStepWithStats() throws Exception {
Plan p = getKMeansPlan();
p.setExecutionConfig(new ExecutionConfig());
@@ -86,8 +84,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
}
@Test
- public void testCompileKMeansSingleStepWithOutStats() {
-
+ public void testCompileKMeansSingleStepWithOutStats() throws Exception {
Plan p = getKMeansPlan();
p.setExecutionConfig(new ExecutionConfig());
OptimizedPlan plan = compileNoStats(p);
@@ -141,22 +138,11 @@ public class KMeansSingleStepTest extends
CompilerTestBase {
assertEquals(LocalStrategy.NONE,
sink.getInput().getLocalStrategy());
}
- public static Plan getKMeansPlan() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- kmeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("KMeans failed with an exception");
- }
- return env.getPlan();
+ public static Plan getKMeansPlan() throws Exception {
+ return kmeans(new String[]{IN_FILE, IN_FILE, OUT_FILE, "20"});
}
- public static void kmeans(String[] args) throws Exception {
+ public static Plan kmeans(String[] args) throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
DataSet<Point> points = env.readCsvFile(args[0])
@@ -191,7 +177,7 @@ public class KMeansSingleStepTest extends CompilerTestBase {
recomputeClusterCenter.project(0, 1).writeAsCsv(args[2], "\n",
" ").name(SINK);
- env.execute("KMeans Example");
+ return env.createProgramPlan("KMeans Example");
}
/**
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
index 84684a9..a35b520 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java
@@ -35,7 +35,6 @@ import
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
@@ -45,7 +44,6 @@ import org.apache.flink.optimizer.util.OperatorResolver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.test.util.PlanExposingEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Assert;
@@ -108,7 +106,7 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
* indicate this to be the smaller one.
*/
@Test
- public void testQueryAnyValidPlan() {
+ public void testQueryAnyValidPlan() throws Exception {
testQueryGeneric(1024 * 1024 * 1024L, 8 * 1024 * 1024 * 1024L,
0.05f, 0.05f, true, true, true, false, true);
}
@@ -116,7 +114,7 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
* Verifies that the plan compiles in the presence of empty size=0
estimates.
*/
@Test
- public void testQueryWithSizeZeroInputs() {
+ public void testQueryWithSizeZeroInputs() throws Exception {
testQueryGeneric(0, 0, 0.1f, 0.5f, true, true, true, false,
true);
}
@@ -124,7 +122,7 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
* Statistics that push towards a broadcast join.
*/
@Test
- public void testQueryWithStatsForBroadcastHash() {
+ public void testQueryWithStatsForBroadcastHash() throws Exception {
testQueryGeneric(1024L * 1024 * 1024 * 1024, 1024L * 1024 *
1024 * 1024, 0.01f, 0.05f, true, false, true, false, false);
}
@@ -132,7 +130,7 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
* Statistics that push towards a broadcast join.
*/
@Test
- public void testQueryWithStatsForRepartitionAny() {
+ public void testQueryWithStatsForRepartitionAny() throws Exception {
testQueryGeneric(100L * 1024 * 1024 * 1024 * 1024, 100L * 1024
* 1024 * 1024 * 1024, 0.1f, 0.5f, false, true, true, true, true);
}
@@ -141,7 +139,7 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
* re-exploiting the sorted order is cheaper.
*/
@Test
- public void testQueryWithStatsForRepartitionMerge() {
+ public void testQueryWithStatsForRepartitionMerge() throws Exception {
Plan p = getTPCH3Plan();
p.setExecutionConfig(defaultExecutionConfig);
// set compiler hints
@@ -156,7 +154,7 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
private void testQueryGeneric(long orderSize, long lineItemSize,
float ordersFilterFactor, float joinFilterFactor,
boolean broadcastOkay, boolean partitionedOkay,
- boolean hashJoinFirstOkay, boolean hashJoinSecondOkay,
boolean mergeJoinOkay) {
+ boolean hashJoinFirstOkay, boolean hashJoinSecondOkay,
boolean mergeJoinOkay) throws Exception {
Plan p = getTPCH3Plan();
p.setExecutionConfig(defaultExecutionConfig);
testQueryGeneric(p, orderSize, lineItemSize,
ordersFilterFactor, joinFilterFactor, broadcastOkay, partitionedOkay,
hashJoinFirstOkay, hashJoinSecondOkay, mergeJoinOkay);
@@ -347,22 +345,11 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
}
}
- public static Plan getTPCH3Plan() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- tcph3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE,
IN_FILE, OUT_FILE});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("tcph3 failed with an exception");
- }
- return env.getPlan();
+ public static Plan getTPCH3Plan() throws Exception {
+ return tcph3(new String[]{DEFAULT_PARALLELISM_STRING, IN_FILE,
IN_FILE, OUT_FILE});
}
- public static void tcph3(String[] args) throws Exception {
+ public static Plan tcph3(String[] args) throws Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(Integer.parseInt(args[0]));
@@ -388,7 +375,7 @@ public class RelationalQueryCompilerTest extends
CompilerTestBase {
aggLiO.writeAsCsv(args[3], "\n", "|").name(SINK);
- env.execute();
+ return env.createProgramPlan();
}
@ForwardedFields("f0; f4->f1")
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
index 3809b2b..33c1916 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java
@@ -31,7 +31,6 @@ import
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSec
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
-import
org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -44,7 +43,6 @@ import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.test.util.PlanExposingEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Assert;
@@ -71,7 +69,7 @@ public class ConnectedComponentsCoGroupTest extends
CompilerTestBase {
private final FieldList set0 = new FieldList(0);
@Test
- public void testWorksetConnectedComponents() {
+ public void testWorksetConnectedComponents() throws Exception {
Plan plan = getConnectedComponentsCoGroupPlan();
plan.setExecutionConfig(new ExecutionConfig());
OptimizedPlan optPlan = compileNoStats(plan);
@@ -145,22 +143,11 @@ public class ConnectedComponentsCoGroupTest extends
CompilerTestBase {
jgg.compileJobGraph(optPlan);
}
- public static Plan getConnectedComponentsCoGroupPlan() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- connectedComponentsWithCoGroup(new
String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
- } catch (ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("connectedComponentsWithCoGroup failed with
an exception");
- }
- return env.getPlan();
+ public static Plan getConnectedComponentsCoGroupPlan() throws Exception
{
+ return connectedComponentsWithCoGroup(new
String[]{DEFAULT_PARALLELISM_STRING, IN_FILE, IN_FILE, OUT_FILE, "100"});
}
- public static void connectedComponentsWithCoGroup(String[] args) throws
Exception {
+ public static Plan connectedComponentsWithCoGroup(String[] args) throws
Exception {
ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(Integer.parseInt(args[0]));
@@ -183,7 +170,7 @@ public class ConnectedComponentsCoGroupTest extends
CompilerTestBase {
iteration.closeWith(minAndUpdate,
minAndUpdate).writeAsCsv(args[3]).name(SINK);
- env.execute();
+ return env.createProgramPlan();
}
private static class DummyMapFunction implements
FlatMapFunction<Tuple1<Long>, Tuple2<Long, Long>> {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 8602a3d..0f31481 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -18,9 +18,10 @@
package org.apache.flink.test.optimizer.jsonplan;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.PageRank;
@@ -30,153 +31,91 @@ import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.util.PlanExposingEnvironment;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
/**
* The tests in this class simply invokes the JSON dump code for the optimized
plan.
*/
public class DumpCompiledPlanTest extends CompilerTestBase {
@Test
- public void dumpWordCount() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- WordCount.main(new String[] {
- "--input", IN_FILE,
- "--output", OUT_FILE});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("WordCount failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpWordCount() throws Exception {
+ verifyOptimizedPlan(WordCount.class,
+ "--input", IN_FILE,
+ "--output", OUT_FILE);
}
@Test
- public void dumpTPCH3() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- TPCHQuery3.main(new String[] {
- "--lineitem", IN_FILE,
- "--customer", IN_FILE,
- "--orders", OUT_FILE,
- "--output", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("TPCH3 failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpTPCH3() throws Exception {
+ verifyOptimizedPlan(TPCHQuery3.class,
+ "--lineitem", IN_FILE,
+ "--customer", IN_FILE,
+ "--orders", OUT_FILE,
+ "--output", "123");
}
@Test
- public void dumpIterativeKMeans() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- KMeans.main(new String[] {
- "--points ", IN_FILE,
- "--centroids ", IN_FILE,
- "--output ", OUT_FILE,
- "--iterations", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("KMeans failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpIterativeKMeans() throws Exception {
+ verifyOptimizedPlan(KMeans.class,
+ "--points ", IN_FILE,
+ "--centroids ", IN_FILE,
+ "--output ", OUT_FILE,
+ "--iterations", "123");
}
@Test
- public void dumpWebLogAnalysis() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- WebLogAnalysis.main(new String[] {
- "--documents", IN_FILE,
- "--ranks", IN_FILE,
- "--visits", OUT_FILE,
- "--output", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("WebLogAnalysis failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpWebLogAnalysis() throws Exception {
+ verifyOptimizedPlan(WebLogAnalysis.class,
+ "--documents", IN_FILE,
+ "--ranks", IN_FILE,
+ "--visits", OUT_FILE,
+ "--output", "123");
}
@Test
- public void dumpBulkIterationKMeans() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- ConnectedComponents.main(new String[] {
- "--vertices", IN_FILE,
- "--edges", IN_FILE,
- "--output", OUT_FILE,
- "--iterations", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("ConnectedComponents failed with an
exception");
- }
- dump(env.getPlan());
+ public void dumpBulkIterationKMeans() throws Exception {
+ verifyOptimizedPlan(ConnectedComponents.class,
+ "--vertices", IN_FILE,
+ "--edges", IN_FILE,
+ "--output", OUT_FILE,
+ "--iterations", "123");
}
@Test
- public void dumpPageRank() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- PageRank.main(new String[]{
- "--pages", IN_FILE,
- "--links", IN_FILE,
- "--output", OUT_FILE,
- "--numPages", "10",
- "--iterations", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("PageRank failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpPageRank() throws Exception {
+ verifyOptimizedPlan(PageRank.class,
+ "--pages", IN_FILE,
+ "--links", IN_FILE,
+ "--output", OUT_FILE,
+ "--numPages", "10",
+ "--iterations", "123");
}
- private void dump(Plan p) {
- p.setExecutionConfig(new ExecutionConfig());
- try {
- OptimizedPlan op = compileNoStats(p);
- PlanJSONDumpGenerator dumper = new
PlanJSONDumpGenerator();
- String json = dumper.getOptimizerPlanAsJSON(op);
- JsonParser parser = new
JsonFactory().createJsonParser(json);
- while (parser.nextToken() != null) {}
- } catch (JsonParseException e) {
- e.printStackTrace();
- Assert.fail("JSON Generator produced malformatted
output: " + e.getMessage());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An error occurred in the test: " +
e.getMessage());
+ private void verifyOptimizedPlan(Class<?> entrypoint, String... args)
throws Exception {
+ final PackagedProgram program = PackagedProgram
+ .newBuilder()
+ .setEntryPointClassName(entrypoint.getName())
+ .setArguments(args)
+ .build();
+
+ final Pipeline pipeline =
PackagedProgramUtils.getPipelineFromProgram(program, 1);
+
+ assertTrue(pipeline instanceof Plan);
+
+ final Plan plan = (Plan) pipeline;
+
+ final OptimizedPlan op = compileNoStats(plan);
+ final PlanJSONDumpGenerator dumper = new
PlanJSONDumpGenerator();
+ final String json = dumper.getOptimizerPlanAsJSON(op);
+ try (JsonParser parser = new JsonFactory().createParser(json)) {
+ while (parser.nextToken() != null) {
+ }
}
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
index f80ee25..0591e41 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java
@@ -19,166 +19,107 @@
package org.apache.flink.test.optimizer.jsonplan;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.PageRank;
import org.apache.flink.examples.java.relational.TPCHQuery3;
+import org.apache.flink.examples.java.relational.WebLogAnalysis;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.dag.DataSinkNode;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.apache.flink.test.util.PlanExposingEnvironment;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.junit.Assert;
import org.junit.Test;
import java.util.List;
+import static org.junit.Assert.assertTrue;
+
/**
* The tests in this class simply invokes the JSON dump code for the original
plan.
*/
public class PreviewPlanDumpTest extends CompilerTestBase {
@Test
- public void dumpWordCount() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- WordCount.main(new String[] {
- "--input", IN_FILE,
- "--output", OUT_FILE});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("WordCount failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpWordCount() throws Exception {
+ verifyPlanDump(WordCount.class,
+ "--input", IN_FILE,
+ "--output", OUT_FILE);
}
@Test
- public void dumpTPCH3() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- TPCHQuery3.main(new String[] {
- "--lineitem", IN_FILE,
- "--customer", IN_FILE,
- "--orders", OUT_FILE,
- "--output", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("TPCH3 failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpTPCH3() throws Exception {
+ verifyPlanDump(TPCHQuery3.class,
+ "--lineitem", IN_FILE,
+ "--customer", IN_FILE,
+ "--orders", OUT_FILE,
+ "--output", "123");
}
@Test
- public void dumpIterativeKMeans() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- KMeans.main(new String[] {
- "--points ", IN_FILE,
- "--centroids ", IN_FILE,
- "--output ", OUT_FILE,
- "--iterations", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("KMeans failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpIterativeKMeans() throws Exception {
+ verifyPlanDump(KMeans.class,
+ "--points ", IN_FILE,
+ "--centroids ", IN_FILE,
+ "--output ", OUT_FILE,
+ "--iterations", "123");
}
@Test
- public void dumpWebLogAnalysis() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
-
org.apache.flink.examples.java.relational.WebLogAnalysis.main(new String[] {
- "--documents", IN_FILE,
- "--ranks", IN_FILE,
- "--visits", OUT_FILE,
- "--output", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("WebLogAnalysis failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpWebLogAnalysis() throws Exception {
+ verifyPlanDump(WebLogAnalysis.class,
+ "--documents", IN_FILE,
+ "--ranks", IN_FILE,
+ "--visits", OUT_FILE,
+ "--output", "123");
}
@Test
- public void dumpBulkIterationKMeans() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- ConnectedComponents.main(new String[] {
- "--vertices", IN_FILE,
- "--edges", IN_FILE,
- "--output", OUT_FILE,
- "--iterations", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("ConnectedComponents failed with an
exception");
- }
- dump(env.getPlan());
+ public void dumpBulkIterationKMeans() throws Exception {
+ verifyPlanDump(ConnectedComponents.class,
+ "--vertices", IN_FILE,
+ "--edges", IN_FILE,
+ "--output", OUT_FILE,
+ "--iterations", "123");
}
@Test
- public void dumpPageRank() {
- // prepare the test environment
- PlanExposingEnvironment env = new PlanExposingEnvironment();
- env.setAsContext();
- try {
- // --pages <path> --links <path> --output <path>
--numPages <n> --iterations <n>
- PageRank.main(new String[]{
- "--pages", IN_FILE,
- "--links", IN_FILE,
- "--output", OUT_FILE,
- "--numPages", "10",
- "--iterations", "123"});
- } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
- // all good.
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("PageRank failed with an exception");
- }
- dump(env.getPlan());
+ public void dumpPageRank() throws Exception {
+ verifyPlanDump(PageRank.class,
+ "--pages", IN_FILE,
+ "--links", IN_FILE,
+ "--output", OUT_FILE,
+ "--numPages", "10",
+ "--iterations", "123");
}
- private void dump(Plan p) {
- try {
- List<DataSinkNode> sinks =
Optimizer.createPreOptimizedPlan(p);
- PlanJSONDumpGenerator dumper = new
PlanJSONDumpGenerator();
- String json = dumper.getPactPlanAsJSON(sinks);
- try (JsonParser parser = new
JsonFactory().createParser(json)) {
- while (parser.nextToken() != null) {}
+ private static void verifyPlanDump(Class<?> entrypoint, String... args)
throws Exception {
+ final PackagedProgram program = PackagedProgram
+ .newBuilder()
+ .setEntryPointClassName(entrypoint.getName())
+ .setArguments(args)
+ .build();
+
+ final Pipeline pipeline =
PackagedProgramUtils.getPipelineFromProgram(program, 1);
+
+ assertTrue(pipeline instanceof Plan);
+
+ final Plan plan = (Plan) pipeline;
+
+ final List<DataSinkNode> sinks =
Optimizer.createPreOptimizedPlan(plan);
+ final PlanJSONDumpGenerator dumper = new
PlanJSONDumpGenerator();
+ final String json = dumper.getPactPlanAsJSON(sinks);
+
+ try (JsonParser parser = new JsonFactory().createParser(json)) {
+ while (parser.nextToken() != null) {
}
- } catch (JsonParseException e) {
- e.printStackTrace();
- Assert.fail("JSON Generator produced malformatted
output: " + e.getMessage());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An error occurred in the test: " +
e.getMessage());
}
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
index b5d9bbb..18a19fe 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
-import org.apache.flink.test.util.PlanExposingEnvironment;
import org.junit.Test;
@@ -34,12 +32,14 @@ import org.junit.Test;
*/
public class LargePlanTest {
- @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class,
timeout = 30_000)
+ @Test(timeout = 30_000)
public void testPlanningOfLargePlan() throws Exception {
- runProgram(new PlanExposingEnvironment(), 10, 20);
+ runProgram(10, 20);
}
- private static void runProgram(ExecutionEnvironment env, int depth, int
width) throws Exception {
+ private static void runProgram(int depth, int width) throws Exception {
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+
DataSet<String> input = env.fromElements("a", "b", "c");
DataSet<String> stats = null;
@@ -48,7 +48,8 @@ public class LargePlanTest {
}
stats.output(new DiscardingOutputFormat<>());
- env.execute("depth " + depth + " width " + width);
+
+ env.createProgramPlan("depth " + depth + " width " + width);
}
private static DataSet<String> analyze(DataSet<String> input,
DataSet<String> stats, int branches) {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/util/PlanExposingEnvironment.java
b/flink-tests/src/test/java/org/apache/flink/test/util/PlanExposingEnvironment.java
deleted file mode 100644
index 9488732..0000000
---
a/flink-tests/src/test/java/org/apache/flink/test/util/PlanExposingEnvironment.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.util;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
-
-/**
- * Environment to extract the pre-optimized plan.
- */
-public final class PlanExposingEnvironment extends ExecutionEnvironment {
-
- private Plan plan;
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- this.plan = createProgramPlan(jobName);
-
- // do not go on with anything now!
- throw new OptimizerPlanEnvironment.ProgramAbortException();
- }
-
- public void setAsContext() {
- ExecutionEnvironmentFactory factory = new
ExecutionEnvironmentFactory() {
- @Override
- public ExecutionEnvironment
createExecutionEnvironment() {
- return PlanExposingEnvironment.this;
- }
- };
- initializeContextEnvironment(factory);
- }
-
- public void unsetAsContext() {
- resetContextEnvironment();
- }
-
- public Plan getPlan() {
- return plan;
- }
-}