[ https://issues.apache.org/jira/browse/BEAM-912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15652072#comment-15652072 ]
Kenneth Knowles commented on BEAM-912:
--------------------------------------
After a conversation with a colleague, we have another suggestion that I think is
quite simple and effective: use a variation on non-merging {{FixedWindows}} of
some small granularity, replicate each of your B items into every window it
overlaps, then compute the naive full Cartesian product within each window and
filter it.
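{code}
import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.joda.time.Duration;

// Non-merging: every element is assigned to its fixed windows independently.
class OverlapWindowFn<T> extends NonMergingWindowFn<T, IntervalWindow> {
  private final Duration granularity =
      Duration.millis(GRANULARITY_TUNED_FOR_PERFORMANCE);

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // ... assign elements of B to every window that they overlap with ...
    // ... assign elements of A to the window that they fall into ...
  }
  // Remaining WindowFn methods (isCompatible, windowCoder, ...) omitted.
}
{code}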
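To make the assignment rule concrete, here is a minimal, Beam-free sketch of the window arithmetic that {{assignWindows}} would need, assuming event times are plain {{long}} milliseconds and that each B element carries a closed interval [start, end]. The class and method names ({{OverlapWindows}}, {{windowForPoint}}, {{windowsForInterval}}) are hypothetical; a real {{WindowFn}} would wrap each returned start in an {{IntervalWindow}}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not Beam API): fixed windows [start, start + g),
// each identified by its start timestamp in milliseconds.
class OverlapWindows {

  // Start of the fixed window containing t; floorDiv keeps negatives correct.
  static long windowStart(long t, long granularityMillis) {
    return Math.floorDiv(t, granularityMillis) * granularityMillis;
  }

  // An element of A (a point in event time) goes to exactly one window.
  static long windowForPoint(long timestampMillis, long granularityMillis) {
    return windowStart(timestampMillis, granularityMillis);
  }

  // An element of B covering the closed interval [startMillis, endMillis] is
  // replicated into every window that interval overlaps: a window [w, w + g)
  // overlaps it iff w + g > startMillis and w <= endMillis.
  static List<Long> windowsForInterval(
      long startMillis, long endMillis, long granularityMillis) {
    List<Long> windows = new ArrayList<>();
    for (long w = windowStart(startMillis, granularityMillis);
        w <= endMillis;
        w += granularityMillis) {
      windows.add(w);
    }
    return windows;
  }
}
```

The granularity is the tuning knob: a smaller value presumably means more copies of each B element but smaller per-window cross products, and a larger value the reverse.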
Note that no merging is needed, which is much more efficient and leaves room for
further optimizations. It is also much higher level than implementing the join
yourself with state.
This separates two of our classic questions: "what are you computing?" (a range
join via a filtered cross product) and "where is your data in event time?" (when
do we think we can garbage collect it?).
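What makes the filtered cross product a correct join is that each A element lives in exactly one window, while a B element is copied into every window its interval touches. A matching pair (a, b) therefore meets in a's window and nowhere else, so the per-window cross product emits each match exactly once. The sketch below applies this to the Orders/Shipments query from the issue description, treating each order as a B interval [o.rowtime, o.rowtime + range] and each shipment as an A point. It is a hypothetical in-memory illustration: the {{RangeJoinSketch}} and {{Event}} names and the {{long}} timestamps are assumptions, and in Beam the orderId equality would presumably be handled by keying both inputs and using {{CoGroupByKey}} within each window rather than folded into the filter as here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Beam-free sketch: a range join computed as a filtered cross
// product inside fixed, non-merging windows.
class RangeJoinSketch {

  // Simplified record: the join key (orderId) plus an event-time timestamp.
  static final class Event {
    final String id;
    final long ts;

    Event(String id, long ts) {
      this.id = id;
      this.ts = ts;
    }
  }

  // Start of the fixed window [start, start + g) containing t.
  static long windowStart(long t, long g) {
    return Math.floorDiv(t, g) * g;
  }

  // Returns "orderId@orderTs->shipmentTs" for every pair satisfying
  // o.id = s.id AND s.ts BETWEEN o.ts AND o.ts + rangeMillis, sorted.
  static List<String> join(List<Event> orders, List<Event> shipments,
      long rangeMillis, long g) {
    // Orders play the role of B: each covers [ts, ts + rangeMillis] and is
    // replicated into every window that interval overlaps.
    Map<Long, List<Event>> ordersByWindow = new HashMap<>();
    for (Event o : orders) {
      for (long w = windowStart(o.ts, g); w <= o.ts + rangeMillis; w += g) {
        ordersByWindow.computeIfAbsent(w, k -> new ArrayList<>()).add(o);
      }
    }
    // Shipments play the role of A: each lands in exactly one window.
    Map<Long, List<Event>> shipmentsByWindow = new HashMap<>();
    for (Event s : shipments) {
      shipmentsByWindow
          .computeIfAbsent(windowStart(s.ts, g), k -> new ArrayList<>())
          .add(s);
    }
    // Per window: naive cross product, then filter on the join predicate.
    List<String> out = new ArrayList<>();
    for (Map.Entry<Long, List<Event>> entry : shipmentsByWindow.entrySet()) {
      List<Event> candidates =
          ordersByWindow.getOrDefault(entry.getKey(), Collections.emptyList());
      for (Event s : entry.getValue()) {
        for (Event o : candidates) {
          if (o.id.equals(s.id) && s.ts >= o.ts && s.ts <= o.ts + rangeMillis) {
            out.add(o.id + "@" + o.ts + "->" + s.ts);
          }
        }
      }
    }
    Collections.sort(out);
    return out;
  }
}
```

For example, with a granularity of 10 and a range of 60, the order ("o1", 0) is copied into windows 0 through 60, and the shipment ("o1", 60) meets it only in window 60, so the pair is emitted once.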
> Range join in Beam
> ------------------
>
> Key: BEAM-912
> URL: https://issues.apache.org/jira/browse/BEAM-912
> Project: Beam
> Issue Type: New Feature
> Components: beam-model
> Reporter: Jingsong Lee
> Assignee: Kenneth Knowles
> Attachments: betweenJoin.png
>
>
> 1. To support data-driven triggers, we need to expose the element data in the
> OnElementContext of the onElement method.
> 2. To support more flexible joins, we need to expose the buffer tag in
> TriggerContext; currently this buffer tag is in SystemReduceFn.
> For example:
> SELECT STREAM * FROM Orders AS o JOIN Shipments AS s
> ON o.orderId = s.orderId
> AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
> link: https://issues.apache.org/jira/browse/BEAM-101
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)