This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 264eff6084d [hotfix][tests][table-planner] Add two more cases to
verify the conflict of multiple LOOKUP hints
264eff6084d is described below
commit 264eff6084ddf8f03105345ab9decd85ef78e475
Author: lincoln lee <[email protected]>
AuthorDate: Fri Sep 2 22:58:31 2022 +0800
[hotfix][tests][table-planner] Add two more cases to verify the conflict of
multiple LOOKUP hints
This closes #20743
---
.../plan/stream/sql/join/LookupJoinTest.xml | 170 ++++++++++++++++++---
.../plan/stream/sql/join/LookupJoinTest.scala | 30 ++++
2 files changed, 178 insertions(+), 22 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index b171bda90bf..c0626e94636 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -1878,28 +1878,6 @@ Sink(table=[default_catalog.default_database.Sink1],
fields=[a, name, age])
}]]>
</Resource>
</TestCase>
- <TestCase name="testLeftJoinTemporalTable[LegacyTableSource=true]">
- <Resource name="sql">
- <![CDATA[SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR
SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
3}])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
- +- LogicalFilter(condition=[=($cor0.a, $0)])
- +- LogicalSnapshot(period=[$cor0.proctime])
- +- LogicalTableScan(table=[[default_catalog, default_database,
LookupTable, source: [TestTemporalTable(id, name, age)]]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id,
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime,
id, name, age])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testJoinWithRetryHint[LegacyTableSource=true]">
<Resource name="explain">
<![CDATA[== Abstract Syntax Tree ==
@@ -2011,6 +1989,154 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3],
rowtime=[$4], id=[$5], nam
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id,
name, age])
+- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime,
id, name, age])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=true]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable',
'output-mode'='allow_unordered'),
+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ ON T.a = D.id
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP
inheritPath:[0] options:{output-mode=ordered, table=AsyncLookupTable}]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id,
name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable],
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id,
name, age], async=[UNORDERED, 180000ms, 100])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLeftJoinTemporalTable[LegacyTableSource=true]">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR
SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id,
name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime,
id, name, age])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=false]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable',
'output-mode'='allow_unordered'),
+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss',
'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ ON T.a = D.id
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1
+ ON T.a = D1.id
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP
inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3,
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+ :- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0]
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP
inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3,
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+ : +- LogicalFilter(condition=[=($cor0.a, $0)])
+ : +- LogicalSnapshot(period=[$cor0.proctime])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
AsyncLookupTable]])
+ +- LogicalFilter(condition=[=($cor1.a, $0)])
+ +- LogicalSnapshot(period=[$cor1.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
LookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id,
name, age, id0, name0, age0])
++- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id,
name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+ +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable],
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id,
name, age], async=[UNORDERED, 180000ms, 100])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=true]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable',
'output-mode'='allow_unordered'),
+ LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss',
'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ ON T.a = D.id
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1
+ ON T.a = D1.id
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP
inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3,
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+ :- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0]
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP
inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3,
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+ : :- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+ : +- LogicalFilter(condition=[=($cor0.a, $0)])
+ : +- LogicalSnapshot(period=[$cor0.proctime])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+ +- LogicalFilter(condition=[=($cor1.a, $0)])
+ +- LogicalSnapshot(period=[$cor1.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id,
name, age, id0, name0, age0])
++- LookupJoin(table=[default_catalog.default_database.LookupTable],
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id,
name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+ +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable],
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id,
name, age], async=[UNORDERED, 180000ms, 100])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=false]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable',
'output-mode'='allow_unordered'),
+ LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ *
+FROM MyTable AS T
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ ON T.a = D.id
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5],
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0]
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP
inheritPath:[0] options:{output-mode=ordered, table=AsyncLookupTable}]]])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]],
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
AsyncLookupTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id,
name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable],
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id,
name, age], async=[UNORDERED, 180000ms, 100])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 469b74fa287..943dfecdaf1 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -766,6 +766,36 @@ class LookupJoinTest(legacyTableSource: Boolean) extends
TableTestBase with Seri
util.verifyExecPlan(sql)
}
+ @Test
+ def testMultipleJoinHintsWithSameTableName(): Unit = {
+ // only the first hint will take effect
+ val sql =
+ """
+ |SELECT /*+ LOOKUP('table'='AsyncLookupTable',
'output-mode'='allow_unordered'),
+ | LOOKUP('table'='AsyncLookupTable',
'output-mode'='ordered') */ *
+ |FROM MyTable AS T
+ |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ | ON T.a = D.id
+ """.stripMargin
+ util.verifyExecPlan(sql)
+ }
+
+ @Test
+ def testMultipleJoinHintsWithDifferentTableName(): Unit = {
+ // both hints on corresponding tables will take effect
+ val sql =
+ """
+ |SELECT /*+ LOOKUP('table'='AsyncLookupTable',
'output-mode'='allow_unordered'),
+ | LOOKUP('table'='LookupTable',
'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay',
'fixed-delay'='10s', 'max-attempts'='3') */ *
+ |FROM MyTable AS T
+ |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ | ON T.a = D.id
+ |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1
+ | ON T.a = D1.id
+ """.stripMargin
+ util.verifyExecPlan(sql)
+ }
+
@Test
def testJoinSyncTableWithAsyncHint(): Unit = {
val sql =