[ https://issues.apache.org/jira/browse/BEAM-912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646524#comment-15646524 ]
Kenneth Knowles edited comment on BEAM-912 at 11/8/16 5:11 AM:
---------------------------------------------------------------
Very good point; my idea does not work for you. Actually, the difficult part of
your problem isn't windowing. Windows are how we decide when to trigger, and when
to evict data once the allowed lateness has passed; that is why {{BoundedWindow}}
only has a maximum timestamp. What makes your problem challenging is doing a range
join in Beam at all, since the SDK/model only supports equi-joins on shared keys.
A naive approach is to take the full cross product and then filter for the pairs
that match. Some prominent big data products still do this, even though it is
very inefficient.
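To make the cost of the naive approach concrete, here is a minimal plain-Java sketch (not Beam API code) that applies it to the Orders/Shipments query from the issue description: every order is compared with every shipment, and only pairs with the same {{orderId}} and {{s.rowtime}} in {{[o.rowtime, o.rowtime + 1 hour]}} are kept. The {{NaiveRangeJoin}}, {{Order}} and {{Shipment}} types and the millisecond timestamps are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the naive range join: cross product, then filter.
final class NaiveRangeJoin {
  static final long ONE_HOUR_MS = 60L * 60L * 1000L;

  // Hypothetical element types for the Orders/Shipments example (timestamps in epoch millis).
  static final class Order {
    final String orderId;
    final long rowtime;
    Order(String orderId, long rowtime) { this.orderId = orderId; this.rowtime = rowtime; }
  }

  static final class Shipment {
    final String orderId;
    final long rowtime;
    Shipment(String orderId, long rowtime) { this.orderId = orderId; this.rowtime = rowtime; }
  }

  // Compares every order with every shipment: O(|orders| * |shipments|) work,
  // no matter how few pairs actually satisfy the predicate.
  static List<String> join(List<Order> orders, List<Shipment> shipments) {
    List<String> out = new ArrayList<>();
    for (Order o : orders) {
      for (Shipment s : shipments) {
        boolean sameKey = o.orderId.equals(s.orderId);
        // BETWEEN is inclusive on both ends.
        boolean inRange = s.rowtime >= o.rowtime && s.rowtime <= o.rowtime + ONE_HOUR_MS;
        if (sameKey && inRange) {
          out.add(o.orderId + "@" + s.rowtime);
        }
      }
    }
    return out;
  }
}
```

With n orders and m shipments this always performs n * m comparisons, which is what makes it impractical for large or unbounded inputs.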
In Beam, one way to make this more efficient is a side input (see
https://s.apache.org/beam-side-inputs-1-pager) that supports range queries.
Collection B would be the main input and collection A the side input. The
windowing of each would be just like the {{assignWindows}} above, but with no
merging. Because side inputs are only eventually consistent, you would trigger
collection B after some allowed lateness and then join it downstream with A;
whichever elements of A had arrived by then would be the ones that are joined.
Where the side input is small or bounded, you may be able to do this in memory
or on local disk.
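For the small, in-memory case, a range-queryable side input can be approximated as a per-key sorted index that each main-input element probes with a range lookup. The sketch below is plain Java, not the Beam side-input API: it uses {{java.util.TreeMap.subMap}} as the range query, treats Shipments as the side input and Orders as the main input (an assumed mapping onto the A/B collections above), and the {{RangeIndexJoin}} class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class RangeIndexJoin {
  static final long ONE_HOUR_MS = 60L * 60L * 1000L;

  // Stand-in for a range-queryable side input: orderId -> (shipment rowtime -> multiplicity).
  private final Map<String, NavigableMap<Long, Integer>> index = new HashMap<>();

  // Build the index from the (assumed small, in-memory) side-input collection.
  void addSideInput(String orderId, long shipmentRowtime) {
    index.computeIfAbsent(orderId, k -> new TreeMap<>()).merge(shipmentRowtime, 1, Integer::sum);
  }

  // For one main-input element, return shipment times in [orderRowtime, orderRowtime + 1h].
  List<Long> probe(String orderId, long orderRowtime) {
    List<Long> matches = new ArrayList<>();
    NavigableMap<Long, Integer> times = index.get(orderId);
    if (times == null) {
      return matches; // no side-input data has arrived for this key
    }
    // Range query on the sorted keys; both bounds inclusive, like BETWEEN.
    for (Map.Entry<Long, Integer> e :
        times.subMap(orderRowtime, true, orderRowtime + ONE_HOUR_MS, true).entrySet()) {
      for (int i = 0; i < e.getValue(); i++) {
        matches.add(e.getKey()); // keep duplicate shipment timestamps
      }
    }
    return matches;
  }
}
```

Each probe costs a logarithmic lookup plus the number of matches instead of a scan over the whole side input. Side-input elements that have not arrived yet are simply not matched, which mirrors the eventual-consistency caveat above.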
Triggers cannot solve this problem, and they are not designed to do your main
computation like this; they only control when results are output (sometimes we
say "materialized") from a {{GroupByKey}} or {{Combine}}. See
https://s.apache.org/beam-triggers.
The solution you describe is how you might use a stateful {{ParDo}} to
implement a per-key range join, though I think you would still need a data
structure that supports range queries in order to handle out-of-order data. Your
{{onElement}} becomes {{@ProcessElement}}, and {{BagState}} stays the same. The
state will be partitioned by key and window, so your example would be globally
windowed. I am working on state in {{DoFn}} right now, so follow BEAM-25 for
updates on when it is ready for you to use, and check out
https://s.apache.org/beam-state.
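As a rough model of what such a stateful {{ParDo}} would do, here is a plain-Java simulation of per-key state for the Orders/Shipments query. It is not the Beam state API: the {{SymmetricRangeJoin}} class, its {{processElement}} method, and the sorted per-key buffers standing in for {{BagState}} are hypothetical. It shows why a range-queryable structure matters for out-of-order data: every arrival, from either side, is buffered and then probes the other side's buffer, so a shipment that arrives before its order is still joined, and each matching pair is emitted exactly once, when its later element arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SymmetricRangeJoin {
  static final long ONE_HOUR_MS = 60L * 60L * 1000L;

  // Per-key state (global window assumed), one sorted buffer per side: rowtime -> multiplicity.
  // Sorting makes the buffers range-queryable, unlike a plain bag.
  private final Map<String, NavigableMap<Long, Integer>> orders = new HashMap<>();
  private final Map<String, NavigableMap<Long, Integer>> shipments = new HashMap<>();

  // Analogue of @ProcessElement: buffer the element, then probe the other side.
  // Returns "orderTime->shipmentTime" for every new match produced by this element.
  List<String> processElement(String orderId, long rowtime, boolean isOrder) {
    List<String> out = new ArrayList<>();
    if (isOrder) {
      add(orders, orderId, rowtime);
      // Shipments with s.rowtime in [o.rowtime, o.rowtime + 1h].
      for (Map.Entry<Long, Integer> e :
          range(shipments, orderId, rowtime, rowtime + ONE_HOUR_MS).entrySet()) {
        for (int i = 0; i < e.getValue(); i++) {
          out.add(rowtime + "->" + e.getKey());
        }
      }
    } else {
      add(shipments, orderId, rowtime);
      // Same predicate rearranged: orders with o.rowtime in [s.rowtime - 1h, s.rowtime].
      for (Map.Entry<Long, Integer> e :
          range(orders, orderId, rowtime - ONE_HOUR_MS, rowtime).entrySet()) {
        for (int i = 0; i < e.getValue(); i++) {
          out.add(e.getKey() + "->" + rowtime);
        }
      }
    }
    return out;
  }

  private static void add(Map<String, NavigableMap<Long, Integer>> side, String key, long t) {
    side.computeIfAbsent(key, k -> new TreeMap<>()).merge(t, 1, Integer::sum);
  }

  // Inclusive range query over one key's buffer; empty if the key has no state yet.
  private static NavigableMap<Long, Integer> range(
      Map<String, NavigableMap<Long, Integer>> side, String key, long lo, long hi) {
    NavigableMap<Long, Integer> buf = side.get(key);
    return buf == null ? new TreeMap<Long, Integer>() : buf.subMap(lo, true, hi, true);
  }
}
```

The sketch never evicts state; in a globally windowed pipeline a real implementation would need some explicit cleanup policy so the buffers do not grow without bound.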
> Range join in Beam
> ------------------
>
> Key: BEAM-912
> URL: https://issues.apache.org/jira/browse/BEAM-912
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Reporter: Jingsong Lee
> Assignee: Kenneth Knowles
> Attachments: betweenJoin.png
>
>
> 1. To support data-driven triggers, we need to expose data in the
> OnElementContext of the onElement method.
> 2. To support more flexible joins, we need to expose the buffer tag in
> TriggerContext; currently this buffer tag lives in SystemReduceFn.
> For example:
> SELECT STREAM * FROM Orders AS o JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
> link: https://issues.apache.org/jira/browse/BEAM-101