This discussion brings many really interesting questions for me. :-)
> I don't see batch vs. streaming as part of the model. One can have
> microbatch, or even a runner that alternates between different modes.
Although I understand the motivation for this statement, the project
describes itself as "Apache Beam: An advanced unified programming
model". What does the model unify, if "streaming vs. batch" is not part
of it? Using microbatching, chaining of batch jobs, or pure streaming
are exactly the "runtime conditions/characteristics" I refer to. All of
these define several runtime parameters, which in turn determine how
well or badly the pipeline will perform and how many resources might be
needed. From my point of view, pure streaming should be the most
resource-demanding (if not, why bother with batch? why not run
everything in streaming only? what would remain to "unify"?).
> Fortunately, for batch, only the state for a single key needs to be
> preserved at a time, rather than the state for all keys across the range
> of skew. Of course if you have few or hot keys, one can still have
> issues (and this is not specific to StatefulDoFns).
Yes, but there is still the presumption that my stateful DoFn can
tolerate arbitrary shuffling of its input. Let me explain the use case
in more detail.
Suppose you have an input stream consisting of 1s and 0s (plus some key
for each element, which is irrelevant for the demonstration). Your task
is to calculate, in a running global window, the number of transitions
between state 0 and state 1 and vice versa. When the state doesn't
change, you don't count anything. If the input (for a given key) were
(tN denotes timestamp N):
t1: 1
t2: 0
t3: 0
t4: 1
t5: 1
t6: 0
then the output should be (assuming the default state is zero):
t1: (one: 1, zero: 0)
t2: (one: 1, zero: 1)
t3: (one: 1, zero: 1)
t4: (one: 2, zero: 1)
t5: (one: 2, zero: 1)
t6: (one: 2, zero: 2)
How would you implement this in current Beam semantics?
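For concreteness, the intended per-key logic can be sketched in plain Python (no Beam API involved; this is just the state machine, and it assumes elements arrive in timestamp order, which is exactly the guarantee under discussion):

```python
def count_transitions(values, initial_state=0):
    """Yield (ones, zeros) after each element of a timestamp-ordered input.

    `ones` counts 0->1 transitions, `zeros` counts 1->0 transitions.
    Assumes the input is already ordered by timestamp.
    """
    state = initial_state
    ones = zeros = 0
    for v in values:
        if v != state:
            if v == 1:
                ones += 1
            else:
                zeros += 1
            state = v
        yield (ones, zeros)

# The input from the example above, t1..t6:
print(list(count_transitions([1, 0, 0, 1, 1, 0])))
# -> [(1, 0), (1, 1), (1, 1), (2, 1), (2, 1), (2, 2)]
```

Note that the yielded values match the expected output above only because the input is consumed in timestamp order.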
> Pipelines that fail in the "worst case" batch scenario are likely to
> degrade poorly (possibly catastrophically) when the watermark falls
> behind in streaming mode as well.
But the worst case is defined by an input of size (available resources +
a single byte) -> pipeline failure, even though the pipeline could have
finished, given the right conditions.
> This might be reasonable, implemented by default by buffering
> everything and releasing elements as the watermark (+lateness) advances,
> but would likely lead to inefficient (though *maybe* easier to reason
> about) code.
Sure, the pipeline will be less efficient, because it would have to
buffer and sort the inputs. But at least it will produce correct results
in cases where updates to state are order-sensitive.
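A minimal sketch of that default buffering strategy (hypothetical, plain Python, not a Beam API): keep a min-heap ordered by timestamp and release elements once the watermark, minus allowed lateness, has passed them:

```python
import heapq

class SortingBuffer:
    """Buffers elements and releases them in timestamp order once the
    watermark (accounting for allowed lateness) has passed their
    timestamp. A hypothetical sketch of the 'buffer everything and
    release as the watermark advances' strategy; not an actual Beam API.
    """

    def __init__(self, allowed_lateness=0):
        self.allowed_lateness = allowed_lateness
        self.heap = []  # (timestamp, value) pairs, min-heap by timestamp

    def add(self, timestamp, value):
        heapq.heappush(self.heap, (timestamp, value))

    def advance_watermark(self, watermark):
        """Return all elements now safe to emit, in timestamp order."""
        out = []
        while self.heap and self.heap[0][0] + self.allowed_lateness <= watermark:
            out.append(heapq.heappop(self.heap))
        return out

buf = SortingBuffer(allowed_lateness=2)
buf.add(5, 'a'); buf.add(3, 'b'); buf.add(8, 'c')
print(buf.advance_watermark(7))    # -> [(3, 'b'), (5, 'a')]
print(buf.advance_watermark(100))  # -> [(8, 'c')]
```

The inefficiency is visible here: everything sits in memory (or state) until the watermark passes it, which is the cost of the ordering guarantee.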
> Would it be roughly equivalent to GBK + FlatMap(lambda (key, values):
> [(key, value) for value in values])?
I'd say roughly yes, but the difference would be in the trigger. The
trigger should ideally fire as soon as the watermark (+lateness) passes
the element with the lowest timestamp in the buffer. Although this could
be somewhat emulated by a fixed trigger firing every X millis.
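The GBK + FlatMap formulation, with the extra timestamp sort, might look like the following in plain Python (a rough model only; the trigger behavior described above is exactly what this batch-style rendering cannot express):

```python
from itertools import groupby
from operator import itemgetter

def sorted_redistribute(elements):
    """Group (key, timestamp, value) triples by key, then re-emit each
    group's elements sorted by timestamp. Roughly models
    GBK + FlatMap(lambda (key, values): [(key, v) for v in values]),
    plus the timestamp ordering discussed here.
    """
    out = []
    # groupby requires its input to be sorted by the grouping key first.
    for key, group in groupby(sorted(elements, key=itemgetter(0)),
                              key=itemgetter(0)):
        for _, ts, value in sorted(group, key=itemgetter(1)):
            out.append((key, ts, value))
    return out

data = [('k1', 3, 'c'), ('k1', 1, 'a'), ('k2', 2, 'x'), ('k1', 2, 'b')]
print(sorted_redistribute(data))
# -> [('k1', 1, 'a'), ('k1', 2, 'b'), ('k1', 3, 'c'), ('k2', 2, 'x')]
```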
> Or is the underlying desire just to be able to hint to the runner
> that the code may perform better (e.g. require less resources) as skew
> is reduced (and hence to order by timestamp iff it's cheap)?
No, the sorting would have to be done in the streaming case as well.
That is an imperative of the unified model. I think it is possible
either to sort by timestamp only in the batch case (and do it for *all*
batch stateful pardos, without any annotation), or to introduce the
annotation, but then make the same guarantees for the streaming case as
well.
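To illustrate why the guarantee cannot be batch-only: the transition counter from the example above produces a different (wrong) final count when the same elements arrive in a different order, so a model that sorts in one mode but not the other gives mode-dependent results (plain-Python sketch, no Beam API):

```python
def final_transition_count(values, state=0):
    """Count 0->1 and 1->0 transitions over the input, in arrival order."""
    ones = zeros = 0
    for v in values:
        if v != state:
            if v == 1:
                ones += 1
            else:
                zeros += 1
            state = v
    return ones, zeros

ordered  = [1, 0, 0, 1, 1, 0]  # timestamp order t1..t6
shuffled = [0, 0, 1, 1, 1, 0]  # same elements, different order

print(final_transition_count(ordered))   # -> (2, 2)
print(final_transition_count(shuffled))  # -> (1, 1)
```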
Jan
On 5/20/19 4:41 PM, Robert Bradshaw wrote:
On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Robert,
yes, I think you rephrased my point - although no *explicit* guarantees
of ordering are given in either mode, there is *implicit* ordering in
the streaming case that is due to the nature of the processing - the
difference between the watermark and the timestamps of elements flowing
through the pipeline is generally low (too high a difference leads to
the overbuffering problem), but there is no such bound in batch.
Fortunately, for batch, only the state for a single key needs to be
preserved at a time, rather than the state for all keys across the
range of skew. Of course if you have few or hot keys, one can still
have issues (and this is not specific to StatefulDoFns).
As a result, I see a few possible solutions:
- the best and most natural option seems to be an extension of the
model, so that it defines batch not only as "a streaming pipeline
executed in batch fashion", but as "a pipeline with at least as good
runtime characteristics as in the streaming case, executed in batch
fashion". I really don't think that there are any conflicts with the
current model, or that this could affect performance, because the
required sorting (as pointed out by Aljoscha) is very probably already
done during translation of stateful pardos. Also note that this
definition only affects user-defined stateful pardos
I don't see batch vs. streaming as part of the model. One can have
microbatch, or even a runner that alternates between different modes.
The model describes what the valid outputs are given a (sometimes
partial) set of inputs. It becomes really hard to define things like
"as good runtime characteristics." Once you allow any
out-of-orderedness, it is not very feasible to try and define (and
more cheaply implement) an "upper bound" of acceptable
out-of-orderedness.
Pipelines that fail in the "worst case" batch scenario are likely to
degrade poorly (possibly catastrophically) when the watermark falls
behind in streaming mode as well.
- another option would be to introduce an annotation for DoFns (e.g.
@RequiresStableTimeCharacteristics), which would result in sorting in
the batch case - but this extension would have to ensure the sorting in
streaming mode as well; it would require a definition of allowed
lateness and a trigger (essentially similar to a window)
This might be reasonable, implemented by default by buffering
everything and releasing elements as the watermark (+lateness)
advances, but would likely lead to inefficient (though *maybe* easier
to reason about) code. Not sure about the semantics of triggering
here, especially data-driven triggers. Would it be roughly equivalent
to GBK + FlatMap(lambda (key, values): [(key, value) for value in
values])?
Or is the underlying desire just to be able to hint to the runner that
the code may perform better (e.g. require less resources) as skew is
reduced (and hence to order by timestamp iff it's cheap)?
- the last option would be to introduce these "higher order guarantees"
in some extension DSL (e.g. Euphoria), but that seems to be the worst
option to me
I see the first two options as roughly equally good, although the
latter one is probably more time-consuming to implement. But it would
bring an additional feature to the streaming case as well.
Thanks for any thoughts.
Jan
On 5/20/19 12:41 PM, Robert Bradshaw wrote:
On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Reuven,
How so? AFAIK stateful DoFns work just fine in batch runners.
Stateful ParDo works in batch only as far as the logic inside the state works for absolutely unbounded
out-of-orderness of elements. That basically (practically) can work only for cases where the order
of input elements doesn't matter. But "state" can refer to a "state machine",
and any time you have a state machine involved, the ordering of elements matters.
No guarantees on order are provided in *either* streaming or batch
mode by the model. However, it is the case that most streaming runners
attempt to limit the amount of out-of-orderedness of elements (in terms
of event time vs. processing time) in order to make forward progress,
which in turn could help cap the
amount of state that must be held concurrently, whereas a batch runner
may not allow any state to be safely discarded until the whole
timeline from infinite past to infinite future has been observed.
Also, as pointed out, state is not preserved "batch to batch" in batch mode.
On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
batch semantics and streaming semantics differ only in that I can have
GlobalWindow with default trigger on batch and cannot on stream
You can have a GlobalWindow in streaming with a default trigger. You
could define additional triggers that do early firings. And you could
even trigger the global window by advancing the watermark to +inf.
IIRC, as a pragmatic note, we prohibited global window with default
trigger on unbounded PCollections in the SDK because this is more
likely to be user error than an actual desire to have no output until
drain. But it's semantically valid in the model.