This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 144d92e  [FLINK-19687][table] Support JSON_EXECUTION_PLAN for the explain result
144d92e is described below

commit 144d92e49d6ed834c04bae902adb92cb0bd262aa
Author: V1ncentzzZ <[email protected]>
AuthorDate: Mon Dec 7 17:51:33 2020 +0800

    [FLINK-19687][table] Support JSON_EXECUTION_PLAN for the explain result
    
    This closes #14165
---
 .../org/apache/flink/table/api/ExplainDetail.java  |   7 +-
 .../org/apache/flink/table/api/StatementSet.java   |   2 +-
 .../apache/flink/table/api/TableEnvironment.java   |   2 +-
 .../org/apache/flink/table/delegation/Planner.java |   2 +-
 .../table/planner/delegation/BatchPlanner.scala    |   7 +-
 .../table/planner/delegation/StreamPlanner.scala   |   7 +-
 .../explain/testStatementSetExecutionExplain.out   |  74 +++++++++++++
 .../testStreamTableEnvironmentExecutionExplain.out |  74 +++++++++++++
 .../flink/table/api/TableEnvironmentTest.scala     |  43 ++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |   8 ++
 .../table/api/internal/BatchTableEnvImpl.scala     |  11 +-
 .../apache/flink/table/planner/StreamPlanner.scala |  11 +-
 .../apache/flink/table/api/batch/ExplainTest.scala |  53 +++++++++
 .../flink/table/api/stream/ExplainTest.scala       |  51 +++++++++
 .../testBatchTableEnvironmentExecutionExplain.out  | 121 +++++++++++++++++++++
 .../testStatementSetExecutionExplain0.out          |  63 +++++++++++
 .../testStatementSetExecutionExplain1.out          | 121 +++++++++++++++++++++
 .../testStreamTableEnvironmentExecutionExplain.out |  63 +++++++++++
 18 files changed, 710 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 60cd777..1e4cc3a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -35,5 +35,10 @@ public enum ExplainDetail {
         * The changelog mode produced by a physical rel node.
         * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
         */
-       CHANGELOG_MODE
+       CHANGELOG_MODE,
+
+       /**
+        * The execution plan of the program in JSON format.
+        */
+       JSON_EXECUTION_PLAN
 }
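
For context, a minimal usage sketch of the new detail (the table names are hypothetical, not part of this commit):

    // Hedged sketch: "MyTable" and "MySink" are assumed to be registered tables.
    import org.apache.flink.table.api.{EnvironmentSettings, ExplainDetail, TableEnvironment}

    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())
    // With JSON_EXECUTION_PLAN, the "== Physical Execution Plan ==" section is
    // rendered as JSON instead of the plain-text plan.
    println(tEnv.explainSql(
      "INSERT INTO MySink SELECT first FROM MyTable",
      ExplainDetail.JSON_EXECUTION_PLAN))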
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
index a465fce..ea10042 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/StatementSet.java
@@ -51,7 +51,7 @@ public interface StatementSet {
         * all statements and Tables.
         *
         * @param extraDetails The extra explain details which the explain result should include,
-        *                     e.g. estimated cost, changelog mode for streaming
+        *                     e.g. estimated cost, changelog mode for streaming, displaying the execution plan in JSON format
         * @return AST and the execution plan.
         */
        String explain(ExplainDetail... extraDetails);
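
The same detail can be passed through a StatementSet; a hedged sketch mirroring the tests added further down (the statement text is a placeholder):

    val stmtSet = tEnv.createStatementSet()
    stmtSet.addInsertSql("INSERT INTO MySink SELECT last FROM MyTable")
    println(stmtSet.explain(ExplainDetail.JSON_EXECUTION_PLAN))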
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index cb5b8b8..fbde34e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -803,7 +803,7 @@ public interface TableEnvironment {
         *
         * @param statement The statement for which the AST and execution plan will be returned.
         * @param extraDetails The extra explain details which the explain result should include,
-        *   e.g. estimated cost, changelog mode for streaming
+        *   e.g. estimated cost, changelog mode for streaming, displaying the execution plan in JSON format
         * @return AST and the execution plan.
         */
        String explainSql(String statement, ExplainDetail... extraDetails);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index fb3b940..474bf80 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -81,7 +81,7 @@ public interface Planner {
         * @param operations The collection of relational queries for which the AST
         * and execution plan will be returned.
         * @param extraDetails The extra explain details which the explain result should include,
-        *   e.g. estimated cost, changelog mode for streaming
+        *   e.g. estimated cost, changelog mode for streaming, displaying the execution plan in JSON format
         */
        String explain(List<Operation> operations, ExplainDetail... extraDetails);
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index def8c11..dc7f0c8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -148,7 +148,12 @@ class BatchPlanner(
 
     sb.append("== Physical Execution Plan ==")
     sb.append(System.lineSeparator)
-    sb.append(executionPlan)
+    if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+      sb.append(streamGraph.getStreamingPlanAsJSON)
+    } else {
+      sb.append(executionPlan)
+    }
+
     sb.toString()
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 7784c7a..3f946e9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -128,7 +128,12 @@ class StreamPlanner(
 
     sb.append("== Physical Execution Plan ==")
     sb.append(System.lineSeparator)
-    sb.append(executionPlan)
+    if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+      sb.append(streamGraph.getStreamingPlanAsJSON)
+    } else {
+      sb.append(executionPlan)
+    }
+
     sb.toString()
   }
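
In both blink planners the JSON comes from StreamGraph#getStreamingPlanAsJSON. A hedged standalone sketch of what that call emits (the pipeline itself is made up):

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1, 2, 3).map(_ * 2).print()
    // Prints the same "nodes"/"predecessors" structure seen in the .out files below.
    println(env.getStreamGraph.getStreamingPlanAsJSON)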
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSetExecutionExplain.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSetExecutionExplain.out
new file mode 100644
index 0000000..93cd515
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStatementSetExecutionExplain.out
@@ -0,0 +1,74 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LogicalProject(last=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
+
+== Optimized Logical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: last)]]], fields=[last])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Custom File source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom File source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "CsvTableSource(read fields: last)",
+    "pact" : "Operator",
+    "contents" : "CsvTableSource(read fields: last)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CsvTableSource(read fields: last)]], fields=[last])",
+    "pact" : "Operator",
+    "contents" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CsvTableSource(read fields: last)]], fields=[last])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "SinkConversionToRow",
+    "pact" : "Operator",
+    "contents" : "SinkConversionToRow",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Map",
+    "pact" : "Operator",
+    "contents" : "Map",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: CsvTableSink(first)",
+    "pact" : "Data Sink",
+    "contents" : "Sink: CsvTableSink(first)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExecutionExplain.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExecutionExplain.out
new file mode 100644
index 0000000..88e290f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testStreamTableEnvironmentExecutionExplain.out
@@ -0,0 +1,74 @@
+== Abstract Syntax Tree ==
+LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LogicalProject(first=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first, id, score, last)]]])
+
+== Optimized Logical Plan ==
+LegacySink(name=[`default_catalog`.`default_database`.`MySink`], fields=[first])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CsvTableSource(read fields: first)]]], fields=[first])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Custom File source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom File source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "CsvTableSource(read fields: first)",
+    "pact" : "Operator",
+    "contents" : "CsvTableSource(read fields: first)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CsvTableSource(read fields: first)]], fields=[first])",
+    "pact" : "Operator",
+    "contents" : 
"SourceConversion(table=[default_catalog.default_database.MyTable, source: 
[CsvTableSource(read fields: first)]], fields=[first])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "SinkConversionToRow",
+    "pact" : "Operator",
+    "contents" : "SinkConversionToRow",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Map",
+    "pact" : "Operator",
+    "contents" : "Map",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: CsvTableSink(first)",
+    "pact" : "Data Sink",
+    "contents" : "Sink: CsvTableSink(first)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 19dd431..bef138b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -104,6 +104,49 @@ class TableEnvironmentTest {
   }
 
   @Test
+  def testStreamTableEnvironmentExecutionExplain(): Unit = {
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setParallelism(1)
+    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+    val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+    TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")
+
+    TestTableSourceSinks.createCsvTemporarySinkTable(
+      tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
+
+    val expected =
+      TableTestUtil.readFromResource("/explain/testStreamTableEnvironmentExecutionExplain.out")
+    val actual = tEnv.explainSql("insert into MySink select first from MyTable",
+      ExplainDetail.JSON_EXECUTION_PLAN)
+
+    assertEquals(TableTestUtil.replaceStreamNodeId(expected),
+      TableTestUtil.replaceStreamNodeId(actual))
+  }
+
+  @Test
+  def testStatementSetExecutionExplain(): Unit = {
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    execEnv.setParallelism(1)
+    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
+    val tEnv = StreamTableEnvironment.create(execEnv, settings)
+
+    TestTableSourceSinks.createPersonCsvTemporaryTable(tEnv, "MyTable")
+
+    TestTableSourceSinks.createCsvTemporarySinkTable(
+      tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink", -1)
+
+    val expected =
+      TableTestUtil.readFromResource("/explain/testStatementSetExecutionExplain.out")
+    val statementSet = tEnv.createStatementSet()
+    statementSet.addInsertSql("insert into MySink select last from MyTable")
+    val actual = statementSet.explain(ExplainDetail.JSON_EXECUTION_PLAN)
+
+    assertEquals(TableTestUtil.replaceStreamNodeId(expected),
+      TableTestUtil.replaceStreamNodeId(actual))
+  }
+
+  @Test
   def testExecuteSqlWithCreateAlterDropTable(): Unit = {
     val createTableStmt =
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 3d1ec39..2019688 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1204,4 +1204,12 @@ object TableTestUtil {
   def replaceStageId(s: String): String = {
     s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
   }
+
+  /**
+   * Stream node ids are ignored, because the id counter keeps incrementing
+   * while the StreamExecutionEnvironment in the test class is up.
+   */
+  def replaceStreamNodeId(s: String): String = {
+    s.replaceAll("\"id\" : \\d+", "\"id\" : ").trim
+  }
 }
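
To illustrate the normalization (the input fragment is fabricated): the helper blanks out every numeric node id so that expected and actual plans compare equal no matter how many stream graphs were built before:

    val raw = """{ "id" : 42, "type" : "Map" }"""
    TableTestUtil.replaceStreamNodeId(raw)
    // -> { "id" : , "type" : "Map" }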
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 3d93e7d..83b6dff 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -297,7 +297,7 @@ abstract class BatchTableEnvImpl(
     val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST)
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
-    s"== Abstract Syntax Tree ==" +
+    val explanation = s"== Abstract Syntax Tree ==" +
       System.lineSeparator +
       s"$astPlan" +
       System.lineSeparator +
@@ -306,8 +306,15 @@ abstract class BatchTableEnvImpl(
       s"$optimizedPlan" +
       System.lineSeparator +
       s"== Physical Execution Plan ==" +
-      System.lineSeparator +
+      System.lineSeparator
+
+    if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+      s"$explanation" +
+      s"$jasonSqlPlan"
+    } else {
+      s"$explanation" +
       s"$sqlPlan"
+    }
   }
 
   override def execute(jobName: String): JobExecutionResult = {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 19154c5..373788e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -162,7 +162,7 @@ class StreamPlanner(
     val jsonSqlPlan = env.getExecutionPlan
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)
 
-    s"== Abstract Syntax Tree ==" +
+    val explanation = s"== Abstract Syntax Tree ==" +
       System.lineSeparator +
       s"$astPlan" +
       System.lineSeparator +
@@ -171,8 +171,15 @@ class StreamPlanner(
       s"$optimizedPlan" +
       System.lineSeparator +
       s"== Physical Execution Plan ==" +
-      System.lineSeparator +
+      System.lineSeparator
+
+    if (extraDetails.contains(ExplainDetail.JSON_EXECUTION_PLAN)) {
+      s"$explanation" +
+      s"$jsonSqlPlan"
+    } else {
+      s"$explanation" +
       s"$sqlPlan"
+    }
   }
 
   override def getCompletionHints(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
index a9f0847..ac75777 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala
@@ -177,6 +177,54 @@ class ExplainTest
     assertEquals(replaceStageId(expected), replaceStageId(result))
   }
 
+  @Test
+  def testBatchTableEnvironmentExecutionExplain(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = BatchTableEnvironment.create(env)
+
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+      "sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+      "targetTable", sink.configure(fieldNames, fieldTypes))
+
+    val actual = tEnv.explainSql("INSERT INTO targetTable SELECT first, id FROM sourceTable",
+      ExplainDetail.JSON_EXECUTION_PLAN)
+    val expected = readFromResource("testBatchTableEnvironmentExecutionExplain.out")
+
+    assertEquals(replaceStreamNodeIdAndEstimatedCostValue(expected),
+      replaceStreamNodeIdAndEstimatedCostValue(actual))
+  }
+
+  @Test
+  def testStatementSetExecutionExplain(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = BatchTableEnvironment.create(env)
+
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+      "sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+      "targetTable", sink.configure(fieldNames, fieldTypes))
+
+    val statementSet = tEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO targetTable SELECT first, id FROM sourceTable")
+
+    val actual = statementSet.explain(ExplainDetail.JSON_EXECUTION_PLAN)
+    val expected = readFromResource("testStatementSetExecutionExplain1.out")
+
+    assertEquals(replaceStreamNodeIdAndEstimatedCostValue(expected),
+      replaceStreamNodeIdAndEstimatedCostValue(actual))
+  }
+
   def replaceString(s: String, t1: Table, t2: Table): String = {
     replaceSourceNode(replaceSourceNode(replaceString(s), t1, 0), t2, 1)
   }
@@ -195,4 +243,9 @@ class ExplainTest
   def replaceString(s: String): String = {
     s.replaceAll("\\r\\n", "\n")
   }
+
+  def replaceStreamNodeIdAndEstimatedCostValue(s: String): String = {
+    s.replaceAll("\"id\": \\d+", "\"id\": ")
+        .replaceAll("\"value\": \"([0-9]+)(\\.[\\d]+)?\"", "\"value\": \"0.0\"").trim
+  }
 }
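
For illustration, the batch variant additionally zeroes the optimizer's estimate values, which are not stable across environments (the input fragment is fabricated):

    val raw = """{ "id": 7, "name": "Est. Output Size", "value": "123.45" }"""
    replaceStreamNodeIdAndEstimatedCostValue(raw)
    // -> { "id": , "name": "Est. Output Size", "value": "0.0" }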
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
index b38ff12..46f4835 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
@@ -114,6 +114,54 @@ class ExplainTest extends AbstractTestBase {
     assertEquals(replaceStageId(source), replaceStageId(result))
   }
 
+  @Test
+  def testStreamTableEnvironmentExecutionExplain(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+    val tEnv = StreamTableEnvironment.create(env, settings)
+
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+      "sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+      "targetTable", sink.configure(fieldNames, fieldTypes))
+
+    val actual = tEnv.explainSql("INSERT INTO targetTable SELECT first, id FROM sourceTable",
+      ExplainDetail.JSON_EXECUTION_PLAN)
+    val expected = readFromResource("testStreamTableEnvironmentExecutionExplain.out")
+
+    assertEquals(replaceStreamNodeId(expected), replaceStreamNodeId(actual))
+  }
+
+  @Test
+  def testStatementSetExecutionExplain(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val settings = EnvironmentSettings.newInstance().useOldPlanner().build()
+    val tEnv = StreamTableEnvironment.create(env, settings)
+
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSourceInternal(
+      "sourceTable", CommonTestData.getCsvTableSource)
+
+    val fieldNames = Array("d", "e")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING(), Types.INT())
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(
+      "targetTable", sink.configure(fieldNames, fieldTypes))
+
+    val statementSet = tEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO targetTable SELECT first, id FROM sourceTable")
+
+    val actual = statementSet.explain(ExplainDetail.JSON_EXECUTION_PLAN)
+    val expected = readFromResource("testStatementSetExecutionExplain0.out")
+
+    assertEquals(replaceStreamNodeId(expected), replaceStreamNodeId(actual))
+  }
+
   def replaceString(s: String, t1: Table, t2: Table): String = {
     replaceSourceNode(replaceSourceNode(replaceStageId(s), t1, 0), t2, 1)
   }
@@ -130,4 +178,7 @@ class ExplainTest extends AbstractTestBase {
       .replace(s"%sourceNode$idx%", streamTableNode(t))
   }
 
+  def replaceStreamNodeId(s: String): String = {
+    s.replaceAll("\"id\" : \\d+", "\"id\" : ").trim
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testBatchTableEnvironmentExecutionExplain.out b/flink-table/flink-table-planner/src/test/scala/resources/testBatchTableEnvironmentExecutionExplain.out
new file mode 100644
index 0000000..54ebc93
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testBatchTableEnvironmentExecutionExplain.out
@@ -0,0 +1,121 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+       "nodes": [
+
+       {
+               "id": ,
+               "type": "source",
+               "pact": "Data Source",
+               "contents": "CsvTableSource(read fields: first, id)",
+               "parallelism": "1",
+               "global_properties": [
+                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
+                       { "name": "Partitioning Order", "value": "(none)" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "local_properties": [
+                       { "name": "Order", "value": "(none)" },
+                       { "name": "Grouping", "value": "not grouped" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "estimates": [
+                       { "name": "Est. Output Size", "value": "0.0" },
+                       { "name": "Est. Cardinality", "value": "0.0" }          
],
+               "costs": [
+                       { "name": "Network", "value": "0.0" },
+                       { "name": "Disk I/O", "value": "0.0" },
+                       { "name": "CPU", "value": "0.0" },
+                       { "name": "Cumulative Network", "value": "0.0" },
+                       { "name": "Cumulative Disk I/O", "value": "0.0" },
+                       { "name": "Cumulative CPU", "value": "0.0" }
+               ],
+               "compiler_hints": [
+                       { "name": "Output Size (bytes)", "value": "(none)" },
+                       { "name": "Output Cardinality", "value": "(none)" },
+                       { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
+                       { "name": "Filter Factor", "value": "(none)" }          
]
+       },
+       {
+               "id": ,
+               "type": "pact",
+               "pact": "Map",
+               "contents": "to: Row",
+               "parallelism": "1",
+               "predecessors": [
+                       {"id": , "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
+               ],
+               "driver_strategy": "Map",
+               "global_properties": [
+                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
+                       { "name": "Partitioning Order", "value": "(none)" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "local_properties": [
+                       { "name": "Order", "value": "(none)" },
+                       { "name": "Grouping", "value": "not grouped" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "estimates": [
+                       { "name": "Est. Output Size", "value": "(unknown)" },
+                       { "name": "Est. Cardinality", "value": "0.0" }          
],
+               "costs": [
+                       { "name": "Network", "value": "0.0" },
+                       { "name": "Disk I/O", "value": "0.0" },
+                       { "name": "CPU", "value": "0.0" },
+                       { "name": "Cumulative Network", "value": "0.0" },
+                       { "name": "Cumulative Disk I/O", "value": "0.0" },
+                       { "name": "Cumulative CPU", "value": "0.0" }
+               ],
+               "compiler_hints": [
+                       { "name": "Output Size (bytes)", "value": "(none)" },
+                       { "name": "Output Cardinality", "value": "(none)" },
+                       { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
+                       { "name": "Filter Factor", "value": "(none)" }          
]
+       },
+       {
+               "id": ,
+               "type": "sink",
+               "pact": "Data Sink",
+               "contents": "UnsafeMemoryAppendTableSink(d, e)",
+               "parallelism": "1",
+               "predecessors": [
+                       {"id": , "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
+               ],
+               "global_properties": [
+                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
+                       { "name": "Partitioning Order", "value": "(none)" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "local_properties": [
+                       { "name": "Order", "value": "(none)" },
+                       { "name": "Grouping", "value": "not grouped" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "estimates": [
+                       { "name": "Est. Output Size", "value": "(unknown)" },
+                       { "name": "Est. Cardinality", "value": "0.0" }          
],
+               "costs": [
+                       { "name": "Network", "value": "0.0" },
+                       { "name": "Disk I/O", "value": "0.0" },
+                       { "name": "CPU", "value": "0.0" },
+                       { "name": "Cumulative Network", "value": "0.0" },
+                       { "name": "Cumulative Disk I/O", "value": "0.0" },
+                       { "name": "Cumulative CPU", "value": "0.0" }
+               ],
+               "compiler_hints": [
+                       { "name": "Output Size (bytes)", "value": "(none)" },
+                       { "name": "Output Cardinality", "value": "(none)" },
+                       { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
+                       { "name": "Filter Factor", "value": "(none)" }          
]
+       }
+       ]
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain0.out b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain0.out
new file mode 100644
index 0000000..e75e320
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain0.out
@@ -0,0 +1,63 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Custom File source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom File source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "CsvTableSource(read fields: first, id)",
+    "pact" : "Operator",
+    "contents" : "CsvTableSource(read fields: first, id)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Map",
+    "pact" : "Operator",
+    "contents" : "Map",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "to: Row",
+    "pact" : "Operator",
+    "contents" : "to: Row",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+    "pact" : "Data Sink",
+    "contents" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain1.out b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain1.out
new file mode 100644
index 0000000..54ebc93
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testStatementSetExecutionExplain1.out
@@ -0,0 +1,121 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`targetTable`], fields=[d, e])
+  BatchTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+       "nodes": [
+
+       {
+               "id": ,
+               "type": "source",
+               "pact": "Data Source",
+               "contents": "CsvTableSource(read fields: first, id)",
+               "parallelism": "1",
+               "global_properties": [
+                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
+                       { "name": "Partitioning Order", "value": "(none)" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "local_properties": [
+                       { "name": "Order", "value": "(none)" },
+                       { "name": "Grouping", "value": "not grouped" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "estimates": [
+                       { "name": "Est. Output Size", "value": "0.0" },
+                       { "name": "Est. Cardinality", "value": "0.0" }          
],
+               "costs": [
+                       { "name": "Network", "value": "0.0" },
+                       { "name": "Disk I/O", "value": "0.0" },
+                       { "name": "CPU", "value": "0.0" },
+                       { "name": "Cumulative Network", "value": "0.0" },
+                       { "name": "Cumulative Disk I/O", "value": "0.0" },
+                       { "name": "Cumulative CPU", "value": "0.0" }
+               ],
+               "compiler_hints": [
+                       { "name": "Output Size (bytes)", "value": "(none)" },
+                       { "name": "Output Cardinality", "value": "(none)" },
+                       { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
+                       { "name": "Filter Factor", "value": "(none)" }          
]
+       },
+       {
+               "id": ,
+               "type": "pact",
+               "pact": "Map",
+               "contents": "to: Row",
+               "parallelism": "1",
+               "predecessors": [
+                       {"id": , "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
+               ],
+               "driver_strategy": "Map",
+               "global_properties": [
+                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
+                       { "name": "Partitioning Order", "value": "(none)" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "local_properties": [
+                       { "name": "Order", "value": "(none)" },
+                       { "name": "Grouping", "value": "not grouped" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "estimates": [
+                       { "name": "Est. Output Size", "value": "(unknown)" },
+                       { "name": "Est. Cardinality", "value": "0.0" }          
],
+               "costs": [
+                       { "name": "Network", "value": "0.0" },
+                       { "name": "Disk I/O", "value": "0.0" },
+                       { "name": "CPU", "value": "0.0" },
+                       { "name": "Cumulative Network", "value": "0.0" },
+                       { "name": "Cumulative Disk I/O", "value": "0.0" },
+                       { "name": "Cumulative CPU", "value": "0.0" }
+               ],
+               "compiler_hints": [
+                       { "name": "Output Size (bytes)", "value": "(none)" },
+                       { "name": "Output Cardinality", "value": "(none)" },
+                       { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
+                       { "name": "Filter Factor", "value": "(none)" }          
]
+       },
+       {
+               "id": ,
+               "type": "sink",
+               "pact": "Data Sink",
+               "contents": "UnsafeMemoryAppendTableSink(d, e)",
+               "parallelism": "1",
+               "predecessors": [
+                       {"id": , "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
+               ],
+               "global_properties": [
+                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
+                       { "name": "Partitioning Order", "value": "(none)" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "local_properties": [
+                       { "name": "Order", "value": "(none)" },
+                       { "name": "Grouping", "value": "not grouped" },
+                       { "name": "Uniqueness", "value": "not unique" }
+               ],
+               "estimates": [
+                       { "name": "Est. Output Size", "value": "(unknown)" },
+                       { "name": "Est. Cardinality", "value": "0.0" }          
],
+               "costs": [
+                       { "name": "Network", "value": "0.0" },
+                       { "name": "Disk I/O", "value": "0.0" },
+                       { "name": "CPU", "value": "0.0" },
+                       { "name": "Cumulative Network", "value": "0.0" },
+                       { "name": "Cumulative Disk I/O", "value": "0.0" },
+                       { "name": "Cumulative CPU", "value": "0.0" }
+               ],
+               "compiler_hints": [
+                       { "name": "Output Size (bytes)", "value": "(none)" },
+                       { "name": "Output Cardinality", "value": "(none)" },
+                       { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
+                       { "name": "Filter Factor", "value": "(none)" }          
]
+       }
+       ]
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testStreamTableEnvironmentExecutionExplain.out b/flink-table/flink-table-planner/src/test/scala/resources/testStreamTableEnvironmentExecutionExplain.out
new file mode 100644
index 0000000..e75e320
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testStreamTableEnvironmentExecutionExplain.out
@@ -0,0 +1,63 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  LogicalProject(first=[$0], id=[$1])
+    LogicalTableScan(table=[[default_catalog, default_database, sourceTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.targetTable], fields=[d, e])
+  StreamTableSourceScan(table=[[default_catalog, default_database, sourceTable]], fields=[first, id], source=[CsvTableSource(read fields: first, id)])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Custom File source",
+    "pact" : "Data Source",
+    "contents" : "Source: Custom File source",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "CsvTableSource(read fields: first, id)",
+    "pact" : "Operator",
+    "contents" : "CsvTableSource(read fields: first, id)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Map",
+    "pact" : "Operator",
+    "contents" : "Map",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "to: Row",
+    "pact" : "Operator",
+    "contents" : "to: Row",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+    "pact" : "Data Sink",
+    "contents" : "Sink: UnsafeMemoryAppendTableSink(d, e)",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}
