[
https://issues.apache.org/jira/browse/FLINK-19792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417971#comment-17417971
]
Francesco Guardiani edited comment on FLINK-19792 at 9/21/21, 8:14 AM:
-----------------------------------------------------------------------
I tried to replicate the issue with the following query (rowtime is just a
TIMESTAMP-typed attribute computed from another attribute):
{code:java}
SELECT t1.a, t2.b FROM A t1, B t2 WHERE t1.rowtime = t2.rowtime AND t1.a = t2.a
{code}
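For anyone who wants to reproduce this, the sketch below builds table definitions consistent with the query above: rowtime is a computed column derived from an epoch-seconds column c, with a one-second watermark delay. The column types, the datagen connector, and the helper name build_ddl are assumptions for illustration, not necessarily the setup used here.

```python
def build_ddl(table_name, columns):
    """Return a Flink SQL CREATE TABLE statement with an event-time attribute.

    Assumes `columns` (a dict of name -> SQL type) contains an
    epoch-seconds column named `c`.
    """
    physical = [f"  {name} {sql_type}" for name, sql_type in columns.items()]
    derived = [
        # rowtime is computed from c, then declared as the event-time attribute.
        "  rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(c))",
        "  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND",
    ]
    body = ",\n".join(physical + derived)
    # The connector is an assumption; any source providing these columns works.
    return f"CREATE TABLE {table_name} (\n{body}\n) WITH ('connector' = 'datagen')"


ddl_a = build_ddl("A", {"a": "INT", "c": "BIGINT"})
ddl_b = build_ddl("B", {"a": "INT", "b": "STRING", "c": "BIGINT"})
```

Each statement could then be passed to table_env.execute_sql(...) before explaining the query.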
And I see a proper interval join is planned:
{code:java}
Calc(select=[a, b])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=0, leftUpperBound=0, leftTimeIndex=1, rightTimeIndex=2], where=[((rowtime = rowtime0) AND (a = a0))], select=[a, rowtime, a0, b, rowtime0])
   :- Exchange(distribution=[hash[rowtime, a]])
   :  +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
   :     +- Calc(select=[a, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
   :        +- TableSourceScan(table=[[default_catalog, default_database, A, project=[a, c], metadata=[]]], fields=[a, c])
   +- Exchange(distribution=[hash[rowtime, a]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])
         +- Calc(select=[a, b, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])
            +- TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c])
{code}
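To spell out what the window bounds in this plan mean: an interval join keeps a pair when the left time lies within [rtime + lower, rtime + upper], so with both bounds at 0 the condition collapses to plain equality on the time attributes. The sketch below illustrates that semantics on in-memory rows. The function interval_join and the sample data are hypothetical, and it ignores watermarks and state cleanup entirely.

```python
def interval_join(left, right, lower_ms, upper_ms):
    """Naive interval join over lists of (key, time_ms) rows.

    Keeps pairs with equal keys where
    rtime + lower_ms <= ltime <= rtime + upper_ms.
    """
    return [
        (lkey, ltime, rtime)
        for lkey, ltime in left
        for rkey, rtime in right
        if lkey == rkey and rtime + lower_ms <= ltime <= rtime + upper_ms
    ]


left = [("x", 1000), ("y", 2000)]
right = [("x", 1000), ("x", 1500), ("y", 2500)]

# Bounds of 0/0, as in the plan: only exact timestamp matches survive.
equal_time = interval_join(left, right, 0, 0)

# A wider window also admits rows whose timestamps differ by up to 500 ms.
within_500ms = interval_join(left, right, -500, 500)
```

With these rows, the 0/0 join behaves like an equi-join on (key, time), which is why a predicate such as t1.rowtime = t2.rowtime can be planned as an interval join.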
Am I overlooking something?
> Interval join with equal time attributes is not recognized
> ----------------------------------------------------------
>
> Key: FLINK-19792
> URL: https://issues.apache.org/jira/browse/FLINK-19792
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Timo Walther
> Priority: Minor
> Labels: auto-deprioritized-major
>
> A user reported that interval joins with an equality predicate on the time
> attributes are not recognized; instead, a regular inner join is used.
> For example:
> {code}
> table1 = table_env.from_path("table1")
> table2 = table_env.from_path("table2")
> print(table1.join(table2).where("ts = ts2 && id = id2").select("id, ts"))
> {code}
> The documentation clearly states that this should be supported:
> {code}
> For example, the following predicates are valid interval join conditions:
> ltime === rtime
> ltime >= rtime && ltime < rtime + 10.minutes
> {code}
> Source:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#joins
> See also the discussion here:
> https://stackoverflow.com/q/64445207/806430