Interesting read. Thanks for the pointer.

Take home message (in my understanding):
  - they support wall-clock, attribute-ts, and count windows
  -> default is attribute-ts (and not wall-clock as in Flink)
  -> it is not specified, if a global order is applied to windows, but I
doubt it, because of their Watermark approach
  - they allow the user to assign timestamps for attribute-ts windows
  - they deal with out-of-order data (-> not sure what the last sentence
means exactly: "...causing the late elements to be emitted as they
arrive." ?)
  - their "Watermark" approach might yield high latencies

However, they don't talk about joins... :(




On 04/24/2015 02:25 PM, Aljoscha Krettek wrote:
> Did anyone read these:
> https://cloud.google.com/dataflow/model/windowing,
> https://cloud.google.com/dataflow/model/triggers ?
> 
> The semantics seem very straightforward and I'm sure the google guys
> spent some time thinking this through. :D
> 
> On Mon, Apr 20, 2015 at 3:43 PM, Stephan Ewen <se...@apache.org> wrote:
>> Perfect! I am eager to see what you came up with!
>>
>> On Sat, Apr 18, 2015 at 2:00 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hey all,
>>>
>>> We have spent some time with Asterios, Paris and Jonas to finalize the
>>> windowing semantics (both the current features and the window join), and I
>>> think we made very have come up with a very clear picture.
>>>
>>> We will write down the proposed semantics and publish it to the wiki next
>>> week.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, Apr 16, 2015 at 5:50 PM, Asterios Katsifodimos <
>>> asterios.katsifodi...@tu-berlin.de> wrote:
>>>
>>>> As far as I see in [1], Peter's/Gyula's suggestion is what Infosphere
>>>> Streams does: symmetric hash join.
>>>>
>>>> From [1]:
>>>> "When a tuple is received on an input port, it is inserted into the
>>> window
>>>> corresponding to the input port, which causes the window to trigger. As
>>>> part of the trigger processing, the tuple is compared against all tuples
>>>> inside the window of the opposing input port. If the tuples match, then
>>> an
>>>> output tuple will be produced for each match. If at least one output was
>>>> generated, a window punctuation will be generated after all the outputs."
>>>>
>>>> Cheers,
>>>> Asterios
>>>>
>>>> [1]
>>>>
>>>>
>>> http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html
>>>>
>>>>
>>>>
>>>> On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax <
>>>> mj...@informatik.hu-berlin.de> wrote:
>>>>
>>>>> Hi Paris,
>>>>>
>>>>> thanks for the pointer to the Naiad paper. That is quite interesting.
>>>>>
>>>>> The paper I mentioned [1], does not describe the semantics in detail;
>>> it
>>>>> is more about the implementation for the stream-joins. However, it uses
>>>>> the same semantics (from my understanding) as proposed by Gyula.
>>>>>
>>>>> -Matthias
>>>>>
>>>>> [1] Kang, Naughton, Viglas. "Evaluationg Window Joins over Unbounded
>>>>> Streams". VLDB 2002.
>>>>>
>>>>>
>>>>>
>>>>> On 04/07/2015 12:38 PM, Paris Carbone wrote:
>>>>>> Hello Matthias,
>>>>>>
>>>>>> Sure, ordering guarantees are indeed a tricky thing, I recall having
>>>>> that discussion back in TU Berlin. Bear in mind thought that
>>> DataStream,
>>>>> our abstract data type, represents a *partitioned* unbounded sequence
>>> of
>>>>> events. There are no *global* ordering guarantees made whatsoever in
>>> that
>>>>> model across partitions. If you see it more generally there are many
>>>> “race
>>>>> conditions” in a distributed execution graph of vertices that process
>>>>> multiple inputs asynchronously, especially when you add joins and
>>>>> iterations into the mix (how do you deal with reprocessing “old” tuples
>>>>> that iterate in the graph). Btw have you checked the Naiad paper [1]?
>>>>> Stephan cited a while ago and it is quite relevant to that discussion.
>>>>>>
>>>>>> Also, can you cite the paper with the joining semantics you are
>>>>> referring to? That would be of good help I think.
>>>>>>
>>>>>> Paris
>>>>>>
>>>>>> [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
>>>>>>
>>>>>> <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf>
>>>>>>
>>>>>> <https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf>
>>>>>> On 07 Apr 2015, at 11:50, Matthias J. Sax <
>>>> mj...@informatik.hu-berlin.de
>>>>> <mailto:mj...@informatik.hu-berlin.de>> wrote:
>>>>>>
>>>>>> Hi @all,
>>>>>>
>>>>>> please keep me in the loop for this work. I am highly interested and
>>> I
>>>>>> want to help on it.
>>>>>>
>>>>>> My initial thoughts are as follows:
>>>>>>
>>>>>> 1) Currently, system timestamps are used and the suggested approach
>>> can
>>>>>> be seen as state-of-the-art (there is actually a research paper using
>>>>>> the exact same join semantic). Of course, the current approach is
>>>>>> inherently non-deterministic. The advantage is, that there is no
>>>>>> overhead in keeping track of the order of records and the latency
>>>> should
>>>>>> be very low. (Additionally, state-recovery is simplified. Because,
>>> the
>>>>>> processing in inherently non-deterministic, recovery can be done with
>>>>>> relaxed guarantees).
>>>>>>
>>>>>>  2) The user should be able to "switch on" deterministic processing,
>>>>>> ie, records are timestamped (either externally when generated, or
>>>>>> timestamped at the sources). Because deterministic processing adds
>>> some
>>>>>> overhead, the user should decide for it actively.
>>>>>> In this case, the order must be preserved in each re-distribution
>>> step
>>>>>> (merging is sufficient, if order is preserved within each incoming
>>>>>> channel). Furthermore, deterministic processing can be achieved by
>>>> sound
>>>>>> window semantics (and there is a bunch of them). Even for
>>>>>> single-stream-windows it's a tricky problem; for join-windows it's
>>> even
>>>>>> harder. From my point of view, it is less important which semantics
>>> are
>>>>>> chosen; however, the user must be aware how it works. The most tricky
>>>>>> part for deterministic processing, is to deal with duplicate
>>> timestamps
>>>>>> (which cannot be avoided). The timestamping for (intermediate) result
>>>>>> tuples, is also an important question to be answered.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 04/07/2015 11:37 AM, Gyula Fóra wrote:
>>>>>> Hey,
>>>>>>
>>>>>> I agree with Kostas, if we define the exact semantics how this works,
>>>>> this
>>>>>> is not more ad-hoc than any other stateful operator with multiple
>>>> inputs.
>>>>>> (And I don't think any other system support something similar)
>>>>>>
>>>>>> We need to make some design choices that are similar to the issues we
>>>> had
>>>>>> for windowing. We need to chose how we want to evaluate the windowing
>>>>>> policies (global or local) because that affects what kind of policies
>>>> can
>>>>>> be parallel, but I can work on these things.
>>>>>>
>>>>>> I think this is an amazing feature, so I wouldn't necessarily rush
>>> the
>>>>>> implementation for 0.9 though.
>>>>>>
>>>>>> And thanks for helping writing these down.
>>>>>>
>>>>>> Gyula
>>>>>>
>>>>>> On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas <ktzou...@apache.org
>>>>> <mailto:ktzou...@apache.org>> wrote:
>>>>>>
>>>>>> Yes, we should write these semantics down. I volunteer to help.
>>>>>>
>>>>>> I don't think that this is very ad-hoc. The semantics are basically
>>> the
>>>>>> following. Assuming an arriving element from the left side:
>>>>>> (1) We find the right-side matches
>>>>>> (2) We insert the left-side arrival into the left window
>>>>>> (3) We recompute the left window
>>>>>> We need to see whether right window re-computation needs to be
>>>> triggered
>>>>> as
>>>>>> well. I think that this way of joining streams is also what the
>>>> symmetric
>>>>>> hash join algorithms were meant to support.
>>>>>>
>>>>>> Kostas
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 7, 2015 at 10:49 AM, Stephan Ewen <se...@apache.org
>>>> <mailto:
>>>>> se...@apache.org>> wrote:
>>>>>>
>>>>>> Is the approach of joining an element at a time from one input
>>> against
>>>> a
>>>>>> window on the other input not a bit arbitrary?
>>>>>>
>>>>>> This just joins whatever currently happens to be the window by the
>>> time
>>>>>> the
>>>>>> single element arrives - that is a bit non-predictable, right?
>>>>>>
>>>>>> As a more general point: The whole semantics of windowing and when
>>> they
>>>>>> are
>>>>>> triggered are a bit ad-hoc now. It would be really good to start
>>>>>> formalizing that a bit and
>>>>>> put it down somewhere. Users need to be able to clearly understand
>>> and
>>>>>> how
>>>>>> to predict the output.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 12:10 PM, Gyula Fóra <gyula.f...@gmail.com
>>>>> <mailto:gyula.f...@gmail.com>>
>>>>>> wrote:
>>>>>>
>>>>>> I think it should be possible to make this compatible with the
>>>>>> .window().every() calls. Maybe if there is some trigger set in
>>> "every"
>>>>>> we
>>>>>> would not join that stream 1 by 1 but every so many elements. The
>>>>>> problem
>>>>>> here is that the window and every in this case are very-very
>>> different
>>>>>> than
>>>>>> the normal windowing semantics. The window would define the join
>>> window
>>>>>> for
>>>>>> each element of the other stream while every would define how often I
>>>>>> join
>>>>>> This stream with the other one.
>>>>>>
>>>>>> We need to think to make this intuitive.
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 11:23 AM, Márton Balassi <
>>>>>> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>>
>>>>>> wrote:
>>>>>>
>>>>>> That would be really neat, the problem I see there, that we do not
>>>>>> distinguish between dataStream.window() and
>>>>>> dataStream.window().every()
>>>>>> currently, they both return WindowedDataStreams and TriggerPolicies
>>>>>> of
>>>>>> the
>>>>>> every call do not make much sense in this setting (in fact
>>>>>> practically
>>>>>> the
>>>>>> trigger is always set to count of one).
>>>>>>
>>>>>> But of course we could make it in a way, that we check that the
>>>>>> eviction
>>>>>> should be either null or count of 1, in every other case we throw an
>>>>>> exception while building the JobGraph.
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 8:43 AM, Aljoscha Krettek <
>>>>>> aljos...@apache.org<mailto:aljos...@apache.org>>
>>>>>> wrote:
>>>>>>
>>>>>> Or you could define it like this:
>>>>>>
>>>>>> stream_A = a.window(...)
>>>>>> stream_B = b.window(...)
>>>>>>
>>>>>> stream_A.join(stream_B).where().equals().with()
>>>>>>
>>>>>> So a join would just be a join of two WindowedDataStreamS. This
>>>>>> would
>>>>>> neatly move the windowing stuff into one place.
>>>>>>
>>>>>> On Thu, Apr 2, 2015 at 9:54 PM, Márton Balassi <
>>>>>> balassi.mar...@gmail.com<mailto:balassi.mar...@gmail.com>
>>>>>>
>>>>>> wrote:
>>>>>> Big +1 for the proposal for Peter and Gyula. I'm really for
>>>>>> bringing
>>>>>> the
>>>>>> windowing and window join API in sync.
>>>>>>
>>>>>> On Thu, Apr 2, 2015 at 6:32 PM, Gyula Fóra <gyf...@apache.org
>>> <mailto:
>>>>> gyf...@apache.org>>
>>>>>> wrote:
>>>>>>
>>>>>> Hey guys,
>>>>>>
>>>>>> As Aljoscha has highlighted earlier the current window join
>>>>>> semantics
>>>>>> in
>>>>>> the streaming api doesn't follow the changes in the windowing
>>>>>> api.
>>>>>> More
>>>>>> precisely, we currently only support joins over time windows of
>>>>>> equal
>>>>>> size
>>>>>> on both streams. The reason for this is that we now take a
>>>>>> window
>>>>>> of
>>>>>> each
>>>>>> of the two streams and do joins over these pairs. This would be
>>>>>> a
>>>>>> blocking
>>>>>> operation if the windows are not closed at exactly the same time
>>>>>> (and
>>>>>> since
>>>>>> we dont want this we only allow time windows)
>>>>>>
>>>>>> I talked with Peter who came up with the initial idea of an
>>>>>> alternative
>>>>>> approach for stream joins which works as follows:
>>>>>>
>>>>>> Instead of pairing windows for joins, we do element against
>>>>>> window
>>>>>> joins.
>>>>>> What this means is that whenever we receive an element from one
>>>>>> of
>>>>>> the
>>>>>> streams, we join this element with the current window(this
>>>>>> window
>>>>>> is
>>>>>> constantly updated) of the other stream. This is non-blocking on
>>>>>> any
>>>>>> window
>>>>>> definitions as we dont have to wait for windows to be completed
>>>>>> and
>>>>>> we
>>>>>> can
>>>>>> use this with any of our predefined policies like Time.of(...),
>>>>>> Count.of(...), Delta.of(....).
>>>>>>
>>>>>> Additionally this also allows some very flexible way of defining
>>>>>> window
>>>>>> joins. With this we could also define grouped windowing inside
>>>>>> if
>>>>>> a
>>>>>> join.
>>>>>> An example of this would be: Join all elements of Stream1 with
>>>>>> the
>>>>>> last
>>>>>> 5
>>>>>> elements by a given windowkey of Stream2 on some join key.
>>>>>>
>>>>>> This feature can be easily implemented over the current
>>>>>> operators,
>>>>>> so
>>>>>> I
>>>>>> already have a working prototype for the simple non-grouped
>>>>>> case.
>>>>>> My
>>>>>> only
>>>>>> concern is the API, the best thing I could come up with is
>>>>>> something
>>>>>> like
>>>>>> this:
>>>>>>
>>>>>> stream_A.join(stream_B).onWindow(windowDefA,
>>>>>> windowDefB).by(windowKey1,
>>>>>> windowKey2).where(...).equalTo(...).with(...)
>>>>>>
>>>>>> (the user can omit the "by" and "with" calls)
>>>>>>
>>>>>> I think this new approach would be worthy of our "flexible
>>>>>> windowing"
>>>>>> in
>>>>>> contrast with the current approach.
>>>>>>
>>>>>> Regards,
>>>>>> Gyula
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to