This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 69872241788 [FLINK-29112][table-planner] Print the lookup join hint on
the node in the original RelNode tree for easier debugging
69872241788 is described below
commit 69872241788f112cb3b9148269c3c494487d7bc4
Author: lincoln lee <[email protected]>
AuthorDate: Fri Aug 26 16:30:55 2022 +0800
[FLINK-29112][table-planner] Print the lookup join hint on the node in the
original RelNode tree for easier debugging
This closes #20686
---
.../planner/plan/utils/RelTreeWriterImpl.scala | 6 ++---
.../plan/stream/sql/join/LookupJoinTest.xml | 28 +++++++++++-----------
2 files changed, 17 insertions(+), 17 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
index 9a7328b91c1..79bfbaf8501 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
@@ -22,7 +22,7 @@ import
org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.core.{Join, TableScan}
+import org.apache.calcite.rel.core.{Correlate, Join, TableScan}
import org.apache.calcite.rel.externalize.RelWriterImpl
import org.apache.calcite.rel.hint.Hintable
import org.apache.calcite.sql.SqlExplainLevel
@@ -110,8 +110,8 @@ class RelTreeWriterImpl(
if (withJoinHint) {
rel match {
- case join: Join =>
- val joinHints = FlinkHints.getAllJoinHints(join.getHints)
+ case _: Join | _: Correlate =>
+ val joinHints =
FlinkHints.getAllJoinHints(rel.asInstanceOf[Hintable].getHints)
if (joinHints.nonEmpty) {
printValues.add(Pair.of("joinHints",
RelExplainUtil.hintsToString(joinHints)))
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index e021cd264c9..b171bda90bf 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -541,7 +541,7 @@ GroupAggregate(groupBy=[b], select=[b, COUNT_RETRACT(a) AS
EXPR$1, SUM_RETRACT(c
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{async=true, table=AsyncLookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -563,7 +563,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{async=true, table=AsyncLookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -585,7 +585,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{async=false, table=AsyncLookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -607,7 +607,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{async=false, table=AsyncLookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -629,7 +629,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{table=LookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -651,7 +651,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{table=LookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -673,7 +673,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{async=true, table=LookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -695,7 +695,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{async=true, table=LookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -787,7 +787,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
+- LogicalProject(a=[$0], name=[$6], age=[$7])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3,
output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss,
table=AsyncLookupTable, capacity=300}]]])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -1490,7 +1490,7 @@ Calc(select=[a, b, c, name])
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
+- LogicalProject(a=[$0], name=[$6], age=[$7])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{retry-strategy=fixed_delay, time-out=600s, max-attempts=3,
output-mode=allow_unordered, fixed-delay=10s, retry-predicate=lookup_miss,
table=AsyncLookupTable, capacity=300}]]])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -1582,7 +1582,7 @@ Sink(table=[default_catalog.default_database.Sink1],
fields=[a, name, age])
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
+- LogicalProject(a=[$0], name=[$6], age=[$7])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{time-out=600s, output-mode=allow_unordered, table=AsyncLookupTable,
capacity=300}]]])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -1699,7 +1699,7 @@ Calc(select=[a, b, PROCTIME_MATERIALIZE(proctime) AS
proctime, id, name, age])
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
+- LogicalProject(a=[$0], name=[$6], age=[$7])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{time-out=600s, output-mode=allow_unordered, table=AsyncLookupTable,
capacity=300}]]])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -1791,7 +1791,7 @@ Sink(table=[default_catalog.default_database.Sink1],
fields=[a, name, age])
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
+- LogicalProject(a=[$0], name=[$6], age=[$7])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s,
retry-predicate=lookup_miss, table=LookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])
@@ -1905,7 +1905,7 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS
proctime, rowtime, id, n
<![CDATA[== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.Sink1], fields=[a, name,
age])
+- LogicalProject(a=[$0], name=[$6], age=[$7])
- +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{retry-strategy=fixed_delay, max-attempts=3, fixed-delay=10s,
retry-predicate=lookup_miss, table=LookupTable}]]])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+- LogicalFilter(condition=[=($cor0.a, $0)])
+- LogicalSnapshot(period=[$cor0.proctime])