[ 
https://issues.apache.org/jira/browse/BEAM-912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646524#comment-15646524
 ] 

Kenneth Knowles edited comment on BEAM-912 at 11/8/16 5:11 AM:
---------------------------------------------------------------

Very good point. My idea does not work for you. Actually, the difficult part of 
your problem isn't really windowing. Windows are how we decide when to trigger, 
and when to evict data once allowed lateness has passed - that is why 
{{BoundedWindow}} only has a maximum timestamp. Your problem is challenging 
even if we just talk about doing a range join in Beam, because only equi-joins 
on shared keys are in the SDK/model. A naive way to do it is a full cross 
product followed by a filter that keeps the pairs that match. Some prominent 
big data products still do this, even though it is very inefficient.
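
To make the cost of the naive approach concrete, here is a minimal plain-Java sketch (not Beam code) of a cross product followed by a filter, using the BETWEEN predicate from the query quoted in the issue description. The {{NaiveRangeJoin}} and {{Event}} names and the millisecond timestamps are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Naive range join: compare every order with every shipment, then filter. */
class NaiveRangeJoin {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    /** Hypothetical event type: a join key plus an event timestamp in milliseconds. */
    static final class Event {
        final String orderId;
        final long rowtime;

        Event(String orderId, long rowtime) {
            this.orderId = orderId;
            this.rowtime = rowtime;
        }
    }

    /** Returns {order, shipment} pairs; the work is O(|orders| * |shipments|). */
    static List<Event[]> join(List<Event> orders, List<Event> shipments) {
        List<Event[]> out = new ArrayList<>();
        for (Event o : orders) {
            for (Event s : shipments) {
                // Equi-join condition on the shared key.
                boolean sameKey = o.orderId.equals(s.orderId);
                // Range condition: s.rowtime BETWEEN o.rowtime AND o.rowtime + 1 hour.
                boolean inRange = s.rowtime >= o.rowtime
                        && s.rowtime <= o.rowtime + ONE_HOUR_MS;
                if (sameKey && inRange) {
                    out.add(new Event[] {o, s});
                }
            }
        }
        return out;
    }
}
```

Every pair is examined even though most are discarded by the filter, which is where the inefficiency comes from.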

In Beam, one way we could make this more efficient is with a side input (see 
https://s.apache.org/beam-side-inputs-1-pager) that supports range queries. 
Collection B would be the main input and collection A would be the side 
input. Each of them would be windowed just like the {{assignWindows}} 
above, but with no merging. Because side inputs are only eventually 
consistent, you would want to trigger collection B after some allowed lateness 
and then join it downstream with A; whatever elements of A had arrived by then 
would be the ones that are joined. In cases where the side input is bounded and 
small, you may be able to do this in memory or on local disk.
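
For the small, bounded, in-memory case, a data structure that supports range queries could look roughly like the sketch below. This is plain Java built on {{java.util.TreeMap}}, not a Beam side input API; the {{TimestampRangeIndex}} class and its method names are hypothetical. The idea is that elements of A are indexed by timestamp, and each element of B asks for the slice of A that falls inside its range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** In-memory index of values by timestamp that answers inclusive range queries. */
class TimestampRangeIndex {
    // Sorted by timestamp; each bucket holds all values sharing one timestamp.
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

    void add(long timestamp, String value) {
        byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    /** Returns all values with lo <= timestamp <= hi, in timestamp order. */
    List<String> query(long lo, long hi) {
        List<String> out = new ArrayList<>();
        if (lo > hi) {
            return out; // subMap would reject an inverted range
        }
        for (List<String> bucket : byTimestamp.subMap(lo, true, hi, true).values()) {
            out.addAll(bucket);
        }
        return out;
    }
}
```

Locating the start of the range costs a logarithmic lookup, after which only the matching entries are visited, instead of scanning all of A for every element of B.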

Triggers cannot solve this problem, and they are not designed to do your main 
computation like this - they only control when results get output (sometimes we 
say "materialized") from a {{GroupByKey}} or {{Combine}} - see 
https://s.apache.org/beam-triggers.

The solution you describe is how you might use a stateful {{ParDo}} to 
implement a per-key range join, though I think you will still need a data 
structure that supports range queries in order to handle out-of-order data. Your 
{{onElement}} becomes {{@ProcessElement}} and {{BagState}} is the same. The 
state will be partitioned by key and window, so your example would be globally 
windowed. Using state in your {{DoFn}} is something I am working on right now, 
so you should follow BEAM-25 for updates on when it is ready for you to use, 
and check out https://s.apache.org/beam-state.
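
To illustrate the per-key buffering idea without depending on the state API (which is not ready yet), here is a plain-Java simulation. It is only a sketch: the {{PerKeyRangeJoinSketch}} class, its methods, and the use of sorted maps in place of {{BagState}} are all assumptions, and the maps merely stand in for state that a runner would partition by key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of a per-key range join over buffered state (not the Beam API). */
class PerKeyRangeJoinSketch {
    private final long rangeMs; // assumed to be non-negative
    // key -> (timestamp -> number of elements seen with that timestamp)
    private final Map<String, TreeMap<Long, Integer>> orders = new HashMap<>();
    private final Map<String, TreeMap<Long, Integer>> shipments = new HashMap<>();

    PerKeyRangeJoinSketch(long rangeMs) {
        this.rangeMs = rangeMs;
    }

    /** Buffers an order; returns {orderTime, shipmentTime} pairs for shipments already seen. */
    List<long[]> onOrder(String key, long orderTime) {
        add(orders, key, orderTime);
        List<long[]> out = new ArrayList<>();
        for (long s : range(shipments, key, orderTime, orderTime + rangeMs)) {
            out.add(new long[] {orderTime, s});
        }
        return out;
    }

    /** Buffers a shipment; returns {orderTime, shipmentTime} pairs for orders already seen. */
    List<long[]> onShipment(String key, long shipmentTime) {
        add(shipments, key, shipmentTime);
        List<long[]> out = new ArrayList<>();
        for (long o : range(orders, key, shipmentTime - rangeMs, shipmentTime)) {
            out.add(new long[] {o, shipmentTime});
        }
        return out;
    }

    private static void add(Map<String, TreeMap<Long, Integer>> state, String key, long t) {
        state.computeIfAbsent(key, k -> new TreeMap<>()).merge(t, 1, Integer::sum);
    }

    /** Timestamps buffered under key with lo <= t <= hi, repeated once per buffered element. */
    private static List<Long> range(
            Map<String, TreeMap<Long, Integer>> state, String key, long lo, long hi) {
        List<Long> out = new ArrayList<>();
        TreeMap<Long, Integer> buffer = state.get(key);
        if (buffer == null) {
            return out;
        }
        for (Map.Entry<Long, Integer> e : buffer.subMap(lo, true, hi, true).entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```

Each matching pair is emitted once, when the later-arriving of its two elements shows up, so arrival order does not matter. The sketch never evicts buffered elements, which a real pipeline would have to address.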


> Range join in Beam
> ------------------
>
>                 Key: BEAM-912
>                 URL: https://issues.apache.org/jira/browse/BEAM-912
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Jingsong Lee
>            Assignee: Kenneth Knowles
>         Attachments: betweenJoin.png
>
>
> 1. We could support data-driven triggers; for that we need to expose the data 
> in the OnElementContext of the onElement method. 
> 2. We could support more flexible joins; for that we need to expose the buffer 
> tag in TriggerContext. Currently this buffer tag is in SystemReduceFn.
> For example: SELECT STREAM * FROM Orders AS o JOIN Shipments AS s
> ON o.orderId = s.orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + 
> INTERVAL '1' HOUR;
> link: https://issues.apache.org/jira/browse/BEAM-101


