This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e0b6c121eaf [FLINK-33263][bugfix][table-planner] Remove redundant
transformation verification logic.
e0b6c121eaf is described below
commit e0b6c121eaf7aeb2974a45d199e452b022f07d29
Author: SuDewei <[email protected]>
AuthorDate: Mon Mar 4 17:42:38 2024 +0800
[FLINK-33263][bugfix][table-planner] Remove redundant transformation
verification logic.
---
.../org/apache/flink/api/dag/Transformation.java | 15 ----
.../table/planner/delegation/BatchPlanner.scala | 2 +-
.../table/planner/delegation/PlannerBase.scala | 3 +-
.../table/planner/delegation/StreamPlanner.scala | 2 +-
.../planner/plan/stream/sql/TableScanTest.xml | 95 ++++++++++++++++------
.../planner/plan/stream/sql/TableScanTest.scala | 12 ++-
.../flink/table/planner/utils/TableTestBase.scala | 51 +-----------
7 files changed, 81 insertions(+), 99 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
index 6256f9624f6..a0448697dd1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
@@ -19,7 +19,6 @@
package org.apache.flink.api.dag;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.operators.ResourceSpec;
@@ -603,20 +602,6 @@ public abstract class Transformation<T> {
+ '}';
}
- @VisibleForTesting
- public String toStringWithoutId() {
- return getClass().getSimpleName()
- + "{"
- + "name='"
- + name
- + '\''
- + ", outputType="
- + outputType
- + ", parallelism="
- + parallelism
- + '}';
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index cea10f7bb81..bb4c1b75a28 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -84,7 +84,7 @@ class BatchPlanner(
processors
}
- override def translateToPlan(execGraph: ExecNodeGraph):
util.List[Transformation[_]] = {
+ override protected def translateToPlan(execGraph: ExecNodeGraph):
util.List[Transformation[_]] = {
beforeTranslation()
val planner = createDummyPlanner()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index b36edaa21d7..45788e6278e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -367,8 +367,7 @@ abstract class PlannerBase(
* @return
* The [[Transformation]] DAG that corresponds to the node DAG.
*/
- @VisibleForTesting
- def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]]
+ protected def translateToPlan(execGraph: ExecNodeGraph):
util.List[Transformation[_]]
def addExtraTransformation(transformation: Transformation[_]): Unit = {
if (!extraTransformations.contains(transformation)) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 894a37c8cf9..fb32326f117 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -78,7 +78,7 @@ class StreamPlanner(
override protected def getExecNodeGraphProcessors:
Seq[ExecNodeGraphProcessor] = Seq()
- override def translateToPlan(execGraph: ExecNodeGraph):
util.List[Transformation[_]] = {
+ override protected def translateToPlan(execGraph: ExecNodeGraph):
util.List[Transformation[_]] = {
beforeTranslation()
val planner = createDummyPlanner()
val transformations = execGraph.getRootNodes.map {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 8fe6835213c..465c4b0c67c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -729,20 +729,25 @@ Calc(select=[ts, a, b], where=[>(a, 1)],
changelogMode=[I,UB,UA,D])
</TestCase>
<TestCase name="testSetParallelismForSource">
- <Resource name="sql">
- <![CDATA[SELECT * FROM src LEFT JOIN changelog_src on src.id =
changelog_src.id WHERE src.c > 1]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
LogicalProject(id=[$0], b=[$1], c=[$2], id0=[$3], a=[$4])
+- LogicalFilter(condition=[>($2, 1)])
+- LogicalJoin(condition=[=($0, $3)], joinType=[left])
:- LogicalTableScan(table=[[default_catalog, default_database, src]])
+- LogicalTableScan(table=[[default_catalog, default_database,
changelog_src]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
+
+== Optimized Physical Plan ==
+Join(joinType=[LeftOuterJoin], where=[=(id, id0)], select=[id, b, c, id0, a],
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+:- Exchange(distribution=[hash[id]])
+: +- Calc(select=[id, b, c], where=[>(c, 1)])
+: +- TableSourceScan(table=[[default_catalog, default_database, src,
filter=[]]], fields=[id, b, c])
++- Exchange(distribution=[hash[id]])
+ +- ChangelogNormalize(key=[id])
+ +- Exchange(distribution=[hash[id]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
changelog_src]], fields=[id, a])
+
+== Optimized Execution Plan ==
Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, b, c, id0, a],
leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[id]])
: +- Calc(select=[id, b, c], where=[(c > 1)])
@@ -751,22 +756,60 @@ Join(joinType=[LeftOuterJoin], where=[(id = id0)],
select=[id, b, c, id0, a], le
+- ChangelogNormalize(key=[id])
+- Exchange(distribution=[hash[id]])
+- TableSourceScan(table=[[default_catalog, default_database,
changelog_src]], fields=[id, a])
-]]>
- </Resource>
- <Resource name="transformation">
- <![CDATA[
-TwoInputTransformation{name='Join(joinType=[LeftOuterJoin], where=[(id =
id0)], select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])', outputType=ROW<`id` INT, `b`
STRING, `c` INT, `id0` INT, `a` STRING>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +- PartitionTransformation{name='Exchange(distribution=[hash[id]])',
outputType=ROW<`id` INT, `b` STRING, `c`
INT>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +- OneInputTransformation{name='Calc(select=[id, b, c],
where=[(c > 1)])', outputType=ROW<`id` INT, `b` STRING, `c`
INT>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +-
SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id`
INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +-
LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog,
default_database, src, filter=[]]], fields=[id, b, c])', outputType=ROW<`id`
INT, `b` STRING, `c` INT>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=3}
- +- PartitionTransformation{name='Exchange(distribution=[hash[id]])',
outputType=ROW<`id` INT NOT NULL, `a`
STRING>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +- OneInputTransformation{name='ChangelogNormalize(key=[id])',
outputType=ROW<`id` INT NOT NULL, `a`
STRING>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +-
PartitionTransformation{name='Exchange(distribution=[hash[id]])',
outputType=ROW<`id` INT NOT NULL, `a`
STRING>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +- PartitionTransformation{name='Partitioner',
outputType=ROW<`id` INT NOT NULL, `a`
STRING>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +-
SourceTransformationWrapper{name='ChangeToDefaultParallel', outputType=ROW<`id`
INT NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=-1}
- +-
LegacySourceTransformation{name='TableSourceScan(table=[[default_catalog,
default_database, changelog_src]], fields=[id, a])', outputType=ROW<`id` INT
NOT NULL, `a` STRING>(org.apache.flink.table.data.RowData,
org.apache.flink.table.runtime.typeutils.RowDataSerializer), parallelism=5}
-]]>
- </Resource>
+
+== Physical Execution Plan ==
+{
+ "nodes" : [ {
+ "id" : ,
+ "type" : "Source: src[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog,
default_database, src, filter=[]]], fields=[id, b, c])",
+ "parallelism" : 3
+ }, {
+ "id" : ,
+ "type" : "Calc[]",
+ "pact" : "Operator",
+ "contents" : "[]:Calc(select=[id, b, c], where=[(c > 1)])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "REBALANCE",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Source: changelog_src[]",
+ "pact" : "Data Source",
+ "contents" : "[]:TableSourceScan(table=[[default_catalog,
default_database, changelog_src]], fields=[id, a])",
+ "parallelism" : 5
+ }, {
+ "id" : ,
+ "type" : "ChangelogNormalize[]",
+ "pact" : "Operator",
+ "contents" : "[]:ChangelogNormalize(key=[id])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH",
+ "side" : "second"
+ } ]
+ }, {
+ "id" : ,
+ "type" : "Join[]",
+ "pact" : "Operator",
+ "contents" : "[]:Join(joinType=[LeftOuterJoin], where=[(id = id0)],
select=[id, b, c, id0, a], leftInputSpec=[NoUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey])",
+ "parallelism" : 10,
+ "predecessors" : [ {
+ "id" : ,
+ "ship_strategy" : "HASH",
+ "side" : "second"
+ }, {
+ "id" : ,
+ "ship_strategy" : "HASH",
+ "side" : "second"
+ } ]
+ } ]
+}]]>
+ </Resource>
</TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index be1ae70d3fa..e55cf641991 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -779,7 +779,7 @@ class TableScanTest extends TableTestBase {
@Test
def testSetParallelismForSource(): Unit = {
val config = TableConfig.getDefault
-
config.set(ExecutionConfigOptions.TABLE_EXEC_SIMPLIFY_OPERATOR_NAME_ENABLED,
Boolean.box(false))
+ config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
Int.box(10))
val util = streamTestUtil(config)
util.addTable("""
@@ -809,8 +809,12 @@ class TableScanTest extends TableTestBase {
| 'enable-projection-push-down' = 'false'
|)
""".stripMargin)
- util.verifyTransformation(
- "SELECT * FROM src LEFT JOIN changelog_src " +
- "on src.id = changelog_src.id WHERE src.c > 1")
+ val query =
+ """
+ |SELECT *
+ |FROM src LEFT JOIN changelog_src
+ |ON src.id = changelog_src.id WHERE src.c > 1
+ |""".stripMargin
+ util.verifyExplain(query, ExplainDetail.JSON_EXECUTION_PLAN)
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index e5b418365be..1e006f3d94b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.utils
import org.apache.flink.FlinkVersion
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.dag.Transformation
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo,
TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.configuration.BatchExecutionOptions
@@ -88,7 +87,7 @@ import org.junit.jupiter.api.extension.{BeforeEachCallback,
ExtendWith, Extensio
import org.junit.jupiter.api.io.TempDir
import org.junit.platform.commons.support.AnnotationSupport
-import java.io.{File, IOException, PrintWriter, StringWriter}
+import java.io.{File, IOException}
import java.net.URL
import java.nio.file.{Files, Path, Paths}
import java.time.Duration
@@ -703,20 +702,6 @@ abstract class TableTestUtilBase(test: TableTestBase,
isStreamingMode: Boolean)
withQueryBlockAlias = false)
}
- /**
- * Verify the AST (abstract syntax tree), the optimized exec plan and
tranformation for the given
- * SELECT query. Note: An exception will be thrown if the given sql can't be
translated to exec
- * plan and transformation result is wrong.
- */
- def verifyTransformation(query: String): Unit = {
- doVerifyPlan(
- query,
- Array.empty[ExplainDetail],
- withRowType = false,
- Array(PlanKind.AST, PlanKind.OPT_EXEC, PlanKind.TRANSFORM),
- withQueryBlockAlias = false)
- }
-
/** Verify the explain result for the given SELECT query. See more about
[[Table#explain()]]. */
def verifyExplain(query: String): Unit =
verifyExplain(getTableEnv.sqlQuery(query))
@@ -1055,14 +1040,6 @@ abstract class TableTestUtilBase(test: TableTestBase,
isStreamingMode: Boolean)
""
}
- // build transformation graph if `expectedPlans` contains TRANSFORM
- val transformation = if (expectedPlans.contains(PlanKind.TRANSFORM)) {
- val optimizedNodes = getPlanner.translateToExecNodeGraph(optimizedRels,
true)
- System.lineSeparator +
getTransformations(getPlanner.translateToPlan(optimizedNodes))
- } else {
- ""
- }
-
// check whether the sql equals to the expected if the `relNodes` are
translated from sql
assertSqlEqualsOrExpandFunc()
// check ast plan
@@ -1081,10 +1058,6 @@ abstract class TableTestUtilBase(test: TableTestBase,
isStreamingMode: Boolean)
if (expectedPlans.contains(PlanKind.OPT_EXEC)) {
assertEqualsOrExpand("optimized exec plan", optimizedExecPlan, expand =
false)
}
- // check transformation graph
- if (expectedPlans.contains(PlanKind.TRANSFORM)) {
- assertEqualsOrExpand("transformation", transformation, expand = false)
- }
}
private def doVerifyExplain(explainResult: String, extraDetails:
ExplainDetail*): Unit = {
@@ -1144,25 +1117,6 @@ abstract class TableTestUtilBase(test: TableTestBase,
isStreamingMode: Boolean)
replaceEstimatedCost(optimizedPlan)
}
- private def getTransformations(transformations:
java.util.List[Transformation[_]]): String = {
- val stringWriter = new StringWriter()
- val printWriter = new PrintWriter(stringWriter)
- transformations.foreach(transformation => getTransformation(printWriter,
transformation, 0))
- stringWriter.toString
- }
-
- private def getTransformation(
- printWriter: PrintWriter,
- transformation: Transformation[_],
- level: Int): Unit = {
- if (level == 0) {
- printWriter.println(transformation.toStringWithoutId)
- } else {
- printWriter.println(("\t" * level) + "+- " +
transformation.toStringWithoutId)
- }
- transformation.getInputs.foreach(child => getTransformation(printWriter,
child, level + 1))
- }
-
/** Replace the estimated costs for the given plan, because it may be
unstable. */
protected def replaceEstimatedCost(s: String): String = {
var str = s.replaceAll("\\r\\n", "\n")
@@ -1670,9 +1624,6 @@ object PlanKind extends Enumeration {
/** Optimized Execution Plan */
val OPT_EXEC: Value = Value("OPT_EXEC")
-
- /** Transformation */
- val TRANSFORM: Value = Value("TRANSFORM")
}
object TableTestUtil {