This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4959ebfc349bb80f88582f9ef2d60d09f1140737 Author: godfreyhe <godfre...@163.com> AuthorDate: Fri Apr 24 23:04:33 2020 +0800 [FLINK-17267] [table] Introduce Table#explain api This closes #11905 --- flink-python/pyflink/table/explain_detail.py | 4 +- flink-python/pyflink/table/table.py | 14 ++++++ flink-python/pyflink/table/table_environment.py | 5 +-- .../{explain_detail.py => tests/test_explain.py} | 30 ++++++++----- .../table/tests/test_table_environment_api.py | 2 +- flink-python/pyflink/util/utils.py | 4 +- .../org/apache/flink/table/api/ExplainDetail.java | 4 +- .../java/org/apache/flink/table/api/Table.java | 10 +++++ .../table/api/internal/TableEnvironmentImpl.java | 8 +++- .../api/internal/TableEnvironmentInternal.java | 14 ++++++ .../apache/flink/table/api/internal/TableImpl.java | 6 +++ .../flink/table/api/internal/TableResultImpl.java | 51 +++++++++++++++++++--- .../table/planner/delegation/StreamPlanner.scala | 2 +- .../flink/table/api/TableEnvironmentTest.scala | 24 +++++++++- .../table/api/internal/BatchTableEnvImpl.scala | 8 ++-- .../flink/table/api/internal/TableEnvImpl.scala | 8 ++-- .../api/batch/BatchTableEnvironmentTest.scala | 22 ++++++++++ .../api/stream/StreamTableEnvironmentTest.scala | 22 ++++++++++ 18 files changed, 202 insertions(+), 36 deletions(-) diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/explain_detail.py index 48e7ce9..0cbcbe9 100644 --- a/flink-python/pyflink/table/explain_detail.py +++ b/flink-python/pyflink/table/explain_detail.py @@ -29,6 +29,6 @@ class ExplainDetail(object): # 0.0 memory} ESTIMATED_COST = 0 - # The changelog traits produced by a physical rel node. + # The changelog mode produced by a physical rel node. # e.g. GroupAggregate(..., changelogMode=[I,UA,D]) - CHANGELOG_TRAITS = 1 + CHANGELOG_MODE = 1 diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index 728d331..b74797e 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -21,6 +21,7 @@ from pyflink.java_gateway import get_gateway from pyflink.table.table_schema import TableSchema from pyflink.util.utils import to_jarray +from pyflink.util.utils import to_j_explain_detail_arr __all__ = ['Table', 'GroupedTable', 'GroupWindowedTable', 'OverWindowedTable', 'WindowGroupedTable'] @@ -718,6 +719,19 @@ class Table(object): """ self._j_table.executeInsert(table_path, overwrite) + def explain(self, *extra_details): + """ + Returns the AST of this table and the execution plan. + + :param extra_details: The extra explain details which the explain result should include, + e.g. estimated cost, changelog mode for streaming + :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail) + :return: The statement for which the AST and execution plan will be returned. + :rtype: str + """ + j_extra_details = to_j_explain_detail_arr(extra_details) + return self._j_table.explain(j_extra_details) + def __str__(self): return self._j_table.toString() diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 94ff785..91073d8 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -471,13 +471,12 @@ class TableEnvironment(object): def explain_sql(self, stmt, *extra_details): """ - Returns the AST of the specified statement and the execution plan to compute - the result of the given statement. + Returns the AST of the specified statement and the execution plan. :param stmt: The statement for which the AST and execution plan will be returned. :type stmt: str :param extra_details: The extra explain details which the explain result should include, - e.g. estimated cost, change log trait for streaming + e.g. estimated cost, changelog mode for streaming :type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail) :return: The statement for which the AST and execution plan will be returned. :rtype: str diff --git a/flink-python/pyflink/table/explain_detail.py b/flink-python/pyflink/table/tests/test_explain.py similarity index 58% copy from flink-python/pyflink/table/explain_detail.py copy to flink-python/pyflink/table/tests/test_explain.py index 48e7ce9..1dccaba 100644 --- a/flink-python/pyflink/table/explain_detail.py +++ b/flink-python/pyflink/table/tests/test_explain.py @@ -16,19 +16,25 @@ # limitations under the License. ################################################################################ -__all__ = ['ExplainDetail'] +from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase +from pyflink.table.explain_detail import ExplainDetail -class ExplainDetail(object): - """ - ExplainDetail defines the types of details for explain result. - """ +class StreamTableExplainTests(PyFlinkStreamTableTestCase): - # The cost information on physical rel node estimated by optimizer. - # e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, - # 0.0 memory} - ESTIMATED_COST = 0 + def test_explain(self): + t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c']) + result = t.group_by("c").select("a.sum, c as b").explain(ExplainDetail.CHANGELOG_MODE) - # The changelog traits produced by a physical rel node. - # e.g. GroupAggregate(..., changelogMode=[I,UA,D]) - CHANGELOG_TRAITS = 1 + assert isinstance(result, str) + + +if __name__ == '__main__': + import unittest + + try: + import xmlrunner + testRunner = xmlrunner.XMLTestRunner(output='target/test-reports') + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index bd279af..96987de 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -265,7 +265,7 @@ class StreamTableEnvironmentTests(TableEnvironmentTest, PyFlinkStreamTableTestCa source_sink_utils.TestAppendSink(field_names, field_types)) result = t_env.explain_sql( - "select a + 1, b, c from %s" % source, ExplainDetail.ESTIMATED_COST) + "select a + 1, b, c from %s" % source, ExplainDetail.CHANGELOG_MODE) assert isinstance(result, str) diff --git a/flink-python/pyflink/util/utils.py b/flink-python/pyflink/util/utils.py index 29a20da..065b537 100644 --- a/flink-python/pyflink/util/utils.py +++ b/flink-python/pyflink/util/utils.py @@ -134,8 +134,8 @@ def to_j_explain_detail_arr(p_extra_details): gateway = get_gateway() def to_j_explain_detail(p_extra_detail): - if p_extra_detail == ExplainDetail.CHANGELOG_TRAITS: - return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_TRAITS + if p_extra_detail == ExplainDetail.CHANGELOG_MODE: + return gateway.jvm.org.apache.flink.table.api.ExplainDetail.CHANGELOG_MODE else: return gateway.jvm.org.apache.flink.table.api.ExplainDetail.ESTIMATED_COST diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java index 6e9d014..5dfddc3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/ExplainDetail.java @@ -29,8 +29,8 @@ public enum ExplainDetail { ESTIMATED_COST, /** - * The changelog traits produced by a physical rel node. + * The changelog mode produced by a physical rel node. * e.g. GroupAggregate(..., changelogMode=[I,UA,D]) */ - CHANGELOG_TRAITS + CHANGELOG_MODE } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index 3362ca5..d1239e6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1473,4 +1473,14 @@ public interface Table { * @return The insert operation execution result. */ TableResult executeInsert(String tablePath, boolean overwrite); + + /** + * Returns the AST of this table and the execution plan to compute + * the result of this table. + * + * @param extraDetails The extra explain details which the explain result should include, + * e.g. estimated cost, change log trait for streaming + * @return AST and the execution plan. + */ + String explain(ExplainDetail... extraDetails); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 610627c..490e416 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -614,6 +614,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } @Override + public String explainInternal(List<Operation> operations, ExplainDetail... extraDetails) { + return planner.explain(operations, extraDetails); + } + + @Override public String[] getCompletionHints(String statement, int position) { return planner.getCompletionHints(statement, position); } @@ -873,6 +878,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { .resultKind(ResultKind.SUCCESS_WITH_CONTENT) .tableSchema(TableSchema.builder().field("result", DataTypes.STRING()).build()) .data(Collections.singletonList(Row.of(explanation))) + .setPrintStyle(TableResultImpl.PrintStyle.RAW_CONTENT) .build(); } else { @@ -997,7 +1003,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { protected ExplainDetail[] getExplainDetails(boolean extended) { if (extended) { if (isStreamingMode) { - return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_TRAITS }; + return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE }; } else { return new ExplainDetail[] { ExplainDetail.ESTIMATED_COST }; } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java index 2319538..228713c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentInternal.java @@ -19,11 +19,13 @@ package org.apache.flink.table.api.internal; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.ModifyOperation; +import org.apache.flink.table.operations.Operation; import java.util.List; @@ -56,4 +58,16 @@ interface TableEnvironmentInternal extends TableEnvironment { * @return the affected row counts (-1 means unknown). */ TableResult executeInternal(List<ModifyOperation> operations); + + /** + * Returns the AST of this table and the execution plan to compute + * the result of this table. + * + * @param operations The operations to be explained. + * @param extraDetails The extra explain details which the explain result should include, + * e.g. estimated cost, changelog mode for streaming + * @return AST and the execution plan. + */ + String explainInternal(List<Operation> operations, ExplainDetail... extraDetails); + } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java index 9b2e319..d3a63aa 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java @@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.AggregatedTable; +import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.FlatAggregateTable; import org.apache.flink.table.api.GroupWindow; import org.apache.flink.table.api.GroupWindowedTable; @@ -565,6 +566,11 @@ public class TableImpl implements Table { } @Override + public String explain(ExplainDetail... extraDetails) { + return tableEnvironment.explainInternal(Collections.singletonList(getQueryOperation()), extraDetails); + } + + @Override public String toString() { if (tableName == null) { tableName = "UnnamedTable$" + uniqueId.getAndIncrement(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java index 791ee89..a783976 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.utils.PrintUtils; @@ -51,16 +52,19 @@ class TableResultImpl implements TableResult { private final TableSchema tableSchema; private final ResultKind resultKind; private final Iterator<Row> data; + private final PrintStyle printStyle; private TableResultImpl( @Nullable JobClient jobClient, TableSchema tableSchema, ResultKind resultKind, - Iterator<Row> data) { + Iterator<Row> data, + PrintStyle printStyle) { this.jobClient = jobClient; this.tableSchema = Preconditions.checkNotNull(tableSchema, "tableSchema should not be null"); this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null"); this.data = Preconditions.checkNotNull(data, "data should not be null"); + this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null"); } @Override @@ -86,7 +90,18 @@ class TableResultImpl implements TableResult { @Override public void print() { Iterator<Row> it = collect(); - PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out)); + switch (printStyle) { + case TABLEAU: + PrintUtils.printAsTableauForm(getTableSchema(), it, new PrintWriter(System.out)); + break; + case RAW_CONTENT: + while (it.hasNext()) { + System.out.println(String.join(",", PrintUtils.rowToString(it.next()))); + } + break; + default: + throw new TableException("Unsupported print style: " + printStyle); + } } public static Builder builder() { @@ -101,6 +116,7 @@ class TableResultImpl implements TableResult { private TableSchema tableSchema = null; private ResultKind resultKind = null; private Iterator<Row> data = null; + private PrintStyle printStyle = PrintStyle.TABLEAU; private Builder() { } @@ -138,7 +154,7 @@ class TableResultImpl implements TableResult { } /** - * Specifies an row iterator as the execution result . + * Specifies an row iterator as the execution result. * * @param rowIterator a row iterator as the execution result. */ @@ -149,7 +165,7 @@ class TableResultImpl implements TableResult { } /** - * Specifies an row list as the execution result . + * Specifies an row list as the execution result. * * @param rowList a row list as the execution result. */ @@ -160,11 +176,36 @@ class TableResultImpl implements TableResult { } /** + * Specifies print style. Default is {@link PrintStyle#TABLEAU}. + */ + public Builder setPrintStyle(PrintStyle printStyle) { + Preconditions.checkNotNull(printStyle, "printStyle should not be null"); + this.printStyle = printStyle; + return this; + } + + /** * Returns a {@link TableResult} instance. */ public TableResult build() { - return new TableResultImpl(jobClient, tableSchema, resultKind, data); + return new TableResultImpl(jobClient, tableSchema, resultKind, data, printStyle); } } + /** + * PrintStyle defines the styles of printing. + */ + public enum PrintStyle { + /** + * print the result schema and content as tableau form. + */ + TABLEAU, + + /** + * only print the result content as raw form. + * column delimiter is ",", row delimiter is "\n". + */ + RAW_CONTENT + } + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala index 959de06..5dc6a6f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala @@ -114,7 +114,7 @@ class StreamPlanner( } else { SqlExplainLevel.DIGEST_ATTRIBUTES } - val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_TRAITS) + val withChangelogTraits = extraDetails.contains(ExplainDetail.CHANGELOG_MODE) sb.append(ExecNodePlanDumper.dagToString( execNodes, explainLevel, diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 0e197ba..aa6bd2e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -911,7 +911,7 @@ class TableEnvironmentTest { assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) val actual = tableEnv.explainSql( - "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_TRAITS) + "select * from MyTable where a > 10", ExplainDetail.CHANGELOG_MODE) val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out") assertEquals(replaceStageId(expected), replaceStageId(actual)) } @@ -951,6 +951,28 @@ class TableEnvironmentTest { assertEquals(replaceStageId(expected), replaceStageId(actual)) } + @Test + def testTableExplain(): Unit = { + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val actual = tableEnv.sqlQuery("select * from MyTable where a > 10") + .explain(ExplainDetail.CHANGELOG_MODE) + val expected = TableTestUtil.readFromResource("/explain/testExplainSqlWithSelect.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + private def testUnsupportedExplain(explain: String): Unit = { try { tableEnv.executeSql(explain) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala index b3caf20..7ba116d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala @@ -217,7 +217,7 @@ abstract class BatchTableEnvImpl( * @param extended Flag to include detailed optimizer estimates. */ private[flink] def explain(table: Table, extended: Boolean): String = { - explain( + explainInternal( JCollections.singletonList(table.getQueryOperation.asInstanceOf[Operation]), getExplainDetails(extended): _*) } @@ -225,12 +225,14 @@ abstract class BatchTableEnvImpl( override def explain(table: Table): String = explain(table: Table, extended = false) override def explain(extended: Boolean): String = { - explain( + explainInternal( bufferedModifyOperations.asScala.map(_.asInstanceOf[Operation]).asJava, getExplainDetails(extended): _*) } - protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String = { + protected override def explainInternal( + operations: JList[Operation], + extraDetails: ExplainDetail*): String = { require(operations.asScala.nonEmpty, "operations should not be empty") val astList = operations.asScala.map { case queryOperation: QueryOperation => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 1f01186..4c6cbd4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.operators.DataSink import org.apache.flink.core.execution.JobClient import org.apache.flink.table.api._ +import org.apache.flink.table.api.internal.TableResultImpl.PrintStyle import org.apache.flink.table.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder} import org.apache.flink.table.catalog._ import org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, _} @@ -762,11 +763,12 @@ abstract class TableEnvImpl( case _: ShowViewsOperation => buildShowResult(listViews()) case explainOperation: ExplainOperation => - val explanation = explain(JCollections.singletonList(explainOperation.getChild)) + val explanation = explainInternal(JCollections.singletonList(explainOperation.getChild)) TableResultImpl.builder. resultKind(ResultKind.SUCCESS_WITH_CONTENT) .tableSchema(TableSchema.builder.field("result", DataTypes.STRING).build) .data(JCollections.singletonList(Row.of(explanation))) + .setPrintStyle(PrintStyle.RAW_CONTENT) .build case _ => throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG) @@ -1142,10 +1144,10 @@ abstract class TableEnvImpl( "Unsupported SQL query! explainSql() only accepts a single SQL query.") } - explain(operations, extraDetails: _*) + explainInternal(operations, extraDetails: _*) } - protected def explain(operations: JList[Operation], extraDetails: ExplainDetail*): String + protected def explainInternal(operations: JList[Operation], extraDetails: ExplainDetail*): String override def fromValues(values: Expression*): Table = { createTable(operationTreeBuilder.values(values: _*)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala index c928314..74d820e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala @@ -504,6 +504,28 @@ class BatchTableEnvironmentTest extends TableTestBase { assertEquals(replaceStageId(expected), replaceStageId(actual)) } + @Test + def testTableExplain(): Unit = { + val util = batchTestUtil() + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 10").explain() + val expected = readFromResource("testExplainSqlWithSelect1.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + private def testUnsupportedExplain(tableEnv: BatchTableEnvironment, explain: String): Unit = { try { tableEnv.executeSql(explain) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala index 25bb536..bb710c6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala @@ -384,6 +384,28 @@ class StreamTableEnvironmentTest extends TableTestBase { assertEquals(replaceStageId(expected), replaceStageId(actual)) } + @Test + def testTableExplain(): Unit = { + val util = streamTestUtil() + val createTableStmt = + """ + |CREATE TABLE MyTable ( + | a bigint, + | b int, + | c varchar + |) with ( + | 'connector' = 'COLLECTION', + | 'is-bounded' = 'false' + |) + """.stripMargin + val tableResult1 = util.tableEnv.executeSql(createTableStmt) + assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) + + val actual = util.tableEnv.sqlQuery("select * from MyTable where a > 10").explain() + val expected = readFromResource("testExplainSqlWithSelect0.out") + assertEquals(replaceStageId(expected), replaceStageId(actual)) + } + private def prepareSchemaExpressionParser: (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {