This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 56b124bcfd661a295ab8772d265c12de25f690ab Author: Jane Chan <[email protected]> AuthorDate: Fri Jan 13 17:50:27 2023 +0800 [FLINK-30668][table-planner] Introduce ExplainFormat to Planner This closes #21662 --- .../apache/flink/table/api/internal/TableEnvironmentImpl.java | 2 +- .../main/java/org/apache/flink/table/delegation/Planner.java | 5 ++++- .../test/java/org/apache/flink/table/utils/PlannerMock.java | 4 +++- .../apache/flink/table/planner/delegation/BatchPlanner.scala | 11 +++++++++-- .../apache/flink/table/planner/delegation/StreamPlanner.scala | 11 +++++++++-- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index c8797876b91..c54c3d5fa2d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -718,7 +718,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { if (operations.isEmpty()) { return ""; } else { - return planner.explain(operations, extraDetails); + return planner.explain(operations, format, extraDetails); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java index ee969a19cc3..50fa196f641 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Transformation; import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.ExplainFormat; import org.apache.flink.table.api.PlanReference; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; @@ -91,10 +92,12 @@ public interface Planner { * * @param operations The collection of relational queries for which the AST and execution plan * will be returned. + * @param format The output format of explained statement. See more details at {@link + * ExplainFormat}. * @param extraDetails The extra explain details which the explain result should include, e.g. * estimated cost, changelog mode for streaming, displaying execution plan in json format */ - String explain(List<Operation> operations, ExplainDetail... extraDetails); + String explain(List<Operation> operations, ExplainFormat format, ExplainDetail... extraDetails); // --- Plan compilation and restore diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java index c45a184eb59..76c93fec313 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java @@ -20,6 +20,7 @@ package org.apache.flink.table.utils; import org.apache.flink.api.dag.Transformation; import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.api.ExplainFormat; import org.apache.flink.table.api.PlanReference; import org.apache.flink.table.delegation.ExtendedOperationExecutor; import org.apache.flink.table.delegation.InternalPlan; @@ -51,7 +52,8 @@ public class PlannerMock implements Planner { } @Override - public String explain(List<Operation> operations, ExplainDetail... extraDetails) { + public String explain( + List<Operation> operations, ExplainFormat format, ExplainDetail... extraDetails) { return null; } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index 00f901d30e5..e5038c6f872 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.delegation import org.apache.flink.api.common.RuntimeExecutionMode import org.apache.flink.api.dag.Transformation import org.apache.flink.configuration.ExecutionOptions -import org.apache.flink.table.api.{ExplainDetail, PlanReference, TableConfig, TableException} +import org.apache.flink.table.api.{ExplainDetail, ExplainFormat, PlanReference, TableConfig, TableException} import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} import org.apache.flink.table.delegation.{Executor, InternalPlan} @@ -100,7 +100,14 @@ class BatchPlanner( transformations ++ planner.extraTransformations } - override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = { + override def explain( + operations: util.List[Operation], + format: ExplainFormat, + extraDetails: ExplainDetail*): String = { + if (format != ExplainFormat.TEXT) { + throw new UnsupportedOperationException( + s"Unsupported explain format [${format.getClass.getCanonicalName}]") + } val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = getExplainGraphs(operations) val sb = new mutable.StringBuilder diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala index 1cae6cd8b3a..90a9dcb397c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala @@ -22,7 +22,7 @@ import org.apache.flink.api.dag.Transformation import org.apache.flink.configuration.ExecutionOptions import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader import org.apache.flink.streaming.api.graph.StreamGraph -import org.apache.flink.table.api.{ExplainDetail, PlanReference, TableConfig, TableException} +import org.apache.flink.table.api.{ExplainDetail, ExplainFormat, PlanReference, TableConfig, TableException} import org.apache.flink.table.api.PlanReference.{ContentPlanReference, FilePlanReference, ResourcePlanReference} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog} import org.apache.flink.table.delegation.{Executor, InternalPlan} @@ -92,7 +92,14 @@ class StreamPlanner( transformations ++ planner.extraTransformations } - override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = { + override def explain( + operations: util.List[Operation], + format: ExplainFormat, + extraDetails: ExplainDetail*): String = { + if (format != ExplainFormat.TEXT) { + throw new UnsupportedOperationException( + s"Unsupported explain format [${format.getClass.getCanonicalName}]") + } val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = getExplainGraphs(operations) val sb = new mutable.StringBuilder
