Interesting read. Thanks for the pointer.

Take-home message (in my understanding):

- They support wall-clock, attribute-ts, and count windows. The default is
  attribute-ts (and not wall-clock as in Flink).
- It is not specified whether a global order is applied to windows, but I
  doubt it, because of their Watermark approach.
- They allow the user to assign timestamps for attribute-ts windows.
- They deal with out-of-order data. (I am not sure what the last sentence
  means exactly: "...causing the late elements to be emitted as they
  arrive.")
- Their "Watermark" approach might yield high latencies.
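To make the windowing model above concrete for myself, here is a minimal toy sketch (my own code, not Dataflow's or Flink's implementation) of attribute-ts windowing with out-of-order input: each element is assigned to a tumbling event-time window by its own timestamp, and a window may only fire once a watermark passes its end. The watermark heuristic and the allowed lateness of 5 are made-up parameters; waiting for the watermark is also where the potential latency comes from.

```python
# Toy sketch: tumbling event-time ("attribute-ts") windows with a watermark.
# Assumptions (not from the Dataflow docs): window size 10, watermark = max
# event time seen minus an allowed lateness of 5.
from collections import defaultdict

WINDOW_SIZE = 10
LATENESS = 5

def window_start(ts):
    # the tumbling window covering ts is [start, start + WINDOW_SIZE)
    return ts - ts % WINDOW_SIZE

def run(elements_with_ts):
    windows = defaultdict(list)     # window start -> buffered elements
    watermark = float("-inf")
    fired = []
    for value, ts in elements_with_ts:
        # out-of-order elements still land in the right window by their own ts
        windows[window_start(ts)].append(value)
        watermark = max(watermark, ts - LATENESS)
        # fire every window whose end the watermark has passed
        for start in sorted(list(windows)):
            if start + WINDOW_SIZE <= watermark:
                fired.append((start, sorted(windows.pop(start))))
    return fired

# "c" (ts=4) arrives after "b" (ts=12) but still joins window [0, 10)
print(run([("a", 3), ("b", 12), ("c", 4), ("d", 25)]))
```

Note how window [0, 10) only fires once the watermark reaches 10, i.e. after the element with ts=25 arrives; that delay is the latency concern mentioned above.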
However, they don't talk about joins... :(


On 04/24/2015 02:25 PM, Aljoscha Krettek wrote:
> Did anyone read these:
> https://cloud.google.com/dataflow/model/windowing,
> https://cloud.google.com/dataflow/model/triggers ?
>
> The semantics seem very straightforward and I'm sure the google guys
> spent some time thinking this through. :D
>
> On Mon, Apr 20, 2015 at 3:43 PM, Stephan Ewen <se...@apache.org> wrote:
>> Perfect! I am eager to see what you came up with!
>>
>> On Sat, Apr 18, 2015 at 2:00 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> We have spent some time with Asterios, Paris and Jonas to finalize the
>>> windowing semantics (both the current features and the window join),
>>> and I think we have come up with a very clear picture.
>>>
>>> We will write down the proposed semantics and publish it to the wiki
>>> next week.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos <
>>> asterios.katsifodi...@tu-berlin.de> wrote:
>>>
>>>> As far as I see in [1], Peter's/Gyula's suggestion is what Infosphere
>>>> Streams does: symmetric hash join.
>>>>
>>>> From [1]:
>>>> "When a tuple is received on an input port, it is inserted into the
>>>> window corresponding to the input port, which causes the window to
>>>> trigger. As part of the trigger processing, the tuple is compared
>>>> against all tuples inside the window of the opposing input port. If
>>>> the tuples match, then an output tuple will be produced for each
>>>> match. If at least one output was generated, a window punctuation
>>>> will be generated after all the outputs."
>>>>
>>>> Cheers,
>>>> Asterios
>>>>
>>>> [1]
>>>> http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html
>>>>
>>>> On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <
>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>
>>>>> Hi Paris,
>>>>>
>>>>> thanks for the pointer to the Naiad paper. That is quite interesting.
>>>>>
>>>>> The paper I mentioned [1] does not describe the semantics in detail;
>>>>> it is more about the implementation of the stream-joins. However, it
>>>>> uses the same semantics (from my understanding) as proposed by Gyula.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> [1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded
>>>>> Streams". VLDB 2002.
>>>>>
>>>>> On 04/07/2015 12:38 PM, Paris Carbone wrote:
>>>>>> Hello Matthias,
>>>>>>
>>>>>> Sure, ordering guarantees are indeed a tricky thing, I recall having
>>>>>> that discussion back in TU Berlin. Bear in mind though that
>>>>>> DataStream, our abstract data type, represents a *partitioned*
>>>>>> unbounded sequence of events. There are no *global* ordering
>>>>>> guarantees made whatsoever in that model across partitions. If you
>>>>>> see it more generally, there are many "race conditions" in a
>>>>>> distributed execution graph of vertices that process multiple inputs
>>>>>> asynchronously, especially when you add joins and iterations into
>>>>>> the mix (how do you deal with reprocessing "old" tuples that iterate
>>>>>> in the graph?). Btw, have you checked the Naiad paper [1]? Stephan
>>>>>> cited it a while ago and it is quite relevant to that discussion.
>>>>>>
>>>>>> Also, can you cite the paper with the joining semantics you are
>>>>>> referring to? That would be of good help, I think.
>>>>>>
>>>>>> Paris
>>>>>>
>>>>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>>>
>>>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax <
>>>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>>>
>>>>>> Hi @all,
>>>>>>
>>>>>> please keep me in the loop for this work. I am highly interested and
>>>>>> I want to help on it.
>>>>>>
>>>>>> My initial thoughts are as follows:
>>>>>>
>>>>>> 1) Currently, system timestamps are used and the suggested approach
>>>>>> can be seen as state-of-the-art (there is actually a research paper
>>>>>> using the exact same join semantics). Of course, the current
>>>>>> approach is inherently non-deterministic. The advantage is that
>>>>>> there is no overhead in keeping track of the order of records, and
>>>>>> the latency should be very low. (Additionally, state recovery is
>>>>>> simplified: because the processing is inherently non-deterministic,
>>>>>> recovery can be done with relaxed guarantees.)
>>>>>>
>>>>>> 2) The user should be able to "switch on" deterministic processing,
>>>>>> i.e., records are timestamped (either externally when generated, or
>>>>>> timestamped at the sources). Because deterministic processing adds
>>>>>> some overhead, the user should actively opt in to it.
>>>>>> In this case, the order must be preserved in each re-distribution
>>>>>> step (merging is sufficient if order is preserved within each
>>>>>> incoming channel). Furthermore, deterministic processing can be
>>>>>> achieved by sound window semantics (and there is a bunch of them).
>>>>>> Even for single-stream windows it's a tricky problem; for join
>>>>>> windows it's even harder. From my point of view, it is less
>>>>>> important which semantics are chosen; however, the user must be
>>>>>> aware of how they work. The trickiest part of deterministic
>>>>>> processing is dealing with duplicate timestamps (which cannot be
>>>>>> avoided). The timestamping of (intermediate) result tuples is also
>>>>>> an important question to be answered.
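The ordered-merge requirement and the duplicate-timestamp problem from point 2 above can be sketched in a few lines (the code and names are illustrative only, not Flink's merge logic): if each incoming channel is timestamp-ordered, a k-way merge that always takes the smallest head timestamp preserves a global order, but equal timestamps need an explicit, stable tie-breaker (here: channel id) or the merged order becomes non-deterministic.

```python
# Hypothetical sketch of a deterministic k-way merge over timestamp-ordered
# channels. The (ts, channel_id, index) heap key makes duplicate timestamps
# resolve the same way on every run -- the tie-breaker is the whole point.
import heapq

def deterministic_merge(channels):
    """channels: list of lists of (timestamp, record), each sorted by timestamp."""
    heap = []
    for cid, ch in enumerate(channels):
        if ch:
            ts, rec = ch[0]
            heapq.heappush(heap, (ts, cid, 0, rec))
    out = []
    while heap:
        ts, cid, idx, rec = heapq.heappop(heap)
        out.append((ts, rec))
        if idx + 1 < len(channels[cid]):
            nts, nrec = channels[cid][idx + 1]
            heapq.heappush(heap, (nts, cid, idx + 1, nrec))
    return out

# timestamp 5 appears on both channels; the tie is broken by channel id,
# so the output order is stable across runs
a = [(1, "a1"), (5, "a2")]
b = [(5, "b1"), (9, "b2")]
print(deterministic_merge([a, b]))
```

Any other stable tie-breaker (source id, sequence number) works equally well; what matters is that the choice is fixed and documented, since it becomes part of the visible join semantics.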
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>>>> Hey,
>>>>>>
>>>>>> I agree with Kostas: if we define the exact semantics of how this
>>>>>> works, it is not more ad-hoc than any other stateful operator with
>>>>>> multiple inputs. (And I don't think any other system supports
>>>>>> something similar.)
>>>>>>
>>>>>> We need to make some design choices that are similar to the issues
>>>>>> we had for windowing. We need to choose how we want to evaluate the
>>>>>> windowing policies (global or local) because that affects what kind
>>>>>> of policies can be parallel, but I can work on these things.
>>>>>>
>>>>>> I think this is an amazing feature, so I wouldn't necessarily rush
>>>>>> the implementation for 0.9 though.
>>>>>>
>>>>>> And thanks for helping write these down.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:
>>>>>>
>>>>>> Yes, we should write these semantics down. I volunteer to help.
>>>>>>
>>>>>> I don't think that this is very ad-hoc. The semantics are basically
>>>>>> the following, assuming an arriving element from the left side:
>>>>>> (1) We find the right-side matches.
>>>>>> (2) We insert the left-side arrival into the left window.
>>>>>> (3) We recompute the left window.
>>>>>> We need to see whether right-window re-computation needs to be
>>>>>> triggered as well. I think that this way of joining streams is also
>>>>>> what the symmetric hash join algorithms were meant to support.
>>>>>>
>>>>>> Kostas
>>>>>>
>>>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:
>>>>>>
>>>>>> Is the approach of joining an element at a time from one input
>>>>>> against a window on the other input not a bit arbitrary?
>>>>>>
>>>>>> This just joins whatever currently happens to be the window by the
>>>>>> time the single element arrives - that is a bit non-predictable,
>>>>>> right?
>>>>>>
>>>>>> As a more general point: the whole semantics of windowing and when
>>>>>> windows are triggered are a bit ad-hoc now. It would be really good
>>>>>> to start formalizing that a bit and put it down somewhere. Users
>>>>>> need to be able to clearly understand and predict the output.
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>>>>
>>>>>> I think it should be possible to make this compatible with the
>>>>>> .window().every() calls. Maybe if there is some trigger set in
>>>>>> "every" we would not join that stream 1 by 1 but every so many
>>>>>> elements. The problem here is that the window and every in this case
>>>>>> are very, very different from the normal windowing semantics. The
>>>>>> window would define the join window for each element of the other
>>>>>> stream, while every would define how often I join this stream with
>>>>>> the other one.
>>>>>>
>>>>>> We need to think about how to make this intuitive.
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
>>>>>> balassi.mar...@gmail.com> wrote:
>>>>>>
>>>>>> That would be really neat. The problem I see there is that we do not
>>>>>> currently distinguish between dataStream.window() and
>>>>>> dataStream.window().every(); they both return WindowedDataStreams,
>>>>>> and TriggerPolicies of the every call do not make much sense in this
>>>>>> setting (in fact, practically, the trigger is always set to a count
>>>>>> of one).
>>>>>>
>>>>>> But of course we could make it in a way that we check that the
>>>>>> eviction is either null or a count of 1; in every other case we
>>>>>> throw an exception while building the JobGraph.
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>
>>>>>> Or you could define it like this:
>>>>>>
>>>>>> stream_A = a.window(...)
>>>>>> stream_B = b.window(...)
>>>>>>
>>>>>> stream_A.join(stream_B).where().equals().with()
>>>>>>
>>>>>> So a join would just be a join of two WindowedDataStreamS. This
>>>>>> would neatly move the windowing stuff into one place.
>>>>>>
>>>>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
>>>>>> balassi.mar...@gmail.com> wrote:
>>>>>>
>>>>>> Big +1 for the proposal from Peter and Gyula. I'm really for
>>>>>> bringing the windowing and window join API in sync.
>>>>>>
>>>>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>
>>>>>> Hey guys,
>>>>>>
>>>>>> As Aljoscha highlighted earlier, the current window join semantics
>>>>>> in the streaming api don't follow the changes in the windowing api.
>>>>>> More precisely, we currently only support joins over time windows of
>>>>>> equal size on both streams. The reason for this is that we now take
>>>>>> a window of each of the two streams and do joins over these pairs.
>>>>>> This would be a blocking operation if the windows are not closed at
>>>>>> exactly the same time (and since we don't want this, we only allow
>>>>>> time windows).
>>>>>>
>>>>>> I talked with Peter, who came up with the initial idea of an
>>>>>> alternative approach for stream joins, which works as follows:
>>>>>>
>>>>>> Instead of pairing windows for joins, we do element-against-window
>>>>>> joins. What this means is that whenever we receive an element from
>>>>>> one of the streams, we join this element with the current window
>>>>>> (this window is constantly updated) of the other stream. This is
>>>>>> non-blocking on any window definitions, as we don't have to wait for
>>>>>> windows to be completed, and we can use this with any of our
>>>>>> predefined policies like Time.of(...), Count.of(...), Delta.of(...).
>>>>>>
>>>>>> Additionally, this also allows some very flexible ways of defining
>>>>>> window joins. With this we could also define grouped windowing
>>>>>> inside of a join. An example of this would be: join all elements of
>>>>>> Stream1 with the last 5 elements by a given windowkey of Stream2 on
>>>>>> some join key.
>>>>>>
>>>>>> This feature can be easily implemented over the current operators,
>>>>>> so I already have a working prototype for the simple non-grouped
>>>>>> case. My only concern is the API; the best thing I could come up
>>>>>> with is something like this:
>>>>>>
>>>>>> stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1,
>>>>>> windowKey2).where(...).equalTo(...).with(...)
>>>>>>
>>>>>> (the user can omit the "by" and "with" calls)
>>>>>>
>>>>>> I think this new approach would be worthy of our "flexible
>>>>>> windowing", in contrast with the current approach.
>>>>>>
>>>>>> Regards,
>>>>>> Gyula
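The element-against-window join that the quoted thread converges on (Peter's/Gyula's proposal, matching the Infosphere Streams behaviour Asterios quoted) can be sketched in a few lines. This is my own toy illustration, not the prototype's code: the class name is invented, and count-based eviction via a bounded deque stands in for the general Time/Count/Delta eviction policies. On each arrival, the element is first joined against the opposing stream's *current* window, then inserted into its own window.

```python
# Toy sketch of a symmetric element-vs-window join (hypothetical names;
# a fixed-size deque plays the role of a count-based eviction policy).
from collections import deque

class ElementVsWindowJoin:
    def __init__(self, window_size_a, window_size_b, key_fn):
        self.windows = {"A": deque(maxlen=window_size_a),
                        "B": deque(maxlen=window_size_b)}
        self.key_fn = key_fn

    def on_element(self, side, element):
        other = "B" if side == "A" else "A"
        # 1) join the arrival against the opposing stream's current window
        matches = [(element, o) if side == "A" else (o, element)
                   for o in self.windows[other]
                   if self.key_fn(o) == self.key_fn(element)]
        # 2) insert the arrival into its own window; the bounded deque
        #    silently evicts the oldest element when the window is full
        self.windows[side].append(element)
        return matches

join = ElementVsWindowJoin(window_size_a=3, window_size_b=3,
                           key_fn=lambda e: e[0])   # join on first field
join.on_element("A", ("k1", 1))
join.on_element("A", ("k2", 2))
print(join.on_element("B", ("k1", 99)))   # joins against the earlier ("k1", 1)
```

Because each arrival only ever probes the *current* opposing window, the operator is non-blocking for any eviction policy, but it also makes Stephan's point concrete: the output depends on what happens to be in that window at arrival time, so without event-time ordering the result is non-deterministic.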