This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 144d92e  [FLINK-19687][table] Support JSON_EXECUTION_PLAN for the explain result
144d92e is described below
commit 144d92e49d6ed834c04bae902adb92cb0bd262aa
Author: V1ncentzzZ <[email protected]>
AuthorDate: Mon Dec 7 17:51:33 2020 +0800
[FLINK-19687][table] Support JSON_EXECUTION_PLAN for the explain result
This closes #14165
---
.../org/apache/flink/table/api/ExplainDetail.java | 7 +-
.../org/apache/flink/table/api/StatementSet.java | 2 +-
.../apache/flink/table/api/TableEnvironment.java | 2 +-
.../org/apache/flink/table/delegation/Planner.java | 2 +-
.../table/planner/delegation/BatchPlanner.scala | 7 +-
.../table/planner/delegation/StreamPlanner.scala | 7 +-
.../explain/testStatementSetExecutionExplain.out | 74 +++++++++++++
.../testStreamTableEnvironmentExecutionExplain.out | 74 +++++++++++++
.../flink/table/api/TableEnvironmentTest.scala | 43 ++++++++
.../flink/table/planner/utils/TableTestBase.scala | 8 ++
.../table/api/internal/BatchTableEnvImpl.scala | 11 +-
.../apache/flink/table/planner/StreamPlanner.scala | 11 +-
.../apache/flink/table/api/batch/ExplainTest.scala | 53 +++++++++
.../flink/table/api/stream/ExplainTest.scala | 51 +++++++++
.../testBatchTableEnvironmentExecutionExplain.out | 121 +++++++++++++++++++++
.../testStatementSetExecutionExplain0.out | 63 +++++++++++
.../testStatementSetExecutionExplain1.out | 121 +++++++++++++++++++++
.../testStreamTableEnvironmentExecutionExplain.out | 63 +++++++++++
18 files changed, 710 insertions(+), 10 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 60cd777..1e4cc3a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -35,5 +35,10 @@ public enum ExplainDetail {
* The changelog mode produced by a physical rel node.
* e.g. GroupAggregate(..., changelogMode=[I,UA,D])
*/
- CHANGELOG_MODE
+ CHANGELOG_MODE,
+
+ /**
+   * The execution plan of the program in JSON format.
+ */
+ JSON_EXECUTION_PLAN
}
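For reference, a minimal sketch of how a user program requests the new detail; the table names "MyTable" and "MySink" are illustrative and assumed to be registered beforehand:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.ExplainDetail;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExplainJsonPlanExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // "MyTable" and "MySink" are assumed to be registered beforehand.
            String result = tEnv.explainSql(
                    "INSERT INTO MySink SELECT first FROM MyTable",
                    ExplainDetail.JSON_EXECUTION_PLAN);
            // With the new detail, the "== Physical Execution Plan ==" section is JSON.
            System.out.println(result);
        }
    }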
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
index a465fce..ea10042 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
@@ -51,7 +51,7 @@ public interface StatementSet {
* all statements and Tables.
*
     * @param extraDetails The extra explain details which the explain result should include,
-     *                     e.g. estimated cost, changelog mode for streaming
+     *                     e.g. estimated cost, changelog mode for streaming, displaying execution plan in json format
* @return AST and the execution plan.
*/
String explain(ExplainDetail... extraDetails);
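The same detail also applies to a whole statement set; a hedged sketch (tEnv and the registered tables are assumed as in the example above):

    import org.apache.flink.table.api.ExplainDetail;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    class StatementSetExplainExample {
        // Explain all buffered statements, rendering the physical plan as JSON.
        static String explainStatementSetAsJson(TableEnvironment tEnv) {
            StatementSet stmtSet = tEnv.createStatementSet();
            // "MySink" and "MyTable" are illustrative, assumed registered.
            stmtSet.addInsertSql("INSERT INTO MySink SELECT first FROM MyTable");
            return stmtSet.explain(ExplainDetail.JSON_EXECUTION_PLAN);
        }
    }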
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index cb5b8b8..fbde34e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -803,7 +803,7 @@ public interface TableEnvironment {
*
     * @param statement The statement for which the AST and execution plan will be returned.
     * @param extraDetails The extra explain details which the explain result should include,
-     *                     e.g. estimated cost, changelog mode for streaming
+     *                     e.g. estimated cost, changelog mode for streaming, displaying execution plan in json format
* @return AST and the execution plan.
*/
String explainSql(String statement, ExplainDetail... extraDetails);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index fb3b940..474bf80 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -81,7 +81,7 @@ public interface Planner {
     * @param operations The collection of relational queries for which the AST
     *                   and execution plan will be returned.
     * @param extraDetails The extra explain details which the explain result should include,
-     *                     e.g. estimated cost, changelog mode for streaming
+     *                     e.g. estimated cost, changelog mode for streaming, displaying execution plan in json format
*/
    String explain(List<Operation> operations, ExplainDetail... extraDetails);
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index def8c11..dc7f0c8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -148,7 +148,12 @@ class BatchPlanner(
sb.append("== Physical Execution Plan ==")
sb.append(System.lineSeparator)
- sb.append(executionPlan)
+ if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+ sb.append(streamGraph.getStreamingPlanAsJSON)
+ } else {
+ sb.append(executionPlan)
+ }
+
sb.toString()
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 7784c7a..3f946e9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -128,7 +128,12 @@ class StreamPlanner(
sb.append("== Physical Execution Plan ==")
sb.append(System.lineSeparator)
- sb.append(executionPlan)
+ if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+ sb.append(streamGraph.getStreamingPlanAsJSON)
+ } else {
+ sb.append(executionPlan)
+ }
+
sb.toString()
}
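Both blink planners now share the same selection logic: when JSON_EXECUTION_PLAN is requested, the JSON form of the StreamGraph is appended instead of the plain-text plan. A hedged Java rendering of the Scala branch above (class and method names are illustrative):

    import java.util.Arrays;

    import org.apache.flink.streaming.api.graph.StreamGraph;
    import org.apache.flink.table.api.ExplainDetail;

    class PhysicalPlanRendering {
        // Pick between the JSON and the plain-text rendering of the physical plan.
        static String renderPhysicalPlan(
                StreamGraph streamGraph, String textPlan, ExplainDetail... extraDetails) {
            if (Arrays.asList(extraDetails).contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
                return streamGraph.getStreamingPlanAsJSON(); // JSON plan of the StreamGraph
            }
            return textPlan; // the pre-existing plain-text execution plan
        }
    }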
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSetExecutionExplain.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSetExecutionExplain.out
new file mode 100644
index 0000000..93cd515
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSetExecutionExplain.out
@@ -0,0 +1,74 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LogicalProject(last=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
+
+== Optimized Logical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: Custom File source",
+ "pact" : "Data Source",
+ "contents" : "Source: Custom File source",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "CsvTableSource(read fields: last)",
+ "pact" : "Operator",
+ "contents" : "CsvTableSource(read fields: last)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" :
"SourceConversion(table=[default_catalog.default_database.MyTable, source:
[CsvTableSource(read fields: last)]], fields=[last])",
+ "pact" : "Operator",
+ "contents" :
"SourceConversion(table=[default_catalog.default_database.MyTable, source:
[CsvTableSource(read fields: last)]], fields=[last])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "SinkConversionToRow",
+ "pact" : "Operator",
+ "contents" : "SinkConversionToRow",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Map",
+ "pact" : "Operator",
+ "contents" : "Map",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Sink: CsvTableSink(first)",
+ "pact" : "Data Sink",
+ "contents" : "Sink: CsvTableSink(first)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExecutionExplain.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExecutionExplain.out
new file mode 100644
index 0000000..88e290f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExecutionExplain.out
@@ -0,0 +1,74 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LogicalProject(first=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
+
+== Optimized Logical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: Custom File source",
+ "pact" : "Data Source",
+ "contents" : "Source: Custom File source",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "CsvTableSource(read fields: first)",
+ "pact" : "Operator",
+ "contents" : "CsvTableSource(read fields: first)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" :
"SourceConversion(table=[default_catalog.default_database.MyTable, source:
[CsvTableSource(read fields: first)]], fields=[first])",
+ "pact" : "Operator",
+ "contents" :
"SourceConversion(table=[default_catalog.default_database.MyTable, source:
[CsvTableSource(read fields: first)]], fields=[first])",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "SinkConversionToRow",
+ "pact" : "Operator",
+ "contents" : "SinkConversionToRow",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Map",
+ "pact" : "Operator",
+ "contents" : "Map",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Sink: CsvTableSink(first)",
+ "pact" : "Data Sink",
+ "contents" : "Sink: CsvTableSink(first)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 19dd431..bef138b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -104,6 +104,49 @@ class TableEnvironmentTest {
}
@Test
+ def testStreamTableEnvironmentExecutionExplain(): Unit = {
+ val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ execEnv.setParallelism(1)
+ val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+ val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+ TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")
+
+ TestTableSourceSinks.createCsvTemporarySinkTable(
+ tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
+
+    val expected =
+      TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExecutionExplain.out")
+    val actual = tEnv.explainSql("insert into MySink select first from MyTable",
+      ExplainDetail.JSON_EXECUTION_PLAN)
+
+ assertEquals(TableTestUtil.replaceStreamNodeId(expected),
+ TableTestUtil.replaceStreamNodeId(actual))
+ }
+
+ @Test
+ def testStatementSetExecutionExplain(): Unit = {
+ val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+ execEnv.setParallelism(1)
+ val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+ val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+ TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")
+
+ TestTableSourceSinks.createCsvTemporarySinkTable(
+ tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
+
+    val expected =
+      TableTestUtil.readFromResource("/explain/testStatementSetExecutionExplain.out")
+ val statementSet = tEnv.createStatementSet()
+ statementSet.addInsertSql("insert into MySink select last from MyTable")
+ val actual = statementSet.explain(ExplainDetail.JSON_EXECUTION_PLAN)
+
+ assertEquals(TableTestUtil.replaceStreamNodeId(expected),
+ TableTestUtil.replaceStreamNodeId(actual))
+ }
+
+ @Test
def testExecuteSqlWithCreateAlterDropTable(): Unit = {
val createTableStmt =
"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 3d1ec39..2019688 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1204,4 +1204,12 @@ object TableTestUtil {
def replaceStageId(s: String): String = {
s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
}
+
+  /**
+   * Stream node {id} is ignored, because the id keeps incrementing across tests
+   * while the StreamExecutionEnvironment is up.
+   */
+ def replaceStreamNodeId(s: String): String = {
+ s.replaceAll("\"id\" : \\d+", "\"id\" : ").trim
+ }
}
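Because the node ids are volatile, the tests compare plans only after masking them; for illustration, a Java equivalent of the Scala helper above (class name is hypothetical, the regex is the one from the diff):

    class PlanNormalization {
        // Mask volatile node ids before comparing a plan against a checked-in .out file.
        static String replaceStreamNodeId(String plan) {
            return plan.replaceAll("\"id\" : \\d+", "\"id\" : ").trim();
        }
    }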
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 3d93e7d..83b6dff 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -297,7 +297,7 @@ abstract class BatchTableEnvImpl(
val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST)
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
- s"== Abstract Syntax Tree ==" +
+ val explanation = s"== Abstract Syntax Tree ==" +
System.lineSeparator +
s"$astPlan" +
System.lineSeparator +
@@ -306,8 +306,15 @@ abstract class BatchTableEnvImpl(
s"$optimizedPlan" +
System.lineSeparator +
s"== Physical Execution Plan ==" +
- System.lineSeparator +
+ System.lineSeparator
+
+ if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+ s"$explanation" +
+ s"$jasonSqlPlan"
+ } else {
+ s"$explanation" +
s"$sqlPlan"
+ }
}
override def execute(jobName: String): JobExecutionResult = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 19154c5..373788e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -162,7 +162,7 @@ class StreamPlanner(
val jsonSqlPlan = env.getExecutionPlan
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
- s"== Abstract Syntax Tree ==" +
+ val explanation = s"== Abstract Syntax Tree ==" +
System.lineSeparator +
s"$astPlan" +
System.lineSeparator +
@@ -171,8 +171,15 @@ class StreamPlanner(
s"$optimizedPlan" +
System.lineSeparator +
s"== Physical Execution Plan ==" +
- System.lineSeparator +
+ System.lineSeparator
+
+ if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+ s"$explanation" +
+ s"$jsonSqlPlan"
+ } else {
+ s"$explanation" +
s"$sqlPlan"
+ }
}
override def getCompletionHints(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index a9f0847..ac75777 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -177,6 +177,54 @@ class ExplainTest
assertEquals(replaceStageId(expected), replaceStageId(result))
}
+ @Test
+ def testBatchTableEnvironmentExecutionExplain(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = BatchTableEnvironment.create(env)
+
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ "sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ "targetTable", sink.configure(fieldNames, fieldTypes))
+
+    val actual = tEnv.explainSql("INSERT INTO targetTable SELECT first, id FROM sourceTable",
+      ExplainDetail.JSON_EXECUTION_PLAN)
+    val expected = readFromResource("testBatchTableEnvironmentExecutionExplain.out")
+
+ assertEquals(replaceStreamNodeIdAndEstimatedCostValue(expected),
+ replaceStreamNodeIdAndEstimatedCostValue(actual))
+ }
+
+ @Test
+ def testStatementSetExecutionExplain(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = BatchTableEnvironment.create(env)
+
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ "sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ "targetTable", sink.configure(fieldNames, fieldTypes))
+
+ val statementSet = tEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO targetTable SELECT first, id FROM sourceTable")
+
+ val actual = statementSet.explain(ExplainDetail.JSON_EXECUTION_PLAN)
+ val expected = readFromResource("testStatementSetExecutionExplain1.out")
+
+ assertEquals(replaceStreamNodeIdAndEstimatedCostValue(expected),
+ replaceStreamNodeIdAndEstimatedCostValue(actual))
+ }
+
def replaceString(s: String, t1: Table, t2: Table): String = {
replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
}
@@ -195,4 +243,9 @@ class ExplainTest
def replaceString(s: String): String = {
s.replaceAll("\\r\\n", "\n")
}
+
+ def replaceStreamNodeIdAndEstimatedCostValue(s: String): String = {
+ s.replaceAll("\"id\": \\d+", "\"id\": ")
+      .replaceAll("\"value\": \"([0-9]+)(\\.[\\d]+)?\"", "\"value\": \"0.0\"").trim
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index b38ff12..46f4835 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -114,6 +114,54 @@ class ExplainTest extends AbstractTestBase {
assertEquals(replaceStageId(source), replaceStageId(result))
}
+ @Test
+ def testStreamTableEnvironmentExecutionExplain(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+ val tEnv = StreamTableEnvironment.create(env, settings)
+
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ "sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ "targetTable", sink.configure(fieldNames, fieldTypes))
+
+    val actual = tEnv.explainSql("INSERT INTO targetTable SELECT first, id FROM sourceTable",
+      ExplainDetail.JSON_EXECUTION_PLAN)
+    val expected = readFromResource("testStreamTableEnvironmentExecutionExplain.out")
+
+ assertEquals(replaceStreamNodeId(expected), replaceStreamNodeId(actual))
+ }
+
+ @Test
+ def testStatementSetExecutionExplain(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+ val tEnv = StreamTableEnvironment.create(env, settings)
+
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+ "sourceTable", CommonTestData.getCsvTableSource)
+
+ val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+ val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+ tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+ "targetTable", sink.configure(fieldNames, fieldTypes))
+
+ val statementSet = tEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO targetTable SELECT first, id FROM sourceTable")
+
+ val actual = statementSet.explain(ExplainDetail.JSON_EXECUTION_PLAN)
+ val expected = readFromResource("testStatementSetExecutionExplain0.out")
+
+ assertEquals(replaceStreamNodeId(expected), replaceStreamNodeId(actual))
+ }
+
def replaceString(s: String, t1: Table, t2: Table): String = {
replaceSourceNode(replaceSourceNode(replaceStageId(s), t1, 0), t2, 1)
}
@@ -130,4 +178,7 @@ class ExplainTest extends AbstractTestBase {
.replace(s"%sourceNode$idx%", streamTableNode(t))
}
+ def replaceStreamNodeId(s: String): String = {
+ s.replaceAll("\"id\" : \\d+", "\"id\" : ").trim
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testBatchTableEnvironmentExecutionExplain.out b/flink-table/flink-table-planner/src/test/scala/resources/testBatchTableEnvironmentExecutionExplain.out
new file mode 100644
index 0000000..54ebc93
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testBatchTableEnvironmentExecutionExplain.out
@@ -0,0 +1,121 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+    "nodes": [
+
+    {
+        "id": ,
+        "type": "source",
+        "pact": "Data Source",
+        "contents": "CsvTableSource(read fields: first, id)",
+        "parallelism": "1",
+        "global_properties": [
+            { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
+            { "name": "Partitioning Order", "value": "(none)" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "local_properties": [
+            { "name": "Order", "value": "(none)" },
+            { "name": "Grouping", "value": "not grouped" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "estimates": [
+            { "name": "Est. Output Size", "value": "0.0" },
+            { "name": "Est. Cardinality", "value": "0.0" } ],
+        "costs": [
+            { "name": "Network", "value": "0.0" },
+            { "name": "Disk I/O", "value": "0.0" },
+            { "name": "CPU", "value": "0.0" },
+            { "name": "Cumulative Network", "value": "0.0" },
+            { "name": "Cumulative Disk I/O", "value": "0.0" },
+            { "name": "Cumulative CPU", "value": "0.0" }
+        ],
+        "compiler_hints": [
+            { "name": "Output Size (bytes)", "value": "(none)" },
+            { "name": "Output Cardinality", "value": "(none)" },
+            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
+            { "name": "Filter Factor", "value": "(none)" } ]
+    },
+    {
+        "id": ,
+        "type": "pact",
+        "pact": "Map",
+        "contents": "to: Row",
+        "parallelism": "1",
+        "predecessors": [
+            {"id": , "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
+        ],
+        "driver_strategy": "Map",
+        "global_properties": [
+            { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
+            { "name": "Partitioning Order", "value": "(none)" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "local_properties": [
+            { "name": "Order", "value": "(none)" },
+            { "name": "Grouping", "value": "not grouped" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "estimates": [
+            { "name": "Est. Output Size", "value": "(unknown)" },
+            { "name": "Est. Cardinality", "value": "0.0" } ],
+        "costs": [
+            { "name": "Network", "value": "0.0" },
+            { "name": "Disk I/O", "value": "0.0" },
+            { "name": "CPU", "value": "0.0" },
+            { "name": "Cumulative Network", "value": "0.0" },
+            { "name": "Cumulative Disk I/O", "value": "0.0" },
+            { "name": "Cumulative CPU", "value": "0.0" }
+        ],
+        "compiler_hints": [
+            { "name": "Output Size (bytes)", "value": "(none)" },
+            { "name": "Output Cardinality", "value": "(none)" },
+            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
+            { "name": "Filter Factor", "value": "(none)" } ]
+    },
+    {
+        "id": ,
+        "type": "sink",
+        "pact": "Data Sink",
+        "contents": "UnsafeMemoryAppendTableSink(d, e)",
+        "parallelism": "1",
+        "predecessors": [
+            {"id": , "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
+        ],
+        "global_properties": [
+            { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
+            { "name": "Partitioning Order", "value": "(none)" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "local_properties": [
+            { "name": "Order", "value": "(none)" },
+            { "name": "Grouping", "value": "not grouped" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "estimates": [
+            { "name": "Est. Output Size", "value": "(unknown)" },
+            { "name": "Est. Cardinality", "value": "0.0" } ],
+        "costs": [
+            { "name": "Network", "value": "0.0" },
+            { "name": "Disk I/O", "value": "0.0" },
+            { "name": "CPU", "value": "0.0" },
+            { "name": "Cumulative Network", "value": "0.0" },
+            { "name": "Cumulative Disk I/O", "value": "0.0" },
+            { "name": "Cumulative CPU", "value": "0.0" }
+        ],
+        "compiler_hints": [
+            { "name": "Output Size (bytes)", "value": "(none)" },
+            { "name": "Output Cardinality", "value": "(none)" },
+            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
+            { "name": "Filter Factor", "value": "(none)" } ]
+    }
+    ]
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain0.out b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain0.out
new file mode 100644
index 0000000..e75e320
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain0.out
@@ -0,0 +1,63 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: Custom File source",
+ "pact" : "Data Source",
+ "contents" : "Source: Custom File source",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "CsvTableSource(read fields: first, id)",
+ "pact" : "Operator",
+ "contents" : "CsvTableSource(read fields: first, id)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Map",
+ "pact" : "Operator",
+ "contents" : "Map",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "to: Row",
+ "pact" : "Operator",
+ "contents" : "to: Row",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+ "pact" : "Data Sink",
+ "contents" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain1.out b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain1.out
new file mode 100644
index 0000000..54ebc93
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain1.out
@@ -0,0 +1,121 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+    "nodes": [
+
+    {
+        "id": ,
+        "type": "source",
+        "pact": "Data Source",
+        "contents": "CsvTableSource(read fields: first, id)",
+        "parallelism": "1",
+        "global_properties": [
+            { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
+            { "name": "Partitioning Order", "value": "(none)" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "local_properties": [
+            { "name": "Order", "value": "(none)" },
+            { "name": "Grouping", "value": "not grouped" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "estimates": [
+            { "name": "Est. Output Size", "value": "0.0" },
+            { "name": "Est. Cardinality", "value": "0.0" } ],
+        "costs": [
+            { "name": "Network", "value": "0.0" },
+            { "name": "Disk I/O", "value": "0.0" },
+            { "name": "CPU", "value": "0.0" },
+            { "name": "Cumulative Network", "value": "0.0" },
+            { "name": "Cumulative Disk I/O", "value": "0.0" },
+            { "name": "Cumulative CPU", "value": "0.0" }
+        ],
+        "compiler_hints": [
+            { "name": "Output Size (bytes)", "value": "(none)" },
+            { "name": "Output Cardinality", "value": "(none)" },
+            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
+            { "name": "Filter Factor", "value": "(none)" } ]
+    },
+    {
+        "id": ,
+        "type": "pact",
+        "pact": "Map",
+        "contents": "to: Row",
+        "parallelism": "1",
+        "predecessors": [
+            {"id": , "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
+        ],
+        "driver_strategy": "Map",
+        "global_properties": [
+            { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
+            { "name": "Partitioning Order", "value": "(none)" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "local_properties": [
+            { "name": "Order", "value": "(none)" },
+            { "name": "Grouping", "value": "not grouped" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "estimates": [
+            { "name": "Est. Output Size", "value": "(unknown)" },
+            { "name": "Est. Cardinality", "value": "0.0" } ],
+        "costs": [
+            { "name": "Network", "value": "0.0" },
+            { "name": "Disk I/O", "value": "0.0" },
+            { "name": "CPU", "value": "0.0" },
+            { "name": "Cumulative Network", "value": "0.0" },
+            { "name": "Cumulative Disk I/O", "value": "0.0" },
+            { "name": "Cumulative CPU", "value": "0.0" }
+        ],
+        "compiler_hints": [
+            { "name": "Output Size (bytes)", "value": "(none)" },
+            { "name": "Output Cardinality", "value": "(none)" },
+            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
+            { "name": "Filter Factor", "value": "(none)" } ]
+    },
+    {
+        "id": ,
+        "type": "sink",
+        "pact": "Data Sink",
+        "contents": "UnsafeMemoryAppendTableSink(d, e)",
+        "parallelism": "1",
+        "predecessors": [
+            {"id": , "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
+        ],
+        "global_properties": [
+            { "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
+            { "name": "Partitioning Order", "value": "(none)" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "local_properties": [
+            { "name": "Order", "value": "(none)" },
+            { "name": "Grouping", "value": "not grouped" },
+            { "name": "Uniqueness", "value": "not unique" }
+        ],
+        "estimates": [
+            { "name": "Est. Output Size", "value": "(unknown)" },
+            { "name": "Est. Cardinality", "value": "0.0" } ],
+        "costs": [
+            { "name": "Network", "value": "0.0" },
+            { "name": "Disk I/O", "value": "0.0" },
+            { "name": "CPU", "value": "0.0" },
+            { "name": "Cumulative Network", "value": "0.0" },
+            { "name": "Cumulative Disk I/O", "value": "0.0" },
+            { "name": "Cumulative CPU", "value": "0.0" }
+        ],
+        "compiler_hints": [
+            { "name": "Output Size (bytes)", "value": "(none)" },
+            { "name": "Output Cardinality", "value": "(none)" },
+            { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
+            { "name": "Filter Factor", "value": "(none)" } ]
+    }
+    ]
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testStreamTableEnvironmentExecutionExplain.out b/flink-table/flink-table-planner/src/test/scala/resources/testStreamTableEnvironmentExecutionExplain.out
new file mode 100644
index 0000000..e75e320
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testStreamTableEnvironmentExecutionExplain.out
@@ -0,0 +1,63 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+ LogicalProject(first=[$0], id=[$1])
+ LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: Custom File source",
+ "pact" : "Data Source",
+ "contents" : "Source: Custom File source",
+ "parallelism" : 1
+ }, {
+ "id" : ,
+ "type" : "CsvTableSource(read fields: first, id)",
+ "pact" : "Operator",
+ "contents" : "CsvTableSource(read fields: first, id)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Map",
+ "pact" : "Operator",
+ "contents" : "Map",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "to: Row",
+ "pact" : "Operator",
+ "contents" : "to: Row",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+ "pact" : "Data Sink",
+ "contents" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+ "parallelism" : 1,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "FORWARD",
+ "side" : "second"
+ } ]
+ } ]
+}