This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 264eff6084d [hotfix][tests][table-planner] Add two more cases to 
verify the conflict of multiple LOOKUP hints
264eff6084d is described below

commit 264eff6084ddf8f03105345ab9decd85ef78e475
Author: lincoln lee <[email protected]>
AuthorDate: Fri Sep 2 22:58:31 2022 +0800

    [hotfix][tests][table-planner] Add two more cases to verify the conflict of 
multiple LOOKUP hints
    
    This closes #20743
---
 .../plan/stream/sql/join/LookupJoinTest.xml        | 170 ++++++++++++++++++---
 .../plan/stream/sql/join/LookupJoinTest.scala      |  30 ++++
 2 files changed, 178 insertions(+), 22 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index b171bda90bf..c0626e94636 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -1878,28 +1878,6 @@ Sink(table=[default_catalog.default_database.Sink1], 
fields=[a, name, age])
 }]]>
     </Resource>
   </TestCase>
-  <TestCase name="testLeftJoinTemporalTable[LegacyTableSource=true]">
-    <Resource name="sql">
-      <![CDATA[SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR 
SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
-+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 
3}])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-   +- LogicalFilter(condition=[=($cor0.a, $0)])
-      +- LogicalSnapshot(period=[$cor0.proctime])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
-+- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, 
id, name, age])
-   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testJoinWithRetryHint[LegacyTableSource=true]">
     <Resource name="explain">
       <![CDATA[== Abstract Syntax Tree ==
@@ -2011,6 +1989,154 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], 
rowtime=[$4], id=[$5], nam
 Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
 +- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, 
id, name, age])
    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 
'output-mode'='allow_unordered'), 
+           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * 
+FROM MyTable AS T 
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+ ON T.a = D.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] 
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP 
inheritPath:[0] options:{output-mode=ordered, table=AsyncLookupTable}]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], 
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[UNORDERED, 180000ms, 100])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLeftJoinTemporalTable[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR 
SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 
3}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[LeftOuterJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, 
id, name, age])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 
'output-mode'='allow_unordered'), 
+           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 
'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * 
+FROM MyTable AS T 
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+  ON T.a = D.id
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 
+  ON T.a = D1.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] 
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP 
inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, 
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] 
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP 
inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, 
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   :  +- LogicalFilter(condition=[=($cor0.a, $0)])
+   :     +- LogicalSnapshot(period=[$cor0.proctime])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable]])
+   +- LogicalFilter(condition=[=($cor1.a, $0)])
+      +- LogicalSnapshot(period=[$cor1.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age, id0, name0, age0])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[UNORDERED, 180000ms, 100])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testMultipleJoinHintsWithDifferentTableName[LegacyTableSource=true]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 
'output-mode'='allow_unordered'), 
+           LOOKUP('table'='LookupTable', 'retry-predicate'='lookup_miss', 
'retry-strategy'='fixed_delay', 'fixed-delay'='10s', 'max-attempts'='3') */ * 
+FROM MyTable AS T 
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+  ON T.a = D.id
+JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 
+  ON T.a = D1.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7], id0=[$8], name0=[$9], age0=[$10])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] 
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP 
inheritPath:[0] options:{retry-strategy=fixed_delay, max-attempts=3, 
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0, 0] 
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP 
inheritPath:[0, 0] options:{retry-strategy=fixed_delay, max-attempts=3, 
fixed-delay=10s, retry-predicate=lookup_miss, table=LookupTable}]]])
+   :  :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]], hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   :  +- LogicalFilter(condition=[=($cor0.a, $0)])
+   :     +- LogicalSnapshot(period=[$cor0.proctime])
+   :        +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable, source: [TestTemporalTable(id, name, age)]]])
+   +- LogicalFilter(condition=[=($cor1.a, $0)])
+      +- LogicalSnapshot(period=[$cor1.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
LookupTable, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age, id0, name0, age0])
++- LookupJoin(table=[default_catalog.default_database.LookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age, id0, name0, age0], retry=[lookup_miss, FIXED_DELAY, 10000ms, 3])
+   +- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[UNORDERED, 180000ms, 100])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testMultipleJoinHintsWithSameTableName[LegacyTableSource=false]">
+    <Resource name="sql">
+      <![CDATA[
+SELECT /*+ LOOKUP('table'='AsyncLookupTable', 
'output-mode'='allow_unordered'), 
+           LOOKUP('table'='AsyncLookupTable', 'output-mode'='ordered') */ * 
+FROM MyTable AS T 
+JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+ ON T.a = D.id
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], 
name=[$6], age=[$7])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 3}], joinHints=[[[LOOKUP inheritPath:[0] 
options:{output-mode=allow_unordered, table=AsyncLookupTable}][LOOKUP 
inheritPath:[0] options:{output-mode=ordered, table=AsyncLookupTable}]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]], 
hints=[[[ALIAS inheritPath:[] options:[T]]]])
+   +- LogicalFilter(condition=[=($cor0.a, $0)])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
AsyncLookupTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, 
name, age])
++- LookupJoin(table=[default_catalog.default_database.AsyncLookupTable], 
joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, 
name, age], async=[UNORDERED, 180000ms, 100])
+   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 469b74fa287..943dfecdaf1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -766,6 +766,36 @@ class LookupJoinTest(legacyTableSource: Boolean) extends 
TableTestBase with Seri
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testMultipleJoinHintsWithSameTableName(): Unit = {
+    // only the first hint will take effect
+    val sql =
+      """
+        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 
'output-mode'='allow_unordered'), 
+        |           LOOKUP('table'='AsyncLookupTable', 
'output-mode'='ordered') */ * 
+        |FROM MyTable AS T 
+        |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+        | ON T.a = D.id
+      """.stripMargin
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testMultipleJoinHintsWithDifferentTableName(): Unit = {
+    // both hints on corresponding tables will take effect
+    val sql =
+      """
+        |SELECT /*+ LOOKUP('table'='AsyncLookupTable', 
'output-mode'='allow_unordered'), 
+        |           LOOKUP('table'='LookupTable', 
'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 
'fixed-delay'='10s', 'max-attempts'='3') */ * 
+        |FROM MyTable AS T 
+        |JOIN AsyncLookupTable FOR SYSTEM_TIME AS OF T.proctime AS D 
+        |  ON T.a = D.id
+        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D1 
+        |  ON T.a = D1.id
+      """.stripMargin
+    util.verifyExecPlan(sql)
+  }
+
   @Test
   def testJoinSyncTableWithAsyncHint(): Unit = {
     val sql =

Reply via email to