Your query

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b ON a.id = b.id
  GROUP BY TUMBLE(a.rowtime, INTERVAL '1' MINUTE)

won't work because nothing constrains b.rowtime: a matching b record could 
arrive arbitrarily late, so the system would literally have to wait forever 
before it could finish with any record from a, or output a total. Let’s try 
this:

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND FLOOR(a.rowtime TO MINUTE) = FLOOR(b.rowtime TO MINUTE)
  GROUP BY TUMBLE(a.rowtime, INTERVAL '1' MINUTE)

This query is valid, but may or may not be what you want. For records with 
the same id to match, they have to occur within the same minute. If a occurs 
at 10:43:59 and b occurs at 10:44:02, they will not match. Also, if there are 
several records in a or b with the same id value in the same minute, you will 
get a Cartesian product effect.
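To make the same-minute semantics concrete, here is a minimal Python sketch. It is an illustration, not how Calcite implements the join. It assumes records are dicts with hypothetical keys id, rowtime, foo and bar, and it models FLOOR(rowtime TO MINUTE) by truncating the seconds:

```python
from collections import defaultdict
from datetime import datetime


def floor_to_minute(ts):
    # Models SQL FLOOR(ts TO MINUTE): drop seconds and microseconds.
    return ts.replace(second=0, microsecond=0)


def same_minute_join(a_rows, b_rows):
    # Index b by (id, minute) so each a record only sees b records
    # that share both its id and its minute bucket.
    b_index = defaultdict(list)
    for b in b_rows:
        b_index[(b["id"], floor_to_minute(b["rowtime"]))].append(b)
    return [(a["foo"], b["bar"])
            for a in a_rows
            for b in b_index[(a["id"], floor_to_minute(a["rowtime"]))]]


def at(hms):
    # Hypothetical helper: a timestamp on an arbitrary fixed day.
    h, m, s = map(int, hms.split(":"))
    return datetime(2017, 2, 24, h, m, s)


# 10:43:59 and 10:44:02 fall in different minutes, so there is no match.
print(same_minute_join([{"id": 1, "rowtime": at("10:43:59"), "foo": "a1"}],
                       [{"id": 1, "rowtime": at("10:44:02"), "bar": "b1"}]))
# -> []

# Two a records and two b records with the same id in the same minute
# produce 2 x 2 = 4 rows: the Cartesian product effect.
a2 = [{"id": 2, "rowtime": at("10:45:01"), "foo": f} for f in ("a2", "a3")]
b2 = [{"id": 2, "rowtime": at("10:45:30"), "bar": g} for g in ("b2", "b3")]
print(same_minute_join(a2, b2))
# -> [('a2', 'b2'), ('a2', 'b3'), ('a3', 'b2'), ('a3', 'b3')]
```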

A common scenario is that b needs to occur after a, within a rolling 
one-minute window. Thus, if a occurs at 10:43:59, then b can occur between 
10:43:59 and 10:44:59. I would write that query as follows:

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' MINUTE
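One reason this bounded condition works on streams: if both inputs arrive in rowtime order, only the last minute of a ever needs to be kept. Below is a minimal Python sketch under that assumption. The field names are hypothetical, and this is not Calcite's implementation:

```python
import heapq
from collections import deque
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=1)


def interval_join(a_rows, b_rows):
    # a.id = b.id AND b.rowtime BETWEEN a.rowtime AND a.rowtime + WINDOW.
    # Assumes each input is already sorted by rowtime.
    # Merge both inputs into one time-ordered stream; at equal timestamps
    # a (tag 0) sorts before b (tag 1), so the inclusive lower bound holds.
    events = heapq.merge(
        ((r["rowtime"], 0, i, r) for i, r in enumerate(a_rows)),
        ((r["rowtime"], 1, i, r) for i, r in enumerate(b_rows)),
    )
    buffer = deque()  # a records that could still match a later b record
    out = []
    for rowtime, tag, _, row in events:
        if tag == 0:
            buffer.append(row)
            continue
        # b records arrive in time order, so an a record older than
        # rowtime - WINDOW can never match this or any later b record.
        while buffer and buffer[0]["rowtime"] < rowtime - WINDOW:
            buffer.popleft()
        out.extend((a["foo"], row["bar"]) for a in buffer if a["id"] == row["id"])
    return out


def at(hms):
    # Hypothetical helper: a timestamp on an arbitrary fixed day.
    h, m, s = map(int, hms.split(":"))
    return datetime(2017, 2, 24, h, m, s)


a = [{"id": 1, "rowtime": at("10:43:59"), "foo": "a1"}]
b = [{"id": 1, "rowtime": at(t), "bar": t}
     for t in ("10:43:58", "10:44:59", "10:45:00")]
print(interval_join(a, b))
# -> [('a1', '10:44:59')]
```

In this sketch, eviction happens only when a b record arrives. A real engine would more likely drive eviction from a watermark, so that state stays bounded even when b is idle.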

You can then summarize the joined records, emitting a total each minute:

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' MINUTE
  GROUP BY TUMBLE(a.rowtime, INTERVAL '1' MINUTE)
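Here is a minimal Python sketch of the per-minute totals. It assumes the join output arrives ordered by a.rowtime, and that each joined row carries both timestamps under hypothetical keys a_rowtime and b_rowtime. Because of that ordering, a minute's total is final as soon as a row from a later minute arrives:

```python
from datetime import datetime


def floor_to_minute(ts):
    return ts.replace(second=0, microsecond=0)


def tumbling_counts(joined):
    # Yields (minute, COUNT(*)) per TUMBLE(a.rowtime, INTERVAL '1' MINUTE).
    # Assumes `joined` is ordered by a_rowtime.
    current, count = None, 0
    for row in joined:
        bucket = floor_to_minute(row["a_rowtime"])
        if current is not None and bucket != current:
            yield current, count  # earlier minute can no longer change
            count = 0
        current = bucket
        count += 1
    if current is not None:
        yield current, count  # flush the last minute at end of input


def at(hms):
    # Hypothetical helper: a timestamp on an arbitrary fixed day.
    h, m, s = map(int, hms.split(":"))
    return datetime(2017, 2, 24, h, m, s)


joined = [{"a_rowtime": at(t)} for t in ("10:43:10", "10:43:50", "10:44:05")]
for minute, total in tumbling_counts(joined):
    print(minute.strftime("%H:%M"), total)
# -> 10:43 2
#    10:44 1
```

Minutes with no joined rows produce no output, as with GROUP BY.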

The following query

  SELECT STREAM a.foo, b.bar
  FROM a JOIN b
    ON a.id = b.id
    AND b.rowtime BETWEEN a.rowtime AND a.rowtime + INTERVAL '1' MINUTE
  GROUP BY TUMBLE(b.rowtime, INTERVAL '1' MINUTE)

differs only in the last line (“b.rowtime” versus “a.rowtime”), but will emit 
different totals because it buckets by a different timestamp field. To 
implement this query efficiently, Calcite will have to deduce that a.rowtime 
and b.rowtime are sufficiently close (b.rowtime is never more than one minute 
after a.rowtime) that it can re-sort the join output (ordered by a.rowtime) 
into the order the aggregate needs (ordered by b.rowtime). A partial sort over 
a rolling one-minute window (think of a priority queue) would suffice.
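A minimal Python sketch of that partial sort follows. It is an illustration, not Calcite's actual operator. It assumes the join output arrives ordered by a.rowtime, with hypothetical keys a_rowtime and b_rowtime.

The join condition guarantees a_rowtime <= b_rowtime <= a_rowtime + 1 minute. So every later row has b_rowtime at least the current a_rowtime, and any buffered row at or below that time can be released:

```python
import heapq
from datetime import datetime


def resort_by_b(joined):
    # Re-sorts join output from a_rowtime order into b_rowtime order.
    # Relies on a_rowtime <= b_rowtime <= a_rowtime + 1 minute, so the
    # heap only holds rows whose a_rowtime is within the last minute.
    heap = []
    for seq, row in enumerate(joined):
        # Every row from here on has b_rowtime >= row["a_rowtime"],
        # so buffered rows at or below that time are safe to emit.
        while heap and heap[0][0] <= row["a_rowtime"]:
            yield heapq.heappop(heap)[2]
        # seq breaks ties so the dicts themselves are never compared.
        heapq.heappush(heap, (row["b_rowtime"], seq, row))
    while heap:
        yield heapq.heappop(heap)[2]


def at(hms):
    # Hypothetical helper: a timestamp on an arbitrary fixed day.
    h, m, s = map(int, hms.split(":"))
    return datetime(2017, 2, 24, h, m, s)


joined = [  # ordered by a_rowtime; each b_rowtime within a minute of it
    {"a_rowtime": at("10:00:00"), "b_rowtime": at("10:00:50")},
    {"a_rowtime": at("10:00:10"), "b_rowtime": at("10:00:20")},
    {"a_rowtime": at("10:00:30"), "b_rowtime": at("10:00:40")},
    {"a_rowtime": at("10:01:00"), "b_rowtime": at("10:01:05")},
]
print([r["b_rowtime"].strftime("%H:%M:%S") for r in resort_by_b(joined)])
# -> ['10:00:20', '10:00:40', '10:00:50', '10:01:05']
```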

Julian


> On Feb 24, 2017, at 2:32 PM, Haohui Mai <whe...@apache.org> wrote:
> 
> Hi,
> 
> I wonder what would be the best way to express joining two windowed streams
> in SQL?
> 
> An even simpler use case would be joining two streams within the same window
> (see
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html).
> I'm looking at something like
> 
> SELECT a.foo, b.bar FROM a JOIN b ON a.id = b.id GROUP BY TUMBLE(a.rowtime,
> INTERVAL '1' MINUTE)
> 
> Is it the right syntax? Or, at an even higher level, does this query have
> proper semantics?
> 
> Your comments are appreciated.
> 
> Regards,
> Haohui
