Did anyone read these: https://cloud.google.com/dataflow/model/windowing, https://cloud.google.com/dataflow/model/triggers ?
The semantics seem very straightforward, and I'm sure the Google guys spent some time thinking this through. :D

On Mon, Apr 20, 2015 at 3:43 PM, Stephan Ewen <se...@apache.org> wrote:

Perfect! I am eager to see what you came up with!

On Sat, Apr 18, 2015 at 2:00 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

Hey all,

We have spent some time with Asterios, Paris and Jonas to finalize the windowing semantics (both the current features and the window join), and I think we have come up with a very clear picture.

We will write down the proposed semantics and publish them to the wiki next week.

Cheers,
Gyula

On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos <asterios.katsifodi...@tu-berlin.de> wrote:

As far as I can see in [1], Peter's/Gyula's suggestion is what InfoSphere Streams does: a symmetric hash join.

From [1]:
"When a tuple is received on an input port, it is inserted into the window corresponding to the input port, which causes the window to trigger. As part of the trigger processing, the tuple is compared against all tuples inside the window of the opposing input port. If the tuples match, then an output tuple will be produced for each match. If at least one output was generated, a window punctuation will be generated after all the outputs."

Cheers,
Asterios

[1] http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html

On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi Paris,

thanks for the pointer to the Naiad paper. That is quite interesting.

The paper I mentioned [1] does not describe the semantics in detail; it is more about the implementation of stream joins.
However, from my understanding, it uses the same semantics as proposed by Gyula.

-Matthias

[1] Kang, Naughton, Viglas. "Evaluating Window Joins over Unbounded Streams". VLDB 2002.

On 04/07/2015 12:38 PM, Paris Carbone wrote:

Hello Matthias,

Sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back at TU Berlin. Bear in mind, though, that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. No *global* ordering guarantees whatsoever are made in that model across partitions. More generally, there are many “race conditions” in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing “old” tuples that iterate in the graph?). Btw, have you checked the Naiad paper [1]? Stephan cited it a while ago, and it is quite relevant to this discussion.

Also, can you cite the paper with the join semantics you are referring to? That would be a great help, I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:

Hi @all,

please keep me in the loop on this work. I am highly interested and want to help with it.
My initial thoughts are as follows:

1) Currently, system timestamps are used, and the suggested approach can be seen as state of the art (there is actually a research paper using exactly the same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead for keeping track of record order, and latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)

2) The user should be able to "switch on" deterministic processing, i.e., records are timestamped (either externally when generated, or at the sources). Because deterministic processing adds some overhead, the user should actively opt in to it.
In this case, the order must be preserved in each re-distribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved with sound window semantics (and there are a bunch of them). Even for single-stream windows it is a tricky problem; for join windows it is even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). How to timestamp (intermediate) result tuples is also an important question to be answered.
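Point 2 can be made concrete with a small Python sketch (not Flink code) of a deterministic merge: each input channel is assumed to be already ordered by timestamp, and duplicate timestamps are resolved by a fixed rule. The record shape (timestamp, payload), the function name, and the tie-breaking rule (channel index, then position within the channel) are hypothetical choices for illustration; any rule would do as long as it does not depend on physical arrival time.

```python
import heapq
from typing import Iterable, Iterator, List, Tuple

Record = Tuple[int, str]  # hypothetical record shape: (timestamp, payload)


def deterministic_merge(channels: List[Iterable[Record]]) -> Iterator[Tuple[int, int, int, str]]:
    """Merge channels that are each already sorted by timestamp.

    Output is ordered by (timestamp, channel index, position in channel),
    so records with duplicate timestamps come out in a fixed order that
    does not depend on when they physically arrived.
    """
    def tagged(idx: int, chan: Iterable[Record]) -> Iterator[Tuple[int, int, int, str]]:
        for pos, (ts, payload) in enumerate(chan):
            yield (ts, idx, pos, payload)

    # heapq.merge relies on every input already being sorted; the
    # (ts, idx, pos) prefix is unique, so payloads are never compared.
    return heapq.merge(*(tagged(i, c) for i, c in enumerate(channels)))


ch0 = [(1, "a"), (3, "b"), (3, "c")]
ch1 = [(2, "x"), (3, "y")]
print([(ts, p) for ts, _, _, p in deterministic_merge([ch0, ch1])])
# [(1, 'a'), (2, 'x'), (3, 'b'), (3, 'c'), (3, 'y')]
```

The three records with timestamp 3 always come out as b, c, y here, no matter which channel delivered first; that fixed order is what a replay or a recovery would need to reproduce.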
-Matthias

On 04/07/2015 11:37 AM, Gyula Fóra wrote:

Hey,

I agree with Kostas: if we define exactly how this works, this is no more ad hoc than any other stateful operator with multiple inputs. (And I don't think any other system supports something similar.)

We need to make some design choices similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (global or local), because that affects which kinds of policies can be parallel, but I can work on these things.

I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9, though.

And thanks for helping write these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org> wrote:

Yes, we should write these semantics down. I volunteer to help.

I don't think that this is very ad hoc. The semantics are basically the following. Assuming an element arrives from the left side:
(1) We find the right-side matches.
(2) We insert the left-side arrival into the left window.
(3) We recompute the left window.
We need to see whether right-window re-computation needs to be triggered as well. I think this way of joining streams is also what the symmetric hash join algorithms were meant to support.

Kostas

On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org> wrote:

Isn't the approach of joining one element at a time from one input against a window on the other input a bit arbitrary?
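Kostas's three steps, and the arrival-order dependence Stephan asks about, can both be seen in a small single-threaded Python sketch. This is not the Flink operator: the class name, the (join key, value) record shape, and the count-based window of 2 elements per side are hypothetical choices, and step (3) "recompute the window" is modeled simply as evicting the oldest element once the window is full.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

Element = Tuple[str, int]  # hypothetical record shape: (join key, value)


class ElementWindowJoin:
    """Element-against-window join over two inputs, "L" and "R".

    Each side keeps a count-based window of its last `size` elements
    (an assumed policy; time- or delta-based policies are not modeled).
    """

    def __init__(self, size: int) -> None:
        self.windows: Dict[str, Deque[Element]] = {
            "L": deque(maxlen=size),
            "R": deque(maxlen=size),
        }

    def on_element(self, side: str, elem: Element) -> List[Tuple[Element, Element]]:
        other = "R" if side == "L" else "L"
        # (1) find matches in the opposite side's current window
        matches = [o for o in self.windows[other] if o[0] == elem[0]]
        # emit pairs as (left, right) regardless of which side arrived
        out = [(elem, o) if side == "L" else (o, elem) for o in matches]
        # (2) insert into own window; (3) deque(maxlen=...) evicts the
        # oldest element once the window is full
        self.windows[side].append(elem)
        return out


def run(arrivals: List[Tuple[str, Element]]) -> List[Tuple[Element, Element]]:
    join = ElementWindowJoin(size=2)
    results: List[Tuple[Element, Element]] = []
    for side, elem in arrivals:
        results.extend(join.on_element(side, elem))
    return results


# Same four elements, two different arrival interleavings.
a = run([("L", ("k", 1)), ("L", ("k", 2)), ("R", ("k", 10)), ("L", ("k", 3))])
b = run([("L", ("k", 1)), ("L", ("k", 2)), ("L", ("k", 3)), ("R", ("k", 10))])
print(len(a), len(b))  # 3 2
```

In the second interleaving, ("k", 1) has already been evicted from the left window when ("k", 10) arrives, so that pair is never produced. The output depends only on the order in which elements reach the operator, which is exactly the point raised here.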
This just joins whatever happens to be in the window at the time the single element arrives; that is a bit unpredictable, right?

As a more general point: the whole semantics of windowing, and of when windows are triggered, are a bit ad hoc right now. It would be really good to start formalizing that and put it down somewhere. Users need to be able to clearly understand and predict the output.

On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

I think it should be possible to make this compatible with the .window().every() calls. Maybe if there is some trigger set in "every", we would not join that stream element by element but every so many elements. The problem here is that window and every in this case are very, very different from the normal windowing semantics. The window would define the join window for each element of the other stream, while every would define how often I join this stream with the other one.

We need to think about how to make this intuitive.

On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <balassi.mar...@gmail.com> wrote:

That would be really neat. The problem I see there is that we currently do not distinguish between dataStream.window() and dataStream.window().every(): they both return WindowedDataStreams, and the TriggerPolicies of the every call do not make much sense in this setting (in fact, the trigger is practically always set to a count of one).
But of course we could do it in a way that checks that the eviction is either null or a count of 1; in every other case we throw an exception while building the JobGraph.

On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Or you could define it like this:

stream_A = a.window(...)
stream_B = b.window(...)

stream_A.join(stream_B).where(...).equalTo(...).with(...)

So a join would just be a join of two WindowedDataStreams. This would neatly move the windowing stuff into one place.

On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

Big +1 for the proposal by Peter and Gyula. I'm really for bringing the windowing and window join APIs in sync.

On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org> wrote:

Hey guys,

As Aljoscha highlighted earlier, the current window join semantics in the streaming API don't follow the changes in the windowing API. More precisely, we currently only support joins over time windows of equal size on both streams. The reason for this is that we now take a window of each of the two streams and do joins over these pairs.
This would be a blocking operation if the windows are not closed at exactly the same time (and since we don't want this, we only allow time windows).

I talked with Peter, who came up with the initial idea of an alternative approach to stream joins, which works as follows:

Instead of pairing windows for joins, we do element-against-window joins. This means that whenever we receive an element from one of the streams, we join this element with the current window (which is constantly updated) of the other stream. This is non-blocking for any window definition, as we don't have to wait for windows to be completed, and we can use it with any of our predefined policies like Time.of(...), Count.of(...), Delta.of(...).

Additionally, this allows a very flexible way of defining window joins. With this we could also define grouped windowing inside a join. An example of this would be: join all elements of Stream1 with the last 5 elements (per a given window key) of Stream2 on some join key.

This feature can easily be implemented on top of the current operators, so I already have a working prototype for the simple non-grouped case. My only concern is the API; the best thing I could come up with is something like this:

stream_A.join(stream_B).onWindow(windowDefA, windowDefB).by(windowKey1, windowKey2).where(...).equalTo(...).with(...)
(the user can omit the "by" and "with" calls)

I think this new approach would be worthy of our "flexible windowing", in contrast to the current approach.

Regards,
Gyula
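Gyula's grouped example ("join all elements of Stream1 with the last 5 elements by a given window key of Stream2 on some join key") can be sketched in Python as below. This is a hypothetical, single-threaded model, not the prototype mentioned in the proposal: the class and parameter names are invented, the parameters only loosely mirror the proposed by(...)/where(...)/equalTo(...) calls, only Stream1 arrivals probe (as in the example), and it assumes a Stream1 element probes only the Stream2 window group with the same window-key value.

```python
from collections import defaultdict, deque
from typing import Any, Callable, Deque, Dict, List, Tuple

KeyFn = Callable[[Any], Any]


class GroupedWindowJoin:
    """Joins each Stream1 element against the last `count` Stream2 elements
    that share its window key, emitting pairs whose join keys are equal."""

    def __init__(self, count: int, window_key1: KeyFn, window_key2: KeyFn,
                 join_key1: KeyFn, join_key2: KeyFn) -> None:
        self.wk1, self.wk2 = window_key1, window_key2
        self.jk1, self.jk2 = join_key1, join_key2
        # one count-based window (last `count` elements) per Stream2 window key
        self.windows: Dict[Any, Deque[Any]] = defaultdict(lambda: deque(maxlen=count))

    def on_stream2(self, elem: Any) -> None:
        # Stream2 arrivals only update their group's window in this sketch
        self.windows[self.wk2(elem)].append(elem)

    def on_stream1(self, elem: Any) -> List[Tuple[Any, Any]]:
        # .get() does not create an empty group for unseen window keys
        window = self.windows.get(self.wk1(elem), ())
        return [(elem, other) for other in window if self.jk1(elem) == self.jk2(other)]


# Records are (window key, join key, value).
j = GroupedWindowJoin(
    count=5,
    window_key1=lambda e: e[0], window_key2=lambda e: e[0],
    join_key1=lambda e: e[1], join_key2=lambda e: e[1],
)
for i in range(7):  # Stream2: window key "u1", alternating join keys "a"/"b"
    j.on_stream2(("u1", "a" if i % 2 == 0 else "b", i))
j.on_stream2(("u2", "a", 100))  # different window key, not probed below
print([other[2] for _, other in j.on_stream1(("u1", "a", "q"))])  # [2, 4, 6]
```

Keeping one bounded window per window-key value means an arriving Stream1 element only touches its own group in this sketch. Whether such per-group count policies could be evaluated locally or would need global evaluation in a parallel setting is the design question about policy evaluation raised earlier in the thread.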