Hi Jan,

It's a super interesting use case you have and has a lot of similarity with
complexity that comes up when dealing with time series problems.

I wonder if it would be interesting to see if the pattern generalises
enough to make some utility classes abstracting the complexity from the
user.

Cheers

Reza

On Tue, 21 May 2019, 20:13 Jan Lukavský, <je...@seznam.cz> wrote:

> Hi Reza,
>
> I think it probably would provide enough compression. But it would
> introduce complications and latency for the streaming case. Although I see
> your point, I was trying to figure out if the Beam model should support
> these use cases more "natively".
>
> Cheers,
>
>  Jan
> On 5/21/19 11:03 AM, Reza Rokni wrote:
>
> In a lot of cases the initial combiner can dramatically reduce the amount
> of data in this last phase making it tractable for a lot of use cases.
>
>  I assume in your example the first phase would not provide enough
> compression?
>
> Cheers
>
> Reza
>
> On Tue, 21 May 2019, 16:47 Jan Lukavský, <je...@seznam.cz> wrote:
>
>> Hi Reza, thanks for reaction, comments inline.
>> On 5/21/19 1:02 AM, Reza Rokni wrote:
>>
>> Hi,
>>
>> If I have understood the use case correctly, your output is an ordered
>> counter of state changes.
>>
>> One approach  which might be worth exploring is outlined below, haven't
>> had a chance to test it so could be missing pieces or be plane old wrong (
>> will try and come up with a test example later on to try it out).
>>
>> 1 - Window into a small enough Duration such that the number of
>> elements in a window per key can be read into memory structure for sorting.
>>
>> 2 - GBK
>> 3 - In a DoFn do the ordering and output a Timestamped<V> elements that
>> contain the state changes for just that window and the value of the last
>> element  {timestamp-00:00:00: (one: 1, zero: 0, lastElement : 0)}. This
>> will cause memory pressure so your step 1 is important.
>>
>> This is just an optimization, right?
>>
>> 4- Window these outputs into the Global Window with a Stateful DoFn
>>
>> Because you finally have to do the stateful ParDo in Global window, you
>> will end up with the same problem - the first three steps just might give
>> you some extra time. But if you have enough data (long enough history, of
>> very frequent changes, or both), then you will run into the same issues as
>> without the optimization here. The BagState simply would not be able to
>> hold all the data in batch case.
>>
>> Jan
>>
>> 5-  Add elements to a BagState in the stateful dofn
>> 6 - In the Global Window set an EventTimer to fire at time boundaries
>> that match the time window that you need. Note Timers do not have a read
>> function for the time that they are set. (Here is one way to set
>> metadata to emulate a read function
>> <https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542>)
>> Again this can cause memory pressure.
>> 7 - At each OnTimer,
>> 7a-  read and sort the elements in the BagState,
>> 7b - True up the state changes with the cross-window state changes from
>> the list.
>> 7c - Store the last accumulator into a different State
>>
>> Sorry that was off the top of my head so could be missing things. For
>> example LateData would need to be dealt with outside of this flow...
>>
>> Cheers
>> Reza
>>
>> On Tue, 21 May 2019 at 07:00, Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Thanks for the nice small example of a calculation that depends on
>>> order. You are right that many state machines have this property. I agree
>>> w/ you and Luke that it is convenient for batch processing to sort by event
>>> timestamp before running a stateful ParDo. In streaming you could also
>>> implement "sort by event timestamp" by buffering until you know all earlier
>>> data will be dropped - a slack buffer up to allowed lateness.
>>>
>>> I do not think that it is OK to sort in batch and not in streaming. Many
>>> state machines diverge very rapidly when things are out of order. So each
>>> runner if they see the "@OrderByTimestamp" annotation (or whatever) needs
>>> to deliver sorted data (by some mix of buffering and dropping), or to
>>> reject the pipeline as unsupported.
>>>
>>> And also want to say that this is not the default case - many uses of
>>> state & timers in ParDo yield different results at the element level, but
>>> the results are equivalent at in the big picture. Such as the example of
>>> "assign a unique sequence number to each element" or "group into batches"
>>> it doesn't matter exactly what the result is, only that it meets the spec.
>>> And other cases like user funnels are monotonic enough that you also don't
>>> actually need sorting.
>>>
>>> Kenn
>>>
>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Yes, the problem will arise probably mostly when you have not well
>>>> distributed keys (or too few keys). I'm really not sure if a pure GBK with
>>>> a trigger can solve this - it might help to have data driven trigger. There
>>>> would still be some doubts, though. The main question is still here -
>>>> people say, that sorting by timestamp before stateful ParDo would be
>>>> prohibitively slow, but I don't really see why - the sorting is very
>>>> probably already there. And if not (hash grouping instead of sorted
>>>> grouping), then the sorting would affect only user defined StatefulParDos.
>>>>
>>>> This would suggest that the best way out of this would be really to add
>>>> annotation, so that the author of the pipeline can decide.
>>>>
>>>> If that would be acceptable I think I can try to prepare some basic
>>>> functionality, but I'm not sure, if I would be able to cover all runners /
>>>> sdks.
>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>>>>
>>>> It is read all per key and window and not just read all (this still
>>>> won't scale with hot keys in the global window). The GBK preceding the
>>>> StatefulParDo will guarantee that you are processing all the values for a
>>>> specific key and window at any given time. Is there a specific
>>>> window/trigger that is missing that you feel would remove the need for you
>>>> to use StatefulParDo?
>>>>
>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Lukasz,
>>>>>
>>>>> > Today, if you must have a strict order, you must guarantee that your
>>>>> StatefulParDo implements the necessary "buffering & sorting" into state.
>>>>>
>>>>> Yes, no problem with that. But this whole discussion started, because
>>>>> *this doesn't work on batch*. You simply cannot first read everything from
>>>>> distributed storage and then buffer it all into memory, just to read it
>>>>> again, but sorted. That will not work. And even if it would, it would be a
>>>>> terrible waste of resources.
>>>>>
>>>>> Jan
>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> This discussion brings many really interesting questions for me. :-)
>>>>>>
>>>>>>  > I don't see batch vs. streaming as part of the model. One can have
>>>>>> microbatch, or even a runner that alternates between different modes.
>>>>>>
>>>>>> Although I understand motivation of this statement, this project name
>>>>>> is
>>>>>> "Apache Beam: An advanced unified programming model". What does the
>>>>>> model unify, if "streaming vs. batch" is not part of the model?
>>>>>>
>>>>>> Using microbatching, chaining of batch jobs, or pure streaming are
>>>>>> exactly the "runtime conditions/characteristics" I refer to. All
>>>>>> these
>>>>>> define several runtime parameters, which in turn define how
>>>>>> well/badly
>>>>>> will the pipeline perform and how many resources might be needed.
>>>>>> From
>>>>>> my point of view, pure streaming should be the most resource
>>>>>> demanding
>>>>>> (if not, why bother with batch? why not run everything in streaming
>>>>>> only? what will there remain to "unify"?).
>>>>>>
>>>>>>  > Fortunately, for batch, only the state for a single key needs to
>>>>>> be
>>>>>> preserved at a time, rather than the state for all keys across the
>>>>>> range
>>>>>> of skew. Of course if you have few or hot keys, one can still have
>>>>>> issues (and this is not specific to StatefulDoFns).
>>>>>>
>>>>>> Yes, but here is still the presumption that my stateful DoFn can
>>>>>> tolerate arbitrary shuffling of inputs. Let me explain the use case
>>>>>> in
>>>>>> more detail.
>>>>>>
>>>>>> Suppose you have input stream consisting of 1s and 0s (and some key
>>>>>> for
>>>>>> each element, which is irrelevant for the demonstration). Your task
>>>>>> is
>>>>>> to calculate in running global window the actual number of changes
>>>>>> between state 0 and state 1 and vice versa. When the state doesn't
>>>>>> change, you don't calculate anything. If input (for given key) would
>>>>>> be
>>>>>> (tN denotes timestamp N):
>>>>>>
>>>>>>   t1: 1
>>>>>>
>>>>>>   t2: 0
>>>>>>
>>>>>>   t3: 0
>>>>>>
>>>>>>   t4: 1
>>>>>>
>>>>>>   t5: 1
>>>>>>
>>>>>>   t6: 0
>>>>>>
>>>>>> then the output should yield (supposing that default state is zero):
>>>>>>
>>>>>>   t1: (one: 1, zero: 0)
>>>>>>
>>>>>>   t2: (one: 1, zero: 1)
>>>>>>
>>>>>>   t3: (one: 1, zero: 1)
>>>>>>
>>>>>>   t4: (one: 2, zero: 1)
>>>>>>
>>>>>>   t5: (one: 2, zero: 1)
>>>>>>
>>>>>>   t6: (one: 2, zero: 2)
>>>>>>
>>>>>> How would you implement this in current Beam semantics?
>>>>>>
>>>>>
>>>>> I think your saying here that I know that my input is ordered in a
>>>>> specific way and since I assume the order when writing my pipeline I can
>>>>> perform this optimization. But there is nothing preventing a runner from
>>>>> noticing that your processing in the global window with a specific type of
>>>>> trigger and re-ordering your inputs/processing to get better performance
>>>>> (since you can't use an AfterWatermark trigger for your pipeline in
>>>>> streaming for the GlobalWindow).
>>>>>
>>>>> Today, if you must have a strict order, you must guarantee that your
>>>>> StatefulParDo implements the necessary "buffering & sorting" into state. I
>>>>> can see why you would want an annotation that says I must have timestamp
>>>>> ordered elements, since it makes writing certain StatefulParDos much
>>>>> easier. StatefulParDo is a low-level function, it really is the "here you
>>>>> go and do whatever you need to but here be dragons" function while
>>>>> windowing and triggering is meant to keep many people from writing
>>>>> StatefulParDo in the first place.
>>>>>
>>>>>
>>>>>>  > Pipelines that fail in the "worst case" batch scenario are likely
>>>>>> to
>>>>>> degrade poorly (possibly catastrophically) when the watermark falls
>>>>>> behind in streaming mode as well.
>>>>>>
>>>>>> But the worst case is defined by input of size (available resources +
>>>>>> single byte) -> pipeline fail. Although it could have finished, given
>>>>>> the right conditions.
>>>>>>
>>>>>>  > This might be reasonable, implemented by default by buffering
>>>>>> everything and releasing elements as the watermark (+lateness)
>>>>>> advances,
>>>>>> but would likely lead to inefficient (though *maybe* easier to reason
>>>>>> about) code.
>>>>>>
>>>>>> Sure, the pipeline will be less efficient, because it would have to
>>>>>> buffer and sort the inputs. But at least it will produce correct
>>>>>> results
>>>>>> in cases where updates to state are order-sensitive.
>>>>>>
>>>>>>  > Would it be roughly equivalent to GBK + FlatMap(lambda (key,
>>>>>> values):
>>>>>> [(key, value) for value in values])?
>>>>>>
>>>>>> I'd say roughly yes, but difference would be in the trigger. The
>>>>>> trigger
>>>>>> should ideally fire as soon as watermark (+lateness) crosses element
>>>>>> with lowest timestamp in the buffer. Although this could be somehow
>>>>>> emulated by fixed trigger each X millis.
>>>>>>
>>>>>>  > Or is the underlying desire just to be able to hint to the runner
>>>>>> that the code may perform better (e.g. require less resources) as
>>>>>> skew
>>>>>> is reduced (and hence to order by timestamp iff it's cheap)?
>>>>>>
>>>>>> No, the sorting would have to be done in streaming case as well. That
>>>>>> is
>>>>>> an imperative of the unified model. I think it is possible to sort by
>>>>>> timestamp only in batch case (and do it for *all* batch stateful
>>>>>> pardos
>>>>>> without annotation), or introduce annotation, but then make the same
>>>>>> guarantees for streaming case as well.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
>>>>>> > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>> >> Hi Robert,
>>>>>> >>
>>>>>> >> yes, I think you rephrased my point - although no *explicit*
>>>>>> guarantees
>>>>>> >> of ordering are given in either mode, there is *implicit* ordering
>>>>>> in
>>>>>> >> streaming case that is due to nature of the processing - the
>>>>>> difference
>>>>>> >> between watermark and timestamp of elements flowing through the
>>>>>> pipeline
>>>>>> >> are generally low (too high difference leads to the overbuffering
>>>>>> >> problem), but there is no such bound on batch.
>>>>>> > Fortunately, for batch, only the state for a single key needs to be
>>>>>> > preserved at a time, rather than the state for all keys across the
>>>>>> > range of skew. Of course if you have few or hot keys, one can still
>>>>>> > have issues (and this is not specific to StatefulDoFns).
>>>>>> >
>>>>>> >> As a result, I see a few possible solutions:
>>>>>> >>
>>>>>> >>    - the best and most natural seems to be extension of the model,
>>>>>> so
>>>>>> >> that it defines batch as not only "streaming pipeline executed in
>>>>>> batch
>>>>>> >> fashion", but "pipeline with at least as good runtime
>>>>>> characteristics as
>>>>>> >> in streaming case, executed in batch fashion", I really don't
>>>>>> think that
>>>>>> >> there are any conflicts with the current model, or that this could
>>>>>> >> affect performance, because the required sorting (as pointed by
>>>>>> >> Aljoscha) is very probably already done during translation of
>>>>>> stateful
>>>>>> >> pardos. Also note that this definition only affects user defined
>>>>>> >> stateful pardos
>>>>>> > I don't see batch vs. streaming as part of the model. One can have
>>>>>> > microbatch, or even a runner that alternates between different
>>>>>> modes.
>>>>>> > The model describes what the valid outputs are given a (sometimes
>>>>>> > partial) set of inputs. It becomes really hard to define things like
>>>>>> > "as good runtime characteristics." Once you allow any
>>>>>> > out-of-orderedness, it is not very feasible to try and define (and
>>>>>> > more cheaply implement) a "upper bound" of acceptable
>>>>>> > out-of-orderedness.
>>>>>> >
>>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to
>>>>>> > degrade poorly (possibly catastrophically) when the watermark falls
>>>>>> > behind in streaming mode as well.
>>>>>> >
>>>>>> >>    - another option would be to introduce annotation for DoFns
>>>>>> (e.g.
>>>>>> >> @RequiresStableTimeCharacteristics), which would result in the
>>>>>> sorting
>>>>>> >> in batch case - but - this extension would have to ensure the
>>>>>> sorting in
>>>>>> >> streaming mode also - it would require definition of allowed
>>>>>> lateness,
>>>>>> >> and triggger (essentially similar to window)
>>>>>> > This might be reasonable, implemented by default by buffering
>>>>>> > everything and releasing elements as the watermark (+lateness)
>>>>>> > advances, but would likely lead to inefficient (though *maybe*
>>>>>> easier
>>>>>> > to reason about) code. Not sure about the semantics of triggering
>>>>>> > here, especially data-driven triggers. Would it be roughly
>>>>>> equivalent
>>>>>> > to GBK + FlatMap(lambda (key, values): [(key, value) for value in
>>>>>> > values])?
>>>>>> >
>>>>>> > Or is the underlying desire just to be able to hint to the runner
>>>>>> that
>>>>>> > the code may perform better (e.g. require less resources) as skew is
>>>>>> > reduced (and hence to order by timestamp iff it's cheap)?
>>>>>> >
>>>>>> >>    - last option would be to introduce these "higher order
>>>>>> guarantees" in
>>>>>> >> some extension DSL (e.g. Euphoria), but that seems to be the worst
>>>>>> >> option to me
>>>>>> >>
>>>>>> >> I see the first two options quite equally good, although the
>>>>>> letter one
>>>>>> >> is probably more time consuming to implement. But it would bring
>>>>>> >> additional feature to streaming case as well.
>>>>>> >>
>>>>>> >> Thanks for any thoughts.
>>>>>> >>
>>>>>> >>    Jan
>>>>>> >>
>>>>>> >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>>>>>> >>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz>
>>>>>> wrote:
>>>>>> >>>> Hi Reuven,
>>>>>> >>>>
>>>>>> >>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>>>>>> >>>> Stateful ParDo works in batch as far, as the logic inside the
>>>>>> state works for absolutely unbounded out-of-orderness of elements. That
>>>>>> basically (practically) can work only for cases, where the order of input
>>>>>> elements doesn't matter. But, "state" can refer to "state machine", and 
>>>>>> any
>>>>>> time you have a state machine involved, then the ordering of elements 
>>>>>> would
>>>>>> matter.
>>>>>> >>> No guarantees on order are provided in *either* streaming or batch
>>>>>> >>> mode by the model. However, it is the case that in order to make
>>>>>> >>> forward progress most streaming runners attempt to limit the
>>>>>> amount of
>>>>>> >>> out-of-orderedness of elements (in terms of event time vs.
>>>>>> processing
>>>>>> >>> time) to make forward progress, which in turn could help cap the
>>>>>> >>> amount of state that must be held concurrently, whereas a batch
>>>>>> runner
>>>>>> >>> may not allow any state to be safely discarded until the whole
>>>>>> >>> timeline from infinite past to infinite future has been observed.
>>>>>> >>>
>>>>>> >>> Also, as pointed out, state is not preserved "batch to batch" in
>>>>>> batch mode.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <
>>>>>> m...@apache.org> wrote:
>>>>>> >>>
>>>>>> >>>>>    batch semantics and streaming semantics differs only in that
>>>>>> I can have GlobalWindow with default trigger on batch and cannot on 
>>>>>> stream
>>>>>> >>>> You can have a GlobalWindow in streaming with a default trigger.
>>>>>> You
>>>>>> >>>> could define additional triggers that do early firings. And you
>>>>>> could
>>>>>> >>>> even trigger the global window by advancing the watermark to
>>>>>> +inf.
>>>>>> >>> IIRC, as a pragmatic note, we prohibited global window with
>>>>>> default
>>>>>> >>> trigger on unbounded PCollections in the SDK because this is more
>>>>>> >>> likely to be user error than an actual desire to have no output
>>>>>> until
>>>>>> >>> drain. But it's semantically valid in the model.
>>>>>>
>>>>>
>>
>> --
>>
>> This email may be confidential and privileged. If you received this
>> communication by mistake, please don't forward it to anyone else, please
>> erase all copies and attachments, and please let me know that it has gone
>> to the wrong person.
>>
>> The above terms reflect a potential business arrangement, are provided
>> solely as a basis for further discussion, and are not intended to be and do
>> not constitute a legally binding obligation. No legally binding obligations
>> will be created, implied, or inferred until an agreement in final form is
>> executed in writing by all parties involved.
>>
>>

Reply via email to