There is a simple reason for that: They don't support joins. :D They support n-ary co-group, however. This is implemented using tagging and a group-by-key operation. So only elements in the same window can end up in the same co-grouped result.
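A minimal sketch of what that tagging construction amounts to, with made-up names rather than Dataflow's (or Flink's) actual API: elements from both inputs are wrapped with a side tag, grouped by key within one window, and the grouped bag is split back into sides to produce the join pairs.

import java.util.ArrayList;
import java.util.List;

// Sketch only: emulating a join with a tagged co-group. Elements of both
// inputs are wrapped with a side tag, unioned, and grouped by key per
// window; the grouped bag is then split by tag and paired up. Because the
// grouping happens per window, only elements of the same window can match.
class TaggedCoGroup {

    static final class Tagged<T> {
        final boolean fromLeft;
        final T value;
        Tagged(boolean fromLeft, T value) {
            this.fromLeft = fromLeft;
            this.value = value;
        }
    }

    // 'group' holds all tagged elements that share one key within one window.
    static <T> List<String> joinFromCoGroup(List<Tagged<T>> group) {
        List<T> left = new ArrayList<>();
        List<T> right = new ArrayList<>();
        for (Tagged<T> t : group) {
            (t.fromLeft ? left : right).add(t.value);
        }
        List<String> pairs = new ArrayList<>();
        for (T l : left) {
            for (T r : right) {
                pairs.add(l + "," + r);
            }
        }
        return pairs;
    }
}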
On Fri, Apr 24, 2015 at 3:51 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Interesting read. Thanks for the pointer.

Take-home message (in my understanding):
- they support wall-clock, attribute-timestamp, and count windows
  -> the default is attribute-timestamp (and not wall-clock as in Flink)
  -> it is not specified whether a global order is applied to windows, but I doubt it, because of their Watermark approach
- they allow the user to assign timestamps for attribute-timestamp windows
- they deal with out-of-order data (I am not sure what the last sentence means exactly: "...causing the late elements to be emitted as they arrive." ?)
- their "Watermark" approach might yield high latencies

However, they don't talk about joins... :(

On 04/24/2015 02:25 PM, Aljoscha Krettek wrote:

Did anyone read these: https://cloud.google.com/dataflow/model/windowing, https://cloud.google.com/dataflow/model/triggers ?

The semantics seem very straightforward and I'm sure the Google guys spent some time thinking this through. :D

On Mon, Apr 20, 2015 at 3:43 PM, Stephan Ewen <se...@apache.org> wrote:

Perfect! I am eager to see what you came up with!

On Sat, Apr 18, 2015 at 2:00 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

Hey all,

We have spent some time with Asterios, Paris and Jonas to finalize the windowing semantics (both the current features and the window join), and I think we have come up with a very clear picture.

We will write down the proposed semantics and publish it to the wiki next week.

Cheers,
Gyula

On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos <asterios.katsifodi...@tu-berlin.de> wrote:

As far as I can see in [1], Peter's/Gyula's suggestion is what InfoSphere Streams does: a symmetric hash join.

From [1]:
"When a tuple is received on an input port, it is inserted into the window corresponding to the input port, which causes the window to trigger. As part of the trigger processing, the tuple is compared against all tuples inside the window of the opposing input port. If the tuples match, then an output tuple will be produced for each match. If at least one output was generated, a window punctuation will be generated after all the outputs."

Cheers,
Asterios

[1] http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html

On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi Paris,

thanks for the pointer to the Naiad paper. That is quite interesting.

The paper I mentioned [1] does not describe the semantics in detail; it is more about the implementation of the stream joins. However, it uses the same semantics (from my understanding) as proposed by Gyula.

-Matthias

[1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded Streams". VLDB 2002.

On 04/07/2015 12:38 PM, Paris Carbone wrote:

Hello Matthias,

Sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back in TU Berlin.
Bear in mind, though, that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. There are no *global* ordering guarantees whatsoever made in that model across partitions. Seen more generally, there are many "race conditions" in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing "old" tuples that iterate in the graph?). By the way, have you checked the Naiad paper [1]? Stephan cited it a while ago and it is quite relevant to this discussion.

Also, can you cite the paper with the joining semantics you are referring to? That would be of good help, I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi @all,

please keep me in the loop for this work. I am highly interested and I want to help with it.

My initial thoughts are as follows:

1) Currently, system timestamps are used, and the suggested approach can be seen as state of the art (there is actually a research paper using the exact same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead in keeping track of the order of records, and the latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)

2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or at the sources). Because deterministic processing adds some overhead, the user should opt in to it actively. In this case, the order must be preserved in each re-distribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there is a bunch of them). Even for single-stream windows it is a tricky problem; for join windows it is even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). The timestamping of (intermediate) result tuples is also an important question to be answered.

-Matthias

On 04/07/2015 11:37 AM, Gyula Fóra wrote:

Hey,

I agree with Kostas: if we define the exact semantics of how this works, this is not more ad-hoc than any other stateful operator with multiple inputs.
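Concretely, the per-element join step that Kostas spells out below, and that roughly matches the symmetric hash join behaviour Asterios quotes from InfoSphere Streams above, could be sketched like this. Made-up types, no key comparison, and a simple count-based eviction stand in for real window policies; this is not a settled API or implementation.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch only (made-up types, no key matching, count-based eviction):
// an arriving left element is (1) joined against the current right window,
// (2) inserted into the left window, and (3) the left window is
// re-evaluated; the right side is handled symmetrically, so neither side
// ever blocks waiting for a window to close.
class ElementVsWindowJoin<L, R> {

    private final Deque<L> leftWindow = new ArrayDeque<>();
    private final Deque<R> rightWindow = new ArrayDeque<>();
    private final int leftSize;
    private final int rightSize;

    ElementVsWindowJoin(int leftSize, int rightSize) {
        this.leftSize = leftSize;
        this.rightSize = rightSize;
    }

    List<String> onLeftElement(L element) {
        List<String> joined = new ArrayList<>();
        // (1) find the right-side matches (here: everything in the right window)
        for (R r : rightWindow) {
            joined.add(element + "," + r);
        }
        // (2) insert the arrival into the left window
        leftWindow.addLast(element);
        // (3) recompute the left window (count-based eviction as a stand-in)
        while (leftWindow.size() > leftSize) {
            leftWindow.removeFirst();
        }
        return joined;
    }

    List<String> onRightElement(R element) {
        List<String> joined = new ArrayList<>();
        for (L l : leftWindow) {
            joined.add(l + "," + element);
        }
        rightWindow.addLast(element);
        while (rightWindow.size() > rightSize) {
            rightWindow.removeFirst();
        }
        return joined;
    }
}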
(And I don't think any other system supports something similar.)

We need to make some design choices that are similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (global or local), because that affects what kind of policies can be parallel, but I can work on these things.

I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9 though.

And thanks for helping to write these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad-hoc. The semantics are basically the following, assuming an arriving element from the left side:
(1) We find the right-side matches.
(2) We insert the left-side arrival into the left window.
(3) We recompute the left window.
We need to see whether right-window re-computation needs to be triggered as well. I think that this way of joining streams is also what the symmetric hash join algorithms were meant to support.

Kostas

On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:

Is the approach of joining an element at a time from one input against a window on the other input not a bit arbitrary?

This just joins whatever currently happens to be the window by the time the single element arrives - that is a bit unpredictable, right?

As a more general point: the whole semantics of windowing and when windows are triggered are a bit ad-hoc now. It would be really good to start formalizing that a bit and put it down somewhere. Users need to be able to clearly understand and predict the output.

On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every" we would not join that stream 1 by 1 but every so many elements. The problem here is that the window and every in this case mean something very different from the normal windowing semantics: the window would define the join window for each element of the other stream, while every would define how often I join this stream with the other one.

We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:

That would be really neat. The problem I see there is that we currently do not distinguish between dataStream.window() and dataStream.window().every(): they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, practically, the trigger is always set to a count of one).
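For reference, the two call chains in question look roughly like this; the helper names are the ones mentioned in this thread, while the concrete values (and the omitted imports) are purely illustrative:

// A window over the last 5 seconds of data:
dataStream.window(Time.of(5, TimeUnit.SECONDS));

// The same window definition, but results triggered every 100 elements:
dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Count.of(100));

Both calls yield a WindowedDataStream, which is why the join side currently cannot tell whether an every() trigger was set.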
But of course we could make it so that we check that the eviction is either null or a count of one; in every other case we throw an exception while building the JobGraph.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where().equals().with()

So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

Big +1 for the proposal from Peter and Gyula. I'm really for bringing the windowing and window join APIs in sync.

On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:

Hey guys,

As Aljoscha highlighted earlier, the current window join semantics in the streaming API don't follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams. The reason for this is that we now take a window of each of the two streams and do joins over these pairs. This would be a blocking operation if the windows were not closed at exactly the same time (and since we don't want that, we only allow time windows).

I talked with Peter, who came up with the initial idea of an alternative approach for stream joins which works as follows:

Instead of pairing windows for joins, we do element-against-window joins. What this means is that whenever we receive an element from one of the streams, we join this element with the current window (which is constantly updated) of the other stream. This is non-blocking for any window definition, as we don't have to wait for windows to be completed, and we can use it with any of our predefined policies like Time.of(...), Count.of(...), Delta.of(...).

Additionally, this also allows a very flexible way of defining window joins. With this we could also define grouped windowing inside of a join. An example would be: join all elements of Stream1 with the last 5 elements, per a given windowkey, of Stream2 on some join key.

This feature can be easily implemented over the current operators, so I already have a working prototype for the simple non-grouped case. My only concern is the API; the best thing I could come up with is something like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
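Spelled out with concrete window definitions and key selectors, the grouped example above might read something like the following; none of these signatures are settled, it is only a sketch of the proposed call chain:

// Join every element of stream1 with the last 5 elements of stream2,
// grouped by stream2's windowkey and matched on a join key.
// All names, window definitions, and selector signatures are hypothetical.
stream1.join(stream2)
       .onWindow(Time.of(10, TimeUnit.SECONDS), Count.of(5))
       .by(leftWindowKeySelector, rightWindowKeySelector)
       .where(leftJoinKeySelector)
       .equalTo(rightJoinKeySelector)
       .with(joinFunction);

How the left-side window definition interacts with the element-by-element arrivals on that side is one of the semantic questions that still needs to be written down.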
(The user can omit the "by" and "with" calls.)

I think this new approach would be worthy of our "flexible windowing", in contrast with the current approach.

Regards,
Gyula
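For comparison, the alternative Aljoscha proposes above would attach the window definitions to the streams themselves, so the join only ever sees two WindowedDataStreams. Schematically, again with illustrative window policies and placeholder selectors rather than a finished API:

// Window each stream first, then join the two WindowedDataStreams.
// Window sizes and key selectors are placeholders.
stream_A = a.window(Time.of(5, TimeUnit.SECONDS));
stream_B = b.window(Count.of(100));

stream_A.join(stream_B)
        .where(leftJoinKeySelector)
        .equals(rightJoinKeySelector)
        .with(joinFunction);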