Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where(...).equalTo(...).with(...)

So a join would simply be a join of two WindowedDataStreams. This would
neatly move all the windowing logic into one place.
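
To make the element-against-window semantics from Gyula's mail below more
concrete, here is a minimal standalone sketch in plain Java. It does not use
any Flink classes: the class name ElementWindowJoinSketch, the methods
onElementA/onElementB, and the fixed count-based eviction are all made up for
illustration. A real operator would presumably plug in the existing
Time/Count/Delta policies instead.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical sketch of an element-against-window join; not Flink API. */
final class ElementWindowJoinSketch<A, B, K> {
    // Current (constantly updated) window of each stream.
    private final Deque<A> windowA = new ArrayDeque<>();
    private final Deque<B> windowB = new ArrayDeque<>();
    // Assumption: count-based windows keeping the last sizeA / sizeB elements.
    private final int sizeA;
    private final int sizeB;
    // Join key extractors, playing the role of where(...) and equalTo(...).
    private final Function<A, K> keyA;
    private final Function<B, K> keyB;

    ElementWindowJoinSketch(int sizeA, int sizeB,
                            Function<A, K> keyA, Function<B, K> keyB) {
        this.sizeA = sizeA;
        this.sizeB = sizeB;
        this.keyA = keyA;
        this.keyB = keyB;
    }

    /** An element arrives on stream A: join it with B's current window. */
    List<Map.Entry<A, B>> onElementA(A a) {
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (B b : windowB) {
            if (Objects.equals(keyA.apply(a), keyB.apply(b))) {
                out.add(Map.entry(a, b));
            }
        }
        // Only after joining do we update A's own window and evict.
        windowA.addLast(a);
        if (windowA.size() > sizeA) {
            windowA.removeFirst();
        }
        return out;
    }

    /** An element arrives on stream B: join it with A's current window. */
    List<Map.Entry<A, B>> onElementB(B b) {
        List<Map.Entry<A, B>> out = new ArrayList<>();
        for (A a : windowA) {
            if (Objects.equals(keyA.apply(a), keyB.apply(b))) {
                out.add(Map.entry(a, b));
            }
        }
        windowB.addLast(b);
        if (windowB.size() > sizeB) {
            windowB.removeFirst();
        }
        return out;
    }
}
```

Neither method ever waits for a window to close, so nothing blocks: each
incoming element is joined against whatever the other side's window holds at
that moment, and the two windows can use different sizes or policies.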

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:
> Big +1 for the proposal from Peter and Gyula. I'm really for bringing the
> windowing and window join API in sync.
>
> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
>
>> Hey guys,
>>
>> As Aljoscha highlighted earlier, the current window join semantics in
>> the streaming API don't follow the changes in the windowing API. More
>> precisely, we currently only support joins over time windows of equal size
>> on both streams. The reason for this is that we now take a window of each
>> of the two streams and do joins over these pairs. This would be a blocking
>> operation if the windows were not closed at exactly the same time (and since
>> we don't want this, we only allow time windows).
>>
>> I talked with Peter who came up with the initial idea of an alternative
>> approach for stream joins which works as follows:
>>
>> Instead of pairing windows for joins, we do element-against-window joins.
>> What this means is that whenever we receive an element from one of the
>> streams, we join this element with the current window (this window is
>> constantly updated) of the other stream. This is non-blocking for any window
>> definition, as we don't have to wait for windows to be completed, and we can
>> use this with any of our predefined policies like Time.of(...),
>> Count.of(...), Delta.of(...).
>>
>> Additionally, this allows a very flexible way of defining window
>> joins. With this we could also define grouped windowing inside of a join.
>> An example of this would be: join all elements of Stream1 with the last 5
>> elements by a given windowkey of Stream2 on some join key.
>>
>> This feature can be easily implemented over the current operators, so I
>> already have a working prototype for the simple non-grouped case. My only
>> concern is the API; the best thing I could come up with is something like
>> this:
>>
>> stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1,
>> windowKey2).where(...).equalTo(...).with(...)
>>
>> (the user can omit the "by" and "with" calls)
>>
>> I think this new approach would be worthy of our "flexible windowing" in
>> contrast with the current approach.
>>
>> Regards,
>> Gyula
>>
