This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e0b6c121eaf [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic.
e0b6c121eaf is described below

commit e0b6c121eaf7aeb2974a45d199e452b022f07d29
Author: SuDewei <[email protected]>
AuthorDate: Mon Mar 4 17:42:38 2024 +0800

    [FLINK-33263][bugfix][table-planner] Remove redundant transformation verification logic.
---
 .../org/apache/flink/api/dag/Transformation.java   | 15 ----
 .../table/planner/delegation/BatchPlanner.scala    |  2 +-
 .../table/planner/delegation/PlannerBase.scala     |  3 +-
 .../table/planner/delegation/StreamPlanner.scala   |  2 +-
 .../planner/plan/stream/sql/TableScanTest.xml      | 95 ++++++++++++++++------
 .../planner/plan/stream/sql/TableScanTest.scala    | 12 ++-
 .../flink/table/planner/utils/TableTestBase.scala  | 51 +-----------
 7 files changed, 81 insertions(+), 99 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index 6256f9624f6..a0448697dd1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.dag;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.operators.ResourceSpec;
@@ -603,20 +602,6 @@ public abstract class Transformation<T> {
                 + '}';
     }
 
-    @VisibleForTesting
-    public String toStringWithoutId() {
-        return getClass().getSimpleName()
-                + "{"
-                + "name='"
-                + name
-                + '\''
-                + ", outputType="
-                + outputType
-                + ", parallelism="
-                + parallelism
-                + '}';
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index cea10f7bb81..bb4c1b75a28 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -84,7 +84,7 @@ class BatchPlanner(
     processors
   }
 
-  override def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
+  override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
     beforeTranslation()
     val planner = createDummyPlanner()
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index b36edaa21d7..45788e6278e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -367,8 +367,7 @@ abstract class PlannerBase(
    * @return
    *   The [[Transformation]] DAG that corresponds to the node DAG.
    */
-  @VisibleForTesting
-  def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
+  protected def translateToPlan(execGraph: ExecNodeGraph): 
util.List[Transformation[_]]
 
   def addExtraTransformation(transformation: Transformation[_]): Unit = {
     if (!extraTransformations.contains(transformation)) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 894a37c8cf9..fb32326f117 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -78,7 +78,7 @@ class StreamPlanner(
 
   override protected def getExecNodeGraphProcessors: 
Seq[ExecNodeGraphProcessor] = Seq()
 
-  override def translateToPlan(execGraph: ExecNodeGraph): 
util.List[Transformation[_]] = {
+  override protected def translateToPlan(execGraph: ExecNodeGraph): 
util.List[Transformation[_]] = {
     beforeTranslation()
     val planner = createDummyPlanner()
     val transformations = execGraph.getRootNodes.map {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 8fe6835213c..465c4b0c67c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -729,20 +729,25 @@ Calc(select=[ts, a, b], where=[>(a, 1)], 
changelogMode=[I,UB,UA,D])
   </TestCase>
 
   <TestCase name="testSetParallelismForSource">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM src LEFT JOIN changelog_src on src.id = 
changelog_src.id WHERE src.c > 1]]>
-    </Resource>
-       <Resource name="ast">
-      <![CDATA[
+  <Resource name="explain">
+         <![CDATA[== Abstract Syntax Tree ==
 LogicalProject(id=[$0], b=[$1], c=[$2], id0=[$3], a=[$4])
 +- LogicalFilter(condition=[>($2, 1)])
    +- LogicalJoin(condition=[=($0, $3)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, src]])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
changelog_src]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
+
+== Optimized Physical Plan ==
+Join(joinType=[LeftOuterJoin], where=[=(id, id0)], select=[id, b, c, id0, a], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+:- Exchange(distribution=[hash[id]])
+:  +- Calc(select=[id, b, c], where=[>(c, 1)])
+:     +- TableSourceScan(table=[[default_catalog, default_database, src, 
filter=[]]], fields=[id, b, c])
++- Exchange(distribution=[hash[id]])
+   +- ChangelogNormalize(key=[id])
+      +- Exchange(distribution=[hash[id]])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
changelog_src]], fields=[id, a])
+
+== Optimized Execution Plan ==
 Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
 :- Exchange(distribution=[hash[id]])
 :  +- Calc(select=[id, b, c], where=[(c > 1)])
@@ -751,22 +756,60 @@ Join(joinType=[LeftOuterJoin], where=[(id = id0)], 
select=[id, b, c, id0, a], le
    +- ChangelogNormalize(key=[id])
       +- Exchange(distribution=[hash[id]])
          +- TableSourceScan(table=[[default_catalog, default_database, 
changelog_src]], fields=[id, a])
-]]>
-       </Resource>
-       <Resource name="transformation">
-      <![CDATA[
-TwoInputTransformation{name='Join(joinType=[LeftOuterJoin], where=[(id = 
id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])', outputType=ROW<`id` INT, `b` 
STRING, `c` INT, `id0` INT, `a` STRING>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-       +- PartitionTransformation{name='Exchange(distribution=[hash[id]])', 
outputType=ROW<`id` INT, `b` STRING, `c` 
INT>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-               +- OneInputTransformation{name='Calc(select=[id, b, c], 
where=[(c > 1)])', outputType=ROW<`id` INT, `b` STRING, `c` 
INT>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-                       +- 
SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id` 
INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-                               +- 
LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog, 
default_database, src, filter=[]]], fields=[id, b, c])', outputType=ROW<`id` 
INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=3}
-       +- PartitionTransformation{name='Exchange(distribution=[hash[id]])', 
outputType=ROW<`id` INT NOT NULL, `a` 
STRING>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-               +- OneInputTransformation{name='ChangelogNormalize(key=[id])', 
outputType=ROW<`id` INT NOT NULL, `a` 
STRING>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-                       +- 
PartitionTransformation{name='Exchange(distribution=[hash[id]])', 
outputType=ROW<`id` INT NOT NULL, `a` 
STRING>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-                               +- PartitionTransformation{name='Partitioner', 
outputType=ROW<`id` INT NOT NULL, `a` 
STRING>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-                                       +- 
SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id` 
INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
-                                               +- 
LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog, 
default_database, changelog_src]], fields=[id, a])', outputType=ROW<`id` INT 
NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData, 
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=5}
-]]>
-       </Resource>
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: src[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, src, filter=[]]], fields=[id, b, c])",
+    "parallelism" : 3
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[id, b, c], where=[(c > 1)])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Source: changelog_src[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, changelog_src]], fields=[id, a])",
+    "parallelism" : 5
+  }, {
+    "id" : ,
+    "type" : "ChangelogNormalize[]",
+    "pact" : "Operator",
+    "contents" : "[]:ChangelogNormalize(key=[id])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Join[]",
+    "pact" : "Operator",
+    "contents" : "[]:Join(joinType=[LeftOuterJoin], where=[(id = id0)], 
select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey])",
+    "parallelism" : 10,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "HASH",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+  </Resource>
   </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index be1ae70d3fa..e55cf641991 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -779,7 +779,7 @@ class TableScanTest extends TableTestBase {
   @Test
   def testSetParallelismForSource(): Unit = {
     val config = TableConfig.getDefault
-    
config.set(ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED, 
Boolean.box(false))
+    config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
Int.box(10))
     val util = streamTestUtil(config)
 
     util.addTable("""
@@ -809,8 +809,12 @@ class TableScanTest extends TableTestBase {
                     |  'enable-projection-push-down' = 'false'
                     |)
       """.stripMargin)
-    util.verifyTransformation(
-      "SELECT * FROM src LEFT JOIN changelog_src " +
-        "on src.id = changelog_src.id WHERE src.c > 1")
+    val query =
+      """
+        |SELECT *
+        |FROM src LEFT JOIN changelog_src
+        |ON src.id = changelog_src.id WHERE src.c > 1
+        |""".stripMargin
+    util.verifyExplain(query, ExplainDetail.JSON_EXECUTION_PLAN)
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index e5b418365be..1e006f3d94b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.utils
 
 import org.apache.flink.FlinkVersion
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, 
TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.BatchExecutionOptions
@@ -88,7 +87,7 @@ import org.junit.jupiter.api.extension.{BeforeEachCallback, 
ExtendWith, Extensio
 import org.junit.jupiter.api.io.TempDir
 import org.junit.platform.commons.support.AnnotationSupport
 
-import java.io.{File, IOException, PrintWriter, StringWriter}
+import java.io.{File, IOException}
 import java.net.URL
 import java.nio.file.{Files, Path, Paths}
 import java.time.Duration
@@ -703,20 +702,6 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
       withQueryBlockAlias = false)
   }
 
-  /**
-   * Verify the AST (abstract syntax tree), the optimized exec plan and 
tranformation for the given
-   * SELECT query. Note: An exception will be thrown if the given sql can't be 
translated to exec
-   * plan and transformation result is wrong.
-   */
-  def verifyTransformation(query: String): Unit = {
-    doVerifyPlan(
-      query,
-      Array.empty[ExplainDetail],
-      withRowType = false,
-      Array(PlanKind.AST, PlanKind.OPT_EXEC, PlanKind.TRANSFORM),
-      withQueryBlockAlias = false)
-  }
-
   /** Verify the explain result for the given SELECT query. See more about 
[[Table#explain()]]. */
   def verifyExplain(query: String): Unit = 
verifyExplain(getTableEnv.sqlQuery(query))
 
@@ -1055,14 +1040,6 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
       ""
     }
 
-    // build transformation graph if `expectedPlans` contains TRANSFORM
-    val transformation = if (expectedPlans.contains(PlanKind.TRANSFORM)) {
-      val optimizedNodes = getPlanner.translateToExecNodeGraph(optimizedRels, 
true)
-      System.lineSeparator + 
getTransformations(getPlanner.translateToPlan(optimizedNodes))
-    } else {
-      ""
-    }
-
     // check whether the sql equals to the expected if the `relNodes` are 
translated from sql
     assertSqlEqualsOrExpandFunc()
     // check ast plan
@@ -1081,10 +1058,6 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
     if (expectedPlans.contains(PlanKind.OPT_EXEC)) {
       assertEqualsOrExpand("optimized exec plan", optimizedExecPlan, expand = 
false)
     }
-    // check transformation graph
-    if (expectedPlans.contains(PlanKind.TRANSFORM)) {
-      assertEqualsOrExpand("transformation", transformation, expand = false)
-    }
   }
 
   private def doVerifyExplain(explainResult: String, extraDetails: 
ExplainDetail*): Unit = {
@@ -1144,25 +1117,6 @@ abstract class TableTestUtilBase(test: TableTestBase, 
isStreamingMode: Boolean)
     replaceEstimatedCost(optimizedPlan)
   }
 
-  private def getTransformations(transformations: 
java.util.List[Transformation[_]]): String = {
-    val stringWriter = new StringWriter()
-    val printWriter = new PrintWriter(stringWriter)
-    transformations.foreach(transformation => getTransformation(printWriter, 
transformation, 0))
-    stringWriter.toString
-  }
-
-  private def getTransformation(
-      printWriter: PrintWriter,
-      transformation: Transformation[_],
-      level: Int): Unit = {
-    if (level == 0) {
-      printWriter.println(transformation.toStringWithoutId)
-    } else {
-      printWriter.println(("\t" * level) + "+- " + 
transformation.toStringWithoutId)
-    }
-    transformation.getInputs.foreach(child => getTransformation(printWriter, 
child, level + 1))
-  }
-
   /** Replace the estimated costs for the given plan, because it may be 
unstable. */
   protected def replaceEstimatedCost(s: String): String = {
     var str = s.replaceAll("\\r\\n", "\n")
@@ -1670,9 +1624,6 @@ object PlanKind extends Enumeration {
 
   /** Optimized Execution Plan */
   val OPT_EXEC: Value = Value("OPT_EXEC")
-
-  /** Transformation */
-  val TRANSFORM: Value = Value("TRANSFORM")
 }
 
 object TableTestUtil {

Reply via email to