This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 56b124bcfd661a295ab8772d265c12de25f690ab
Author: Jane Chan <[email protected]>
AuthorDate: Fri Jan 13 17:50:27 2023 +0800

    [FLINK-30668][table-planner] Introduce ExplainFormat to Planner
    
    This closes #21662
---
 .../apache/flink/table/api/internal/TableEnvironmentImpl.java |  2 +-
 .../main/java/org/apache/flink/table/delegation/Planner.java  |  5 ++++-
 .../test/java/org/apache/flink/table/utils/PlannerMock.java   |  4 +++-
 .../apache/flink/table/planner/delegation/BatchPlanner.scala  | 11 +++++++++--
 .../apache/flink/table/planner/delegation/StreamPlanner.scala | 11 +++++++++--
 5 files changed, 26 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c8797876b91..c54c3d5fa2d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -718,7 +718,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         if (operations.isEmpty()) {
             return "";
         } else {
-            return planner.explain(operations, extraDetails);
+            return planner.explain(operations, format, extraDetails);
         }
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index ee969a19cc3..50fa196f641 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.Operation;
@@ -91,10 +92,12 @@ public interface Planner {
      *
      * @param operations The collection of relational queries for which the 
AST and execution plan
      *     will be returned.
+     * @param format The output format of explained statement. See more 
details at {@link
+     *     ExplainFormat}.
      * @param extraDetails The extra explain details which the explain result 
should include, e.g.
      *     estimated cost, changelog mode for streaming, displaying execution 
plan in json format
      */
-    String explain(List<Operation> operations, ExplainDetail... extraDetails);
+    String explain(List<Operation> operations, ExplainFormat format, 
ExplainDetail... extraDetails);
 
     // --- Plan compilation and restore
 
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index c45a184eb59..76c93fec313 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.utils;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
 import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.InternalPlan;
@@ -51,7 +52,8 @@ public class PlannerMock implements Planner {
     }
 
     @Override
-    public String explain(List<Operation> operations, ExplainDetail... 
extraDetails) {
+    public String explain(
+            List<Operation> operations, ExplainFormat format, ExplainDetail... 
extraDetails) {
         return null;
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 00f901d30e5..e5038c6f872 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.delegation
 import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ExecutionOptions
-import org.apache.flink.table.api.{ExplainDetail, PlanReference, TableConfig, 
TableException}
+import org.apache.flink.table.api.{ExplainDetail, ExplainFormat, 
PlanReference, TableConfig, TableException}
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.delegation.{Executor, InternalPlan}
@@ -100,7 +100,14 @@ class BatchPlanner(
     transformations ++ planner.extraTransformations
   }
 
-  override def explain(operations: util.List[Operation], extraDetails: 
ExplainDetail*): String = {
+  override def explain(
+      operations: util.List[Operation],
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String = {
+    if (format != ExplainFormat.TEXT) {
+      throw new UnsupportedOperationException(
+        s"Unsupported explain format [${format.getClass.getCanonicalName}]")
+    }
     val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = 
getExplainGraphs(operations)
 
     val sb = new mutable.StringBuilder
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 1cae6cd8b3a..90a9dcb397c 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.ExecutionOptions
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader
 import org.apache.flink.streaming.api.graph.StreamGraph
-import org.apache.flink.table.api.{ExplainDetail, PlanReference, TableConfig, 
TableException}
+import org.apache.flink.table.api.{ExplainDetail, ExplainFormat, 
PlanReference, TableConfig, TableException}
 import org.apache.flink.table.api.PlanReference.{ContentPlanReference, 
FilePlanReference, ResourcePlanReference}
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.delegation.{Executor, InternalPlan}
@@ -92,7 +92,14 @@ class StreamPlanner(
     transformations ++ planner.extraTransformations
   }
 
-  override def explain(operations: util.List[Operation], extraDetails: 
ExplainDetail*): String = {
+  override def explain(
+      operations: util.List[Operation],
+      format: ExplainFormat,
+      extraDetails: ExplainDetail*): String = {
+    if (format != ExplainFormat.TEXT) {
+      throw new UnsupportedOperationException(
+        s"Unsupported explain format [${format.getClass.getCanonicalName}]")
+    }
     val (sinkRelNodes, optimizedRelNodes, execGraph, streamGraph) = 
getExplainGraphs(operations)
 
     val sb = new mutable.StringBuilder

Reply via email to