Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equalTo().with()

So a join would just be a join of two WindowedDataStreams. This would
neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

> Big +1 for the proposal from Peter and Gyula. I'm really for bringing the
> windowing and window join API in sync.
>
> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
>
>> Hey guys,
>>
>> As Aljoscha highlighted earlier, the current window join semantics in
>> the streaming API don't follow the changes in the windowing API. More
>> precisely, we currently only support joins over time windows of equal size
>> on both streams. The reason for this is that we now take a window of each
>> of the two streams and do joins over these pairs. This would be a blocking
>> operation if the windows are not closed at exactly the same time (and since
>> we don't want this, we only allow time windows).
>>
>> I talked with Peter, who came up with the initial idea of an alternative
>> approach for stream joins, which works as follows:
>>
>> Instead of pairing windows for joins, we do element-against-window joins.
>> What this means is that whenever we receive an element from one of the
>> streams, we join this element with the current window (this window is
>> constantly updated) of the other stream. This is non-blocking for any window
>> definition, as we don't have to wait for windows to be completed, and we can
>> use this with any of our predefined policies like Time.of(...),
>> Count.of(...), Delta.of(...).
>>
>> Additionally, this allows a very flexible way of defining window
>> joins. With this we could also define grouped windowing inside of a join.
>> An example of this would be: join all elements of Stream1 with the last 5
>> elements by a given window key of Stream2 on some join key.
>>
>> This feature can be easily implemented over the current operators, so I
>> already have a working prototype for the simple non-grouped case. My only
>> concern is the API; the best thing I could come up with is something like
>> this:
>>
>> stream_A.join(stream_B)
>>     .onWindow(windowDefA, windowDefB)
>>     .by(windowKey1, windowKey2)
>>     .where(...).equalTo(...).with(...)
>>
>> (the user can omit the "by" and "with" calls)
>>
>> I think this new approach would be worthy of our "flexible windowing", in
>> contrast with the current approach.
>>
>> Regards,
>> Gyula
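The element-against-window join described in the proposal can be sketched as a small, single-threaded Java class. This is a minimal sketch under several assumptions: only count-based windows are modelled (the Count.of(...) case, not Time.of(...) or Delta.of(...)); a grouped window is taken to mean the union of the last n elements per window key; and the names ElementWindowJoin, CountWindow, onElementA and onElementB are hypothetical, not part of Flink's streaming API. Each arriving element is first joined against the other stream's current window and only then stored in its own window, so nothing ever waits for a window to close.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of an element-against-window join (not Flink API).
// Only count-based windows are modelled; Time/Delta policies are left out.
class ElementWindowJoin<A, B, K, R> {

    // "Last n elements" window, kept separately for each window key.
    static final class CountWindow<T> {
        private final int size;
        private final Function<T, Object> windowKey;
        // LinkedHashMap: groups are iterated in the order their key first appeared.
        private final Map<Object, Deque<T>> groups = new LinkedHashMap<>();

        CountWindow(int size, Function<T, Object> windowKey) {
            this.size = size;
            this.windowKey = windowKey;
        }

        // Appends to the element's group, evicting that group's oldest element when full.
        void add(T element) {
            Deque<T> group = groups.computeIfAbsent(windowKey.apply(element), k -> new ArrayDeque<>());
            group.addLast(element);
            if (group.size() > size) {
                group.removeFirst();
            }
        }

        // Current window contents: the union of all per-key groups (assumed semantics).
        List<T> contents() {
            List<T> all = new ArrayList<>();
            for (Deque<T> group : groups.values()) {
                all.addAll(group);
            }
            return all;
        }
    }

    private final CountWindow<A> windowA;
    private final CountWindow<B> windowB;
    private final Function<A, K> joinKeyA;    // role of where(...)
    private final Function<B, K> joinKeyB;    // role of equalTo(...)
    private final BiFunction<A, B, R> joiner; // role of with(...)

    ElementWindowJoin(int sizeA, Function<A, Object> windowKeyA,
                      int sizeB, Function<B, Object> windowKeyB,
                      Function<A, K> joinKeyA, Function<B, K> joinKeyB,
                      BiFunction<A, B, R> joiner) {
        this.windowA = new CountWindow<>(sizeA, windowKeyA);
        this.windowB = new CountWindow<>(sizeB, windowKeyB);
        this.joinKeyA = joinKeyA;
        this.joinKeyB = joinKeyB;
        this.joiner = joiner;
    }

    // Element from stream A: probe B's current window, then store it in A's window.
    List<R> onElementA(A a) {
        List<R> out = new ArrayList<>();
        K key = joinKeyA.apply(a);
        for (B b : windowB.contents()) {
            if (Objects.equals(key, joinKeyB.apply(b))) {
                out.add(joiner.apply(a, b));
            }
        }
        windowA.add(a);
        return out;
    }

    // Element from stream B: symmetric to onElementA.
    List<R> onElementB(B b) {
        List<R> out = new ArrayList<>();
        K key = joinKeyB.apply(b);
        for (A a : windowA.contents()) {
            if (Objects.equals(joinKeyA.apply(a), key)) {
                out.add(joiner.apply(a, b));
            }
        }
        windowB.add(b);
        return out;
    }
}
```

For example, with a count window of 2 on the second stream and a constant window key (the non-grouped case), joining on the first character and feeding x1, x2, y1 into the second stream and then x9 into the first yields only x9|x2, because x1 has already been evicted. Passing a window key such as the last character of each element turns the same window into a per-key one, which corresponds to the "last 5 elements by a given window key" example. Because an element probes before it is stored, each matching pair is emitted once, by whichever of the two elements arrives later, provided the earlier one is still in its window.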