This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a5fc5f28a1c3569da8de953f6cd1fad5371f6a4
Author: godfreyhe <godfre...@163.com>
AuthorDate: Wed Apr 29 10:57:32 2020 +0800

    [FLINK-17267] [table] Introduce TableEnvironment#explainSql api
---
 flink-python/pyflink/table/__init__.py             |  4 +-
 flink-python/pyflink/table/explain_detail.py       | 34 ++++++++++++
 flink-python/pyflink/table/table_environment.py    | 20 ++++++-
 .../table/tests/test_table_environment_api.py      | 29 +++++++++-
 flink-python/pyflink/util/utils.py                 | 20 +++++++
 .../org/apache/flink/table/api/ExplainDetail.java} | 45 +++++-----------
 .../apache/flink/table/api/TableEnvironment.java   | 15 +++++-
 .../table/api/internal/TableEnvironmentImpl.java   | 35 +++++++++++--
 .../org/apache/flink/table/delegation/Planner.java |  7 +--
 .../org/apache/flink/table/utils/PlannerMock.java  |  3 +-
 .../table/planner/delegation/BatchPlanner.scala    |  8 +--
 .../table/planner/delegation/StreamPlanner.scala   | 11 ++--
 .../resources/explain/testExplainSqlWithInsert.out | 31 +++++++++++
 .../resources/explain/testExplainSqlWithSelect.out | 21 ++++++++
 .../flink/table/api/TableEnvironmentTest.scala     | 57 ++++++++++++++++++++
 .../flink/table/planner/utils/TableTestBase.scala  |  2 +-
 .../table/api/internal/BatchTableEnvImpl.scala     | 20 +++++--
 .../flink/table/api/internal/TableEnvImpl.scala    | 18 +++++--
 .../apache/flink/table/planner/StreamPlanner.scala |  2 +-
 .../api/batch/BatchTableEnvironmentTest.scala      | 61 +++++++++++++++++++++-
 .../api/stream/StreamTableEnvironmentTest.scala    | 58 ++++++++++++++++++++
 .../flink/table/utils/MockTableEnvironment.scala   |  4 +-
 .../scala/resources/testExplainSqlWithInsert0.out  | 31 +++++++++++
 .../scala/resources/testExplainSqlWithInsert1.out  | 43 +++++++++++++++
 .../scala/resources/testExplainSqlWithSelect0.out  | 21 ++++++++
 .../scala/resources/testExplainSqlWithSelect1.out  | 27 ++++++++++
 26 files changed, 562 insertions(+), 65 deletions(-)
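
For orientation, here is a minimal sketch of how the new API is meant to be called from user code. The table name, schema and connector options mirror the tests further down; the environment setup itself is an assumption for illustration and is not part of this commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.ExplainDetail;
    import org.apache.flink.table.api.TableEnvironment;

    public class ExplainSqlSketch {
        public static void main(String[] args) {
            // Assumed setup: a blink streaming TableEnvironment (not part of this commit).
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // Same COLLECTION test table as in the tests below.
            tEnv.executeSql(
                    "CREATE TABLE MyTable (a BIGINT, b INT, c VARCHAR) WITH (" +
                    " 'connector' = 'COLLECTION', 'is-bounded' = 'false')");

            // Plain explain: AST, optimized logical plan and physical execution plan.
            String plan = tEnv.explainSql("SELECT * FROM MyTable WHERE a > 10");

            // Extra sections are opt-in via the new ExplainDetail varargs.
            String detailedPlan = tEnv.explainSql(
                    "SELECT * FROM MyTable WHERE a > 10",
                    ExplainDetail.ESTIMATED_COST,
                    ExplainDetail.CHANGELOG_TRAITS);

            System.out.println(plan);
            System.out.println(detailedPlan);
        }
    }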

diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 140c6b3..1e367f3 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -70,6 +70,7 @@ from pyflink.table.sources import TableSource, CsvTableSource
 from pyflink.table.types import DataTypes, UserDefinedType, Row
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.udf import FunctionContext, ScalarFunction
+from pyflink.table.explain_detail import ExplainDetail
 
 __all__ = [
     'TableEnvironment',
@@ -93,5 +94,6 @@ __all__ = [
     'TableSchema',
     'FunctionContext',
     'ScalarFunction',
-    'SqlDialect'
+    'SqlDialect',
+    'ExplainDetail'
 ]
diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py
new file mode 100644
index 0000000..48e7ce9
--- /dev/null
+++ b/flink-python/pyflink/table/explain_detail.py
@@ -0,0 +1,34 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+__all__ = ['ExplainDetail']
+
+
+class ExplainDetail(object):
+    """
+    ExplainDetail defines the types of details for explain result.
+    """
+
+    # The cost information on physical rel node estimated by optimizer.
+    # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network,
+    # 0.0 memory}
+    ESTIMATED_COST = 0
+
+    # The changelog traits produced by a physical rel node.
+    # e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+    CHANGELOG_TRAITS = 1
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index d8e8c51..94ff785 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -36,7 +36,8 @@ from pyflink.table import Table
 from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \
     _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema
 from pyflink.util import utils
-from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class
+from pyflink.util.utils import get_j_env_configuration, is_local_deployment, load_java_class, \
+    to_j_explain_detail_arr
 
 __all__ = [
     'BatchTableEnvironment',
@@ -468,6 +469,23 @@ class TableEnvironment(object):
         else:
             return self._j_tenv.explain(table._j_table, extended)
 
+    def explain_sql(self, stmt, *extra_details):
+        """
+        Returns the AST of the specified statement and the execution plan to compute
+        the result of the given statement.
+
+        :param stmt: The statement for which the AST and execution plan will be returned.
+        :type stmt: str
+        :param extra_details: The extra explain details which the explain result should include,
+                              e.g. estimated cost, change log trait for streaming
+        :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
+        :return: The AST and the execution plan of the specified statement.
+        :rtype: str
+        """
+
+        j_extra_details = to_j_explain_detail_arr(extra_details)
+        return self._j_tenv.explainSql(stmt, j_extra_details)
+
     def sql_query(self, query):
         """
         Evaluates a SQL query on registered tables and retrieves the result as a
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index 87c8023..bd279af 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -35,8 +35,8 @@ from pyflink.table.types import RowType
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase, \
     PyFlinkBlinkBatchTableTestCase
-from pyflink.util.exceptions import TableException
 from pyflink.util.utils import get_j_env_configuration
+from pyflink.table.explain_detail import ExplainDetail
 
 
 class TableEnvironmentTest(object):
@@ -242,6 +242,33 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa
         actual = t_env.explain(extended=True)
         assert isinstance(actual, str)
 
+    def test_explain_sql_without_explain_detail(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        result = t_env.explain_sql("select a + 1, b, c from %s" % source)
+
+        assert isinstance(result, str)
+
+    def test_explain_sql_with_explain_detail(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sinks",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        result = t_env.explain_sql(
+            "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST)
+
+        assert isinstance(result, str)
+
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py
index 89db742..29a20da 100644
--- a/flink-python/pyflink/util/utils.py
+++ b/flink-python/pyflink/util/utils.py
@@ -125,3 +125,23 @@ def add_jars_to_context_class_loader(jar_urls):
     new_classloader = gateway.jvm.java.net.URLClassLoader(
         to_jarray(gateway.jvm.java.net.URL, j_urls), context_classloader)
     gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
+
+
+def to_j_explain_detail_arr(p_extra_details):
+    # sphinx will check "import loop" when generating doc,
+    # use local import to avoid above error
+    from pyflink.table.explain_detail import ExplainDetail
+    gateway = get_gateway()
+
+    def to_j_explain_detail(p_extra_detail):
+        if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS
+        else:
+            return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST
+
+    _len = len(p_extra_details) if p_extra_details else 0
+    j_arr = gateway.new_array(gateway.jvm.org.apache.flink.table.api.ExplainDetail, _len)
+    for i in range(0, _len):
+        j_arr[i] = to_j_explain_detail(p_extra_details[i])
+
+    return j_arr
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
similarity index 50%
copy from flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
copy to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
index 92f50ff..6e9d014 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java
@@ -16,38 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.utils;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.operations.ModifyOperation;
-import org.apache.flink.table.operations.Operation;
-
-import java.util.List;
+package org.apache.flink.table.api;
 
 /**
- * Mocking {@link Planner} for tests.
+ * ExplainDetail defines the types of details for explain result.
  */
-public class PlannerMock implements Planner {
-
-       @Override
-       public Parser getParser() {
-               return new ParserMock();
-       }
-
-       @Override
-       public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
-               return null;
-       }
-
-       @Override
-       public String explain(List<Operation> operations, boolean extended) {
-               return null;
-       }
-
-       @Override
-       public String[] getCompletionHints(String statement, int position) {
-               return new String[0];
-       }
+public enum ExplainDetail {
+       /**
+        * The cost information on physical rel node estimated by optimizer.
+        * e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}
+        */
+       ESTIMATED_COST,
+
+       /**
+        * The changelog traits produced by a physical rel node.
+        * e.g. GroupAggregate(..., changelogMode=[I,UA,D])
+        */
+       CHANGELOG_TRAITS
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index d855b21..12d21ec 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -828,7 +828,7 @@ public interface TableEnvironment {
         * the result of the given {@link Table}.
         *
         * @param table The table for which the AST and execution plan will be returned.
-        * @param extended if the plan should contain additional properties such as
+        * @param extended if the plan should contain additional properties,
         * e.g. estimated cost, traits
         */
        String explain(Table table, boolean extended);
@@ -837,12 +837,23 @@ public interface TableEnvironment {
         * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
         * the result of multiple-sinks plan.
         *
-        * @param extended if the plan should contain additional properties such as
+        * @param extended if the plan should contain additional properties,
         * e.g. estimated cost, traits
         */
        String explain(boolean extended);
 
        /**
+        * Returns the AST of the specified statement and the execution plan to compute
+        * the result of the given statement.
+        *
+        * @param statement The statement for which the AST and execution plan will be returned.
+        * @param extraDetails The extra explain details which the explain result should include,
+        *   e.g. estimated cost, change log trait for streaming
+        * @return AST and the execution plan.
+        */
+       String explainSql(String statement, ExplainDetail... extraDetails);
+
+       /**
         * Returns completion hints for the given statement at the given cursor position.
         * The completion happens case insensitively.
         *
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1ca045b..610627c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.SqlParserException;
 import org.apache.flink.table.api.Table;
@@ -136,6 +137,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
        protected final FunctionCatalog functionCatalog;
        protected final Planner planner;
        protected final Parser parser;
+       private final boolean isStreamingMode;
        private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
                        "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
                        "INSERT, CREATE TABLE, DROP TABLE, ALTER TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
@@ -179,6 +181,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                this.functionCatalog = functionCatalog;
                this.planner = planner;
                this.parser = planner.getParser();
+               this.isStreamingMode = isStreamingMode;
                this.operationTreeBuilder = OperationTreeBuilder.create(
                        tableConfig,
                        functionCatalog.asLookup(parser::parseIdentifier),
@@ -589,14 +592,25 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 
        @Override
        public String explain(Table table, boolean extended) {
-               return planner.explain(Collections.singletonList(table.getQueryOperation()), extended);
+               return planner.explain(Collections.singletonList(table.getQueryOperation()), getExplainDetails(extended));
        }
 
        @Override
        public String explain(boolean extended) {
                List<Operation> operations = bufferedModifyOperations.stream()
-                       .map(o -> (Operation) o).collect(Collectors.toList());
-               return planner.explain(operations, extended);
+                               .map(o -> (Operation) o).collect(Collectors.toList());
+               return planner.explain(operations, getExplainDetails(extended));
+       }
+
+       @Override
+       public String explainSql(String statement, ExplainDetail... extraDetails) {
+               List<Operation> operations = parser.parse(statement);
+
+               if (operations.size() != 1) {
+                       throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
+               }
+
+               return planner.explain(operations, extraDetails);
        }
 
        @Override
@@ -854,7 +868,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                } else if (operation instanceof ShowViewsOperation) {
                        return buildShowResult(listViews());
                } else if (operation instanceof ExplainOperation) {
-                       String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()), false);
+                       String explanation = planner.explain(Collections.singletonList(((ExplainOperation) operation).getChild()));
                        return TableResultImpl.builder()
                                        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
                                        .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build())
@@ -979,6 +993,19 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                bufferedModifyOperations.addAll(modifyOperations);
        }
 
+       @VisibleForTesting
+       protected ExplainDetail[] getExplainDetails(boolean extended) {
+               if (extended) {
+                       if (isStreamingMode) {
+                               return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS };
+                       } else {
+                               return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST };
+                       }
+               } else {
+                       return new ExplainDetail[0];
+               }
+       }
+
        private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
                validateTableSource(tableSource);
                ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(UnresolvedIdentifier.of(name));
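
A side note on the guard in explainSql() above: only a single parsed statement is accepted; anything else is rejected with the TableException shown in the hunk. A rough sketch of the failure mode, reusing the tEnv from the usage example near the top of this mail (whether multi-statement input fails inside parser.parse() or at the size check depends on the parser, so treat the exact exception as an assumption):

    try {
        tEnv.explainSql("SELECT * FROM MyTable; SELECT * FROM MyTable");
    } catch (RuntimeException e) {
        // Expected along the lines of:
        // "Unsupported SQL query! explainSql() only accepts a single SQL query."
        // (a SqlParserException may be thrown instead if the parser rejects the ';')
        System.out.println(e.getMessage());
    }
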
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index 5bb9266..d926e3a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -79,10 +80,10 @@ public interface Planner {
         *
         * @param operations The collection of relational queries for which the AST
         * and execution plan will be returned.
-        * @param extended if the plan should contain additional properties such as
-        * e.g. estimated cost, traits
+        * @param extraDetails The extra explain details which the explain result should include,
+        *   e.g. estimated cost, change log trait for streaming
         */
-       String explain(List<Operation> operations, boolean extended);
+       String explain(List<Operation> operations, ExplainDetail... extraDetails);
 
        /**
         * Returns completion hints for the given statement at the given cursor position.
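
For planner implementors, the contract change above is that the boolean extended flag becomes an ExplainDetail varargs. A minimal sketch of turning the varargs into feature flags, roughly what BatchPlanner and StreamPlanner do inline further down (the helper class name is made up for illustration):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.table.api.ExplainDetail;

    // Hypothetical helper; the real planners call extraDetails.contains(...) directly.
    final class ExplainDetailFlags {
        final boolean withEstimatedCost;
        final boolean withChangelogTraits;

        ExplainDetailFlags(ExplainDetail... extraDetails) {
            List<ExplainDetail> details = Arrays.asList(extraDetails);
            this.withEstimatedCost = details.contains(ExplainDetail.ESTIMATED_COST);
            this.withChangelogTraits = details.contains(ExplainDetail.CHANGELOG_TRAITS);
        }
    }

With no details passed both flags stay false, which keeps the old non-extended output as the default.
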
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index 92f50ff..42b5403 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.utils;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.operations.ModifyOperation;
@@ -42,7 +43,7 @@ public class PlannerMock implements Planner {
        }
 
        @Override
-       public String explain(List<Operation> operations, boolean extended) {
+       public String explain(List<Operation> operations, ExplainDetail... extraDetails) {
                return null;
        }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 9161753..f97e015 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -78,7 +78,7 @@ class BatchPlanner(
     }
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
@@ -122,10 +122,10 @@ class BatchPlanner(
 
     sb.append("== Optimized Logical Plan ==")
     sb.append(System.lineSeparator)
-    val explainLevel = if (extended) {
+    val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
       SqlExplainLevel.ALL_ATTRIBUTES
     } else {
-      SqlExplainLevel.DIGEST_ATTRIBUTES
+      SqlExplainLevel.EXPPLAN_ATTRIBUTES
     }
     sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
     sb.append(System.lineSeparator)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 7006533..959de06 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier}
 import org.apache.flink.table.delegation.Executor
 import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOperation, Operation, QueryOperation}
@@ -69,7 +69,7 @@ class StreamPlanner(
     }
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.nonEmpty, "operations should not be empty")
     val sinkRelNodes = operations.map {
       case queryOperation: QueryOperation =>
@@ -109,11 +109,12 @@ class StreamPlanner(
 
     sb.append("== Optimized Logical Plan ==")
     sb.append(System.lineSeparator)
-    val (explainLevel, withChangelogTraits) = if (extended) {
-      (SqlExplainLevel.ALL_ATTRIBUTES, true)
+    val explainLevel = if (extraDetails.contains(ExplainDetail.ESTIMATED_COST)) {
+      SqlExplainLevel.ALL_ATTRIBUTES
     } else {
-      (SqlExplainLevel.DIGEST_ATTRIBUTES, false)
+      SqlExplainLevel.DIGEST_ATTRIBUTES
     }
+    val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS)
     sb.append(ExecNodePlanDumper.dagToString(
       execNodes,
       explainLevel,
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
new file mode 100644
index 0000000..870269f
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithInsert.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[>($0, 10)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Sink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
++- Calc(select=[a, b], where=[>(a, 10)])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c])
+
+== Physical Execution Plan ==
+ : Data Source
+       content : Source: Custom Source
+
+        : Operator
+               content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+               ship_strategy : FORWARD
+
+                : Operator
+                       content : Calc(select=[a, b], where=[(a > 10)])
+                       ship_strategy : FORWARD
+
+                        : Operator
+                               content : SinkConversionToRow
+                               ship_strategy : FORWARD
+
+                                : Data Sink
+                                       content : Sink: Unnamed
+                                       ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
new file mode 100644
index 0000000..0c87ae3
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testExplainSqlWithSelect.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalFilter(condition=[>($0, 10)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]])
+
+== Optimized Logical Plan ==
+Calc(select=[a, b, c], where=[>(a, 10)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(a, b, c)]]], fields=[a, b, c], changelogMode=[I])
+
+== Physical Execution Plan ==
+ : Data Source
+       content : Source: Custom Source
+
+        : Operator
+               content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [CollectionTableSource(a, b, c)]], fields=[a, b, c])
+               ship_strategy : FORWARD
+
+                : Operator
+                       content : Calc(select=[a, b, c], where=[(a > 10)])
+                       ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index a27b47a..0e197ba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -894,6 +894,63 @@ class TableEnvironmentTest {
     testUnsupportedExplain("explain plan as json for select * from MyTable")
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = tableEnv.explainSql(
+      "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS)
+    val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithInsert.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 4cd0f5d..2d50f43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1091,7 +1091,7 @@ class TestingTableEnvironment private(
   }
 
   override def explain(extended: Boolean): String = {
-    planner.explain(bufferedOperations.toList, extended)
+    planner.explain(bufferedOperations.toList, getExplainDetails(extended): _*)
   }
 
   @throws[Exception]
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index efc38a5..b3caf20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -217,16 +217,20 @@ abstract class BatchTableEnvImpl(
     * @param extended Flag to include detailed optimizer estimates.
     */
   private[flink] def explain(table: Table, extended: Boolean): String = {
-    explain(JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), extended)
+    explain(
+      JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]),
+      getExplainDetails(extended): _*)
   }
 
   override def explain(table: Table): String = explain(table: Table, extended = false)
 
   override def explain(extended: Boolean): String = {
-    explain(bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, extended)
+    explain(
+      bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava,
+      getExplainDetails(extended): _*)
   }
 
-   protected def explain(operations: JList[Operation], extended: Boolean): String = {
+  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astList = operations.asScala.map {
       case queryOperation: QueryOperation =>
@@ -285,6 +289,8 @@ abstract class BatchTableEnvImpl(
 
     val env = dataSinks.head.getDataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan
+    // keep the behavior as before
+    val extended = extraDetails.contains(ExplainDetail.ESTIMATED_COST)
     val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
 
     s"== Abstract Syntax Tree ==" +
@@ -597,6 +603,14 @@ abstract class BatchTableEnvImpl(
     TableSchema.builder().fields(originalNames, fieldTypes).build()
   }
 
+  private def getExplainDetails(extended: Boolean): Array[ExplainDetail] = {
+    if (extended) {
+      Array(ExplainDetail.ESTIMATED_COST)
+    } else {
+      Array.empty
+    }
+  }
+
   protected def createDummyBatchTableEnv(): BatchTableEnvImpl
 
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 7c6f144..1f01186 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -762,14 +762,13 @@ abstract class TableEnvImpl(
       case _: ShowViewsOperation =>
         buildShowResult(listViews())
       case explainOperation: ExplainOperation =>
-        val explanation = explain(
-          JCollections.singletonList(explainOperation.getChild),
-          extended = false)
+        val explanation = explain(JCollections.singletonList(explainOperation.getChild))
         TableResultImpl.builder.
           resultKind(ResultKind.SUCCESS_WITH_CONTENT)
           .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build)
           .data(JCollections.singletonList(Row.of(explanation)))
           .build
+
       case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG)
     }
   }
@@ -1135,7 +1134,18 @@ abstract class TableEnvImpl(
     }
   }
 
-  protected def explain(operations: JList[Operation], extended: Boolean): String
+  override def explainSql(statement: String, extraDetails: ExplainDetail*): String = {
+    val operations = parser.parse(statement)
+
+    if (operations.size != 1) {
+      throw new TableException(
+        "Unsupported SQL query! explainSql() only accepts a single SQL query.")
+    }
+
+    explain(operations, extraDetails: _*)
+  }
+
+  protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String
 
   override def fromValues(values: Expression*): Table = {
     createTable(operationTreeBuilder.values(values: _*))
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 756d9ca..d81ca1c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -120,7 +120,7 @@ class StreamPlanner(
     }.filter(Objects.nonNull).asJava
   }
 
-  override def explain(operations: util.List[Operation], extended: Boolean): String = {
+  override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astWithUpdatesAsRetractionTuples = operations.asScala.map {
       case queryOperation: QueryOperation =>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
index d09bd5d..c928314 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala
@@ -24,8 +24,7 @@ import org.apache.flink.table.api.{ResultKind, TableException}
 import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectPath}
 import org.apache.flink.table.runtime.stream.sql.FunctionITCase.{SimpleScalarFunction, TestUDF}
 import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
+import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId, _}
 import org.apache.flink.types.Row
 
 import org.hamcrest.Matchers.containsString
@@ -447,6 +446,64 @@ class BatchTableEnvironmentTest extends TableTestBase {
       "explain plan as json for select * from MyTable")
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithSelect1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val util = batchTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = util.tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithInsert1.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = {
     try {
       tableEnv.executeSql(explain)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index 439fadb..25bb536 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -326,6 +326,64 @@ class StreamTableEnvironmentTest extends TableTestBase {
     }
   }
 
+  @Test
+  def testExplainSqlWithSelect(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val actual = util.tableEnv.explainSql("select * from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithSelect0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
+  @Test
+  def testExplainSqlWithInsert(): Unit = {
+    val util = streamTestUtil()
+    val createTableStmt1 =
+      """
+        |CREATE TABLE MyTable (
+        |  a bigint,
+        |  b int,
+        |  c varchar
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult1 = util.tableEnv.executeSql(createTableStmt1)
+    assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)
+
+    val createTableStmt2 =
+      """
+        |CREATE TABLE MySink (
+        |  d bigint,
+        |  e int
+        |) with (
+        |  'connector' = 'COLLECTION',
+        |  'is-bounded' = 'false'
+        |)
+      """.stripMargin
+    val tableResult2 = util.tableEnv.executeSql(createTableStmt2)
+    assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
+
+    val actual = util.tableEnv.explainSql(
+      "insert into MySink select a, b from MyTable where a > 10")
+    val expected = readFromResource("testExplainSqlWithInsert0.out")
+    assertEquals(replaceStageId(expected), replaceStageId(actual))
+  }
+
   private def prepareSchemaExpressionParser:
     (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 8a9d9c4..312d980 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.utils
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableResult}
+import org.apache.flink.table.api.{ExplainDetail, Table, TableConfig, TableEnvironment, TableResult}
 import org.apache.flink.table.catalog.Catalog
 import org.apache.flink.table.descriptors.{ConnectTableDescriptor, ConnectorDescriptor}
 import org.apache.flink.table.expressions.Expression
@@ -74,6 +74,8 @@ class MockTableEnvironment extends TableEnvironment {
 
   override def explain(extended: Boolean): String = ???
 
+  override def explainSql(statement: String, extraDetails: ExplainDetail*): String = ???
+
   override def getCompletionHints(statement: String, position: Int): Array[String] = ???
 
   override def sqlQuery(query: String): Table = ???
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
new file mode 100644
index 0000000..bbd0f53
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert0.out
@@ -0,0 +1,31 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  LogicalProject(a=[$0], b=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamSink(name=[default_catalog.default_database.MySink], fields=[d, e])
+  DataStreamCalc(select=[a, b], where=[>(a, 10)])
+    StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+       content : collect elements with CollectionInputFormat
+
+        : Operator
+               content : from: (a, b)
+               ship_strategy : FORWARD
+
+                : Operator
+                       content : where: (>(a, 10)), select: (a, b)
+                       ship_strategy : FORWARD
+
+                        : Operator
+                               content : to: Row
+                               ship_strategy : FORWARD
+
+                                : Data Sink
+                                       content : Sink: Unnamed
+                                       ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
new file mode 100644
index 0000000..b904056
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithInsert1.out
@@ -0,0 +1,43 @@
+== Abstract Syntax Tree ==
+LogicalSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  LogicalProject(a=[$0], b=[$1])
+    LogicalFilter(condition=[>($0, 10)])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetSink(name=[`default_catalog`.`default_database`.`MySink`], fields=[d, e])
+  DataSetCalc(select=[a, b], where=[>(a, 10)])
+    BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+        : Map
+               content : from: (a, b)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : Map
+               Partitioning : RANDOM_PARTITIONED
+
+                : FlatMap
+                       content : where: (>(a, 10)), select: (a, b)
+                       ship_strategy : Forward
+                       exchange_mode : PIPELINED
+                       driver_strategy : FlatMap
+                       Partitioning : RANDOM_PARTITIONED
+
+                        : Map
+                               content : to: Row
+                               ship_strategy : Forward
+                               exchange_mode : PIPELINED
+                               driver_strategy : Map
+                               Partitioning : RANDOM_PARTITIONED
+
+                                : Data Sink
+                                       content : org.apache.flink.api.java.io.LocalCollectionOutputFormat
+                                       ship_strategy : Forward
+                                       exchange_mode : PIPELINED
+                                       Partitioning : RANDOM_PARTITIONED
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
new file mode 100644
index 0000000..4459ad6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect0.out
@@ -0,0 +1,21 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataStreamCalc(select=[a, b, c], where=[>(a, 10)])
+  StreamTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+       content : collect elements with CollectionInputFormat
+
+        : Operator
+               content : Map
+               ship_strategy : FORWARD
+
+                : Operator
+                       content : where: (>(a, 10)), select: (a, b, c)
+                       ship_strategy : FORWARD
+
diff --git a/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
new file mode 100644
index 0000000..91e87ee
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/resources/testExplainSqlWithSelect1.out
@@ -0,0 +1,27 @@
+== Abstract Syntax Tree ==
+LogicalProject(a=[$0], b=[$1], c=[$2])
+  LogicalFilter(condition=[>($0, 10)])
+    LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Logical Plan ==
+DataSetCalc(select=[a, b, c], where=[>(a, 10)])
+  BatchTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c], source=[CollectionTableSource(a, b, c)])
+
+== Physical Execution Plan ==
+ : Data Source
+       content : collect elements with CollectionInputFormat
+       Partitioning : RANDOM_PARTITIONED
+
+        : FlatMap
+               content : where: (>(a, 10)), select: (a, b, c)
+               ship_strategy : Forward
+               exchange_mode : PIPELINED
+               driver_strategy : FlatMap
+               Partitioning : RANDOM_PARTITIONED
+
+                : Data Sink
+                       content : org.apache.flink.api.java.io.DiscardingOutputFormat
+                       ship_strategy : Forward
+                       exchange_mode : PIPELINED
+                       Partitioning : RANDOM_PARTITIONED
+
