[ https://issues.apache.org/jira/browse/FLINK-19792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417971#comment-17417971 ]

Francesco Guardiani (slinkydeveloper) edited comment on FLINK-19792 at 9/21/21, 8:14 AM:
-----------------------------------------------------------------------

I tried to replicate the issue with the following query (rowtime is a TIMESTAMP-typed attribute computed from another attribute):
{code:java}
SELECT t1.a, t2.b FROM A t1, B t2 WHERE t1.rowtime = t2.rowtime AND t1.a = t2.a
{code}
And I see that a proper interval join is planned:
{code:java}
Calc(select=[a, b])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[((rowtime = rowtime0) AND (a = a0))], select=[a, rowtime, a0, b, rowtime0])
   :- Exchange(distribution=[hash[rowtime, a]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
   :     +- Calc(select=[a, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
   :        +- TableSourceScan(table=[[default_catalog, default_database, A, project=[a, c], metadata=[]]], fields=[a, c])
   +- Exchange(distribution=[hash[rowtime, a]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
         +- Calc(select=[a, b, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
            +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c])
{code}
Am I overlooking something?
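In the plan above, `leftLowerBound=0, leftUpperBound=0` describe a time interval of width zero. The following is a minimal nested-loop sketch in plain Python (not Flink code) of what an inner interval join with such bounds computes. It assumes the bounds mean `r_time + leftLowerBound <= l_time <= r_time + leftUpperBound` in milliseconds and that rows must also match on `a`; the function name and row layouts are hypothetical, and streaming concerns (state retention, watermark-driven cleanup) are ignored.

```python
from typing import List, Tuple

# Hypothetical row layouts mirroring the plan: A rows are (a, rowtime_ms),
# B rows are (a, b, rowtime_ms).
LeftRow = Tuple[int, int]
RightRow = Tuple[int, str, int]


def interval_join(
    left: List[LeftRow],
    right: List[RightRow],
    left_lower_bound: int,
    left_upper_bound: int,
) -> List[Tuple[int, str]]:
    """Batch, nested-loop sketch of an inner interval join (no state, no watermarks).

    Assumed bound semantics:
        r_time + left_lower_bound <= l_time <= r_time + left_upper_bound
    Rows must additionally agree on the equi-join key `a`.
    """
    result = []
    for l_a, l_time in left:
        for r_a, r_b, r_time in right:
            in_interval = r_time + left_lower_bound <= l_time <= r_time + left_upper_bound
            if l_a == r_a and in_interval:
                # Project like Calc(select=[a, b]).
                result.append((l_a, r_b))
    return result


if __name__ == "__main__":
    a_rows = [(1, 1000), (1, 2000), (2, 1000)]
    b_rows = [(1, "x", 1000), (1, "y", 1500), (2, "z", 1000)]
    # Bounds 0/0 as in the plan: only identical rowtimes (and equal `a`) match.
    print(interval_join(a_rows, b_rows, 0, 0))  # [(1, 'x'), (2, 'z')]
```

With both bounds at 0, only rows with identical `rowtime` are paired, which is what the `t1.rowtime = t2.rowtime` predicate asks for; raising the upper bound to 1000 would also admit right rows up to one second older than the left row.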

 



> Interval join with equal time attributes is not recognized
> ----------------------------------------------------------
>
>                 Key: FLINK-19792
>                 URL: https://issues.apache.org/jira/browse/FLINK-19792
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Timo Walther
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> A user reported that interval joins with an equality predicate on the time 
> attributes are not recognized; a regular inner join is used instead.
> For example:
> {code}
> # Assumes table_env is an existing TableEnvironment with "table1" and "table2" registered.
> table1 = table_env.from_path("table1")
> table2 = table_env.from_path("table2")
> print(table1.join(table2).where("ts = ts2 && id = id2").select("id, ts"))
> {code}
> The documentation clearly states that this should be supported:
> {code}
> For example, the following predicates are valid interval join conditions:
> ltime === rtime
> ltime >= rtime && ltime < rtime + 10.minutes
> {code}
> Source: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#joins
> See also the discussion here:
> https://stackoverflow.com/q/64445207/806430
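The two documented conditions differ only in the width of the resulting interval: an equality predicate is the degenerate case where the lower and upper bound coincide. The sketch below is a hypothetical helper in plain Python, not the planner's actual implementation. It folds predicates of the form `ltime <op> rtime + offset_ms` into inclusive bounds, assumes millisecond precision (so strict `<` and `>` are tightened by 1 ms), and returns `None` when one side of the interval is missing, i.e. when the predicate does not bound the time difference.

```python
from typing import List, Optional, Tuple

# Hypothetical predicate encoding: (op, offset_ms) stands for
#   ltime <op> rtime + offset_ms
Predicate = Tuple[str, int]


def window_bounds(preds: List[Predicate]) -> Optional[Tuple[int, int]]:
    """Fold time predicates into inclusive (lower, upper) bounds on ltime - rtime."""
    lower: Optional[int] = None
    upper: Optional[int] = None
    for op, offset in preds:
        new_lo: Optional[int] = None
        new_hi: Optional[int] = None
        if op in ("=", "==="):
            new_lo, new_hi = offset, offset  # equality: zero-width interval
        elif op == ">=":
            new_lo = offset
        elif op == ">":
            new_lo = offset + 1  # strict bound, assuming millisecond granularity
        elif op == "<=":
            new_hi = offset
        elif op == "<":
            new_hi = offset - 1  # strict bound, assuming millisecond granularity
        else:
            raise ValueError(f"unsupported operator: {op}")
        # Intersect with the bounds collected so far.
        if new_lo is not None:
            lower = new_lo if lower is None else max(lower, new_lo)
        if new_hi is not None:
            upper = new_hi if upper is None else min(upper, new_hi)
    if lower is None or upper is None:
        return None  # unbounded on one side: not an interval join condition
    return (lower, upper)


if __name__ == "__main__":
    print(window_bounds([("===", 0)]))                    # (0, 0)
    print(window_bounds([(">=", 0), ("<", 10 * 60_000)]))  # (0, 599999)
```

Under these assumptions `ltime === rtime` yields `(0, 0)` and `ltime >= rtime && ltime < rtime + 10.minutes` yields `(0, 599999)`. Both are bounded intervals, consistent with the documentation listing both as valid interval join conditions.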



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
