Sequence metadata does have the disadvantage that users can no longer use
the types coming from the source. You must create a new type that contains
a sequence number (unless Beam provides this). It also gets awkward with
Flatten - the sequence number is no longer enough, you must also encode
which side of the flatten each element came from.
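
As a rough sketch, such a wrapper might look like the following (a
hypothetical type, not something Beam provides today; the flattenSide
field is one way to encode which input of the Flatten an element came
from):

    import java.io.Serializable;

    // Hypothetical wrapper carrying explicit sequence metadata alongside
    // the original element.
    public class Sequenced<T> implements Serializable {
      private final long sequenceNumber; // e.g. a source offset
      private final int flattenSide;     // which Flatten input this came from
      private final T value;

      public Sequenced(long sequenceNumber, int flattenSide, T value) {
        this.sequenceNumber = sequenceNumber;
        this.flattenSide = flattenSide;
        this.value = value;
      }

      public long getSequenceNumber() { return sequenceNumber; }
      public int getFlattenSide() { return flattenSide; }
      public T getValue() { return value; }
    }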

On Tue, May 28, 2019 at 3:18 AM Jan Lukavský <je...@seznam.cz> wrote:

> As I understood it, Kenn was supporting the idea that sequence metadata
> is preferable to FIFO. I was trying to point out that it should even
> provide the same functionality as FIFO, plus one more important property -
> reproducibility and the ability to be persisted and reused the same way
> in batch and streaming.
>
> There is no doubt that sequence metadata can be stored in any storage
> system. But, regarding the implicit ordering that sources might have -
> yes, of course, data written into HDFS or Cloud Storage has ordering,
> but only partial - within a single bulk (e.g. a file), and the ordering
> is not well defined on the boundaries between these bulks (between
> files). That is why I'd say that the ordering of sources is relevant
> only for (partitioned!) streaming sources and generally always reduces
> to sequence metadata (e.g. offsets).
>
> Jan
>
> On 5/28/19 11:43 AM, Robert Bradshaw wrote:
> > Huge +1 to all Kenn said.
> >
> > Jan, batch sources can have orderings too, just like Kafka. I think
> > it's reasonable (for both batch and streaming) that if a source has an
> > ordering that is an important part of the data, it should preserve
> > this ordering in the data itself (e.g. as sequence numbers, offsets,
> > etc.)
> >
> > On Fri, May 24, 2019 at 10:35 PM Kenneth Knowles <k...@apache.org>
> wrote:
> >> I strongly prefer explicit sequence metadata over FIFO requirements,
> because:
> >>
> >>   - FIFO is complex to specify: for example Dataflow has "per stage
> key-to-key" FIFO today, but it is not guaranteed to remain so (plus "stage"
> is not a portable concept, nor even guaranteed to remain a Dataflow concept)
> >>   - complex specifications are by definition poor usability (if
> necessary, then it is what it is)
> >>   - overly restricts the runner, reduces parallelism, for example any
> non-stateful ParDo has per-element parallelism, not per "key"
> >>   - another perspective on that: FIFO makes everyone pay rather than
> just the transform that requires exact sequencing
> >>   - previous implementation details like reshuffles become part of the
> model
> >>   - I'm not even convinced the use cases involved are addressed by some
> careful FIFO restrictions; many sinks re-key and they would all have to
> become aware of how keying of a sequence of "stages" affects the end-to-end
> FIFO
> >>
> >> A noop becoming a non-noop is essentially the mathematical definition
> of moving from higher-level to lower-level abstraction.
> >>
> >> So this strikes at the core question of what level of abstraction Beam
> aims to represent. Lower-level means there are fewer possible
> implementations and it is more tied to the underlying architecture, and
> anything that is not a near-exact match pays a huge penalty. Higher-level means there
> are more implementations possible with different tradeoffs, though they may
> all pay a minor penalty.
> >>
> >> I could be convinced to change my mind, but it needs some extensive
> design, examples, etc. I think it is probably about the most consequential
> design decision in the whole Beam model, around the same level as the
> decision to use ParDo and GBK as the primitives IMO.
> >>
> >> Kenn
> >>
> >> On Thu, May 23, 2019 at 10:17 AM Reuven Lax <re...@google.com> wrote:
> >>> Not really. I'm suggesting that some variant of FIFO ordering is
> necessary, which requires that runners either natively support FIFO ordering
> or that transforms add some extra sequence number to each record to sort by.
> >>>
> >>> I still think your proposal is very useful by the way. I'm merely
> pointing out that to solve the state-machine problem we probably need
> something more.
> >>>
> >>> Reuven
> >>>
> >>> On Thu, May 23, 2019 at 9:50 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi,
> >>>> yes. It seems that ordering by a user-supplied UDF makes sense, and I
> will update the design proposal accordingly.
> >>>> Would that solve the issues you mention?
> >>>> Jan
> >>>> ---------- Original e-mail ----------
> >>>> From: Reuven Lax <re...@google.com>
> >>>> To: dev <dev@beam.apache.org>
> >>>> Date: 23. 5. 2019 18:44:38
> >>>> Subject: Re: Definition of Unified model
> >>>>
> >>>> I'm simply saying that timestamp ordering is insufficient for state
> machines. I wasn't proposing Kafka as a solution - that was simply an
> example of how people solve this problem in other scenarios.
> >>>>
> >>>> BTW another example of ordering: Imagine today that you have a
> triggered Sum aggregation writing out to a key-value sink. In theory we
> provide no ordering, so the sink might write the triggered sums in the
> wrong order, ending up with an incorrect value in the sink. In this case
> you probably want values ordered by trigger pane index.
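> >>>>
> >>>> A minimal sketch of the kind of tagging that would make this work,
> >>>> assuming the Java SDK (the sink-side conditional write is left
> >>>> abstract):
> >>>>
> >>>>   import org.apache.beam.sdk.transforms.DoFn;
> >>>>   import org.apache.beam.sdk.values.KV;
> >>>>
> >>>>   // Attach the trigger pane index to each triggered sum so a
> >>>>   // key-value sink can ignore writes older than what it holds.
> >>>>   class AttachPaneIndex
> >>>>       extends DoFn<KV<String, Long>, KV<String, KV<Long, Long>>> {
> >>>>     @ProcessElement
> >>>>     public void process(ProcessContext c) {
> >>>>       long paneIndex = c.pane().getIndex(); // zero-based firing index
> >>>>       c.output(KV.of(c.element().getKey(),
> >>>>           KV.of(paneIndex, c.element().getValue())));
> >>>>     }
> >>>>   }
> >>>>
> >>>> The sink would then overwrite a key's value only when the incoming
> >>>> pane index is higher than the stored one.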
> >>>>
> >>>> Reuven
> >>>>
> >>>> On Thu, May 23, 2019 at 8:59 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>> Hi Reuven,
> >>>> I share Robert's viewpoint. I think the issue you refer to is
> not in reality related to timestamps, but to the fact that the ordering of
> events in time is observer dependent (whether caused by relativity or time
> skew, it has essentially the same consequences). And the resolution in
> fact isn't Kafka, but generally an authoritative observer that tells you
> "I saw the events in this order". Either you have one (and have the
> outcome of its observation persisted in the data - e.g. as an offset in a
> Kafka partition), in which case you should be able to use it (maybe that
> suggests after all that sorting by some user-supplied UDF might make
> sense), or you do not, and then any interpretation of the data seems to be
> equally valid. Although determinism is fine, of course.
> >>>> Jan
> >>>> ---------- Original e-mail ----------
> >>>> From: Reuven Lax <re...@google.com>
> >>>> To: dev <dev@beam.apache.org>
> >>>> Date: 23. 5. 2019 17:39:12
> >>>> Subject: Re: Definition of Unified model
> >>>>
> >>>> So an example would be elements of type "startUserSession" and
> "endUserSession" (website sessions, not Beam sessions). Logically you may
> need to process them in the correct order if you have any sort of
> state-machine logic. However timestamp ordering is never guaranteed to
> match the logical ordering. Not only might you have several elements with
> the same timestamp, but in reality time skew across backend servers can
> cause the events to have timestamps in reverse order of the actual
> causality order.
> >>>>
> >>>> People do solve this problem today though. Publish the events to
> Kafka, making sure that events for the same user end up in the same Kafka
> partition. This ensures that the events appear in the Kafka partitions in
> causality order, even if the timestamp order doesn't match. Then your Kafka
> subscriber simply processes the elements in each partition in order.
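> >>>>
> >>>> For illustration, the keyed publish with the plain Kafka producer API
> >>>> looks roughly like this (the topic name is made up; userId and
> >>>> eventJson stand for the event being published):
> >>>>
> >>>>   import java.util.Properties;
> >>>>   import org.apache.kafka.clients.producer.KafkaProducer;
> >>>>   import org.apache.kafka.clients.producer.ProducerRecord;
> >>>>
> >>>>   Properties props = new Properties();
> >>>>   props.put("bootstrap.servers", "localhost:9092"); // assumed broker
> >>>>   props.put("key.serializer",
> >>>>       "org.apache.kafka.common.serialization.StringSerializer");
> >>>>   props.put("value.serializer",
> >>>>       "org.apache.kafka.common.serialization.StringSerializer");
> >>>>   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> >>>>
> >>>>   // Keying by user id makes Kafka's default partitioner route all
> >>>>   // events for one user to the same partition, preserving their
> >>>>   // publish order.
> >>>>   producer.send(new ProducerRecord<>("user-events", userId, eventJson));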
> >>>>
> >>>> I think the ability to impose FIFO causality ordering is what's
> needed for any state-machine work. Timestamp ordering has advantages
> (though often I think the advantage is in state), but does not solve this
> problem.
> >>>>
> >>>> Reuven
> >>>>
> >>>> On Thu, May 23, 2019 at 7:48 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>>
> >>>> Good point.
> >>>>
> >>>> The "implementation-specific" way I would do this is
> >>>> window-by-instant, followed by a DoFn that gets all the elements with
> >>>> the same timestamp and sorts/acts accordingly, but this counts on the
> >>>> runner producing windows in timestamp order (likely?) and also the
> >>>> subsequent DoFn getting them in this order (also likely, due to
> >>>> fusion).
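> >>>>
> >>>> A sketch of that approach in the Java SDK (element and key types are
> >>>> illustrative):
> >>>>
> >>>>   import org.apache.beam.sdk.transforms.DoFn;
> >>>>   import org.apache.beam.sdk.transforms.GroupByKey;
> >>>>   import org.apache.beam.sdk.transforms.ParDo;
> >>>>   import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> >>>>   import org.apache.beam.sdk.transforms.windowing.Window;
> >>>>   import org.apache.beam.sdk.values.KV;
> >>>>   import org.apache.beam.sdk.values.PCollection;
> >>>>   import org.joda.time.Duration;
> >>>>
> >>>>   static PCollection<KV<String, String>> processByInstant(
> >>>>       PCollection<KV<String, String>> input) {
> >>>>     return input
> >>>>         // Elements sharing a (millisecond) timestamp share a window...
> >>>>         .apply(Window.<KV<String, String>>into(
> >>>>             FixedWindows.of(Duration.millis(1))))
> >>>>         // ...so the GBK hands them to the DoFn together, per key.
> >>>>         .apply(GroupByKey.<String, String>create())
> >>>>         .apply(ParDo.of(
> >>>>             new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
> >>>>               @ProcessElement
> >>>>               public void process(ProcessContext c) {
> >>>>                 // All values here carry (nearly) the same timestamp;
> >>>>                 // sort/act on them as needed, then emit.
> >>>>                 for (String v : c.element().getValue()) {
> >>>>                   c.output(KV.of(c.element().getKey(), v));
> >>>>                 }
> >>>>               }
> >>>>             }));
> >>>>   }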
> >>>>
> >>>> One could make the argument that, though it does not provide
> >>>> deterministic behavior, getting elements of the same timestamp in
> >>>> different orders should produce equally valid interpretations of the
> >>>> data. (After all, due to relativity, timestamps are not technically
> >>>> well ordered across space.) I can see how data-dependent tiebreakers
> >>>> could be useful, or promises of preservation of order between
> >>>> operations.
> >>>>
> >>>> - Robert
> >>>>
> >>>> On Thu, May 23, 2019 at 4:18 PM Reuven Lax <re...@google.com> wrote:
> >>>>> So Jan's example of state machines is quite a valid use case for
> ordering. However in my experience, timestamp ordering is insufficient for
> state machines. Elements that cause state transitions might come in with
> the exact same timestamp, yet still have a necessary ordering. Especially
> given Beam's decision to use millisecond timestamps this is possible, but
> even at microsecond or nanosecond precision this can happen at scale. To
> handle state machines you usually need some sort of FIFO ordering along
> with an ordered source, such as Kafka, not timestamp ordering.
> >>>>>
> >>>>> Reuven
> >>>>>
> >>>>> On Thu, May 23, 2019 at 12:32 AM Jan Lukavský <je...@seznam.cz>
> wrote:
> >>>>>> Hi all,
> >>>>>>
> >>>>>> thanks everyone for this discussion. I think I have gathered enough
> >>>>>> feedback to be able to put down a proposition for changes, which I
> will
> >>>>>> do and send to this list for further discussion. There are still
> doubts
> >>>>>> remaining about the non-determinism and its relation to output
> stability vs.
> >>>>>> latency. But I will try to clarify all this in the design document.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>>    Jan
> >>>>>>
> >>>>>> On 5/22/19 3:49 PM, Maximilian Michels wrote:
> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my
> >>>>>>>> current understanding.
> >>>>>>> In essence your description of how exactly-once works in Flink is
> >>>>>>> correct. The general assumption in Flink is that pipelines must be
> >>>>>>> deterministic and thus produce idempotent writes in the case of
> >>>>>>> failures. However, that doesn't mean Beam sinks can't guarantee a
> bit
> >>>>>>> more with what Flink has to offer.
> >>>>>>>
> >>>>>>> Luke already mentioned the design discussions for
> @RequiresStableInput
> >>>>>>> which ensures idempotent writes for non-deterministic pipelines.
> This
> >>>>>>> is not part of the model but an optional Beam feature.
> >>>>>>>
> >>>>>>> We recently implemented support for @RequiresStableInput in the
> Flink
> >>>>>>> Runner. Reuven mentioned the Flink checkpoint confirmation, which
> >>>>>>> allows us to buffer (and checkpoint) processed data and only emit
> it
> >>>>>>> once a Flink checkpoint has completed.
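> >>>>>>>
> >>>>>>> For reference, opting in looks roughly like this in the Java SDK
> >>>>>>> (the sink write itself is a made-up placeholder):
> >>>>>>>
> >>>>>>>   import org.apache.beam.sdk.transforms.DoFn;
> >>>>>>>
> >>>>>>>   class StableWriteFn extends DoFn<String, Void> {
> >>>>>>>     @RequiresStableInput
> >>>>>>>     @ProcessElement
> >>>>>>>     public void process(@Element String record) {
> >>>>>>>       // The runner guarantees this element is stable across
> >>>>>>>       // retries, so the side-effecting write can be made
> >>>>>>>       // idempotent (e.g. by deriving a deterministic id from it).
> >>>>>>>       writeIdempotently(record); // hypothetical sink call
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     private void writeIdempotently(String record) {
> >>>>>>>       // e.g. a keyed upsert into an external store (assumed).
> >>>>>>>     }
> >>>>>>>   }
> >>>>>>>
> >>>>>>> On the Flink runner, this is what enables the buffer-and-emit-on-
> >>>>>>> checkpoint behavior described above.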
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On 21.05.19 16:49, Jan Lukavský wrote:
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>>   > Actually, I think it is a larger (open) question whether
> exactly
> >>>>>>>> once is guaranteed by the model or whether runners are allowed to
> >>>>>>>> relax that. I would think, however, that sources correctly
> >>>>>>>> implemented should be idempotent when run atop an exactly once
> >>>>>>>> infrastructure such as Flink or Dataflow.
> >>>>>>>>
> >>>>>>>> I would assume that the model basically inherits the guarantees
> >>>>>>>> of the underlying infrastructure. Because Flink does not work as
> >>>>>>>> you described (atomic commit of inputs, state and outputs), but
> >>>>>>>> rather a checkpoint mark flows through the DAG much like a
> >>>>>>>> watermark, and on failures operators are restored and data
> >>>>>>>> reprocessed, it (IMHO) implies that you have exactly once
> >>>>>>>> everywhere in the DAG *but* sinks. That is because sinks cannot
> >>>>>>>> be restored to a previous state; instead, sinks are supposed to
> >>>>>>>> be idempotent in order for the exactly once to really work (or at
> >>>>>>>> least be able to commit outputs on checkpoint in the sink). That
> >>>>>>>> implies that if you don't have a sink that is able to commit
> >>>>>>>> outputs atomically on checkpoint, the pipeline execution should
> >>>>>>>> be deterministic upon retries; otherwise shadow writes from
> >>>>>>>> failed paths of the pipeline might appear.
> >>>>>>>>
> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my
> >>>>>>>> current understanding.
> >>>>>>>>
> >>>>>>>>   > Sounds like we should make this clearer.
> >>>>>>>>
> >>>>>>>> I meant that you are right that, in any thoughts we are having,
> >>>>>>>> we must not forget that streams are by definition out-of-order.
> >>>>>>>> That is a property that we cannot change. But that doesn't
> >>>>>>>> prevent us from creating an operator that presents the data to
> >>>>>>>> the UDF as if the stream were ideally sorted. It can do that by
> >>>>>>>> introducing latency, of course.
> >>>>>>>>
> >>>>>>>> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
> >>>>>>>>> Reza: One could provide something like this as a utility class,
> but
> >>>>>>>>> one downside is that it is not scale invariant. It requires a
> tuning
> >>>>>>>>> parameter that, if too small, won't mitigate the problem, but if
> too
> >>>>>>>>> big, greatly increases latency. (Possibly one could define a
> dynamic
> >>>>>>>>> session-like window to solve this though...) It also might be
> harder
> >>>>>>>>> for runners that *can* cheaply present stuff in timestamp order
> to
> >>>>>>>>> optimize. (That and, in practice, our annotation-style process
> methods
> >>>>>>>>> don't lend themselves to easy composition.) I think it could
> work in
> >>>>>>>>> specific cases though.
> >>>>>>>>>
> >>>>>>>>> More inline below.
> >>>>>>>>>
> >>>>>>>>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz>
> wrote:
> >>>>>>>>>> Hi Robert,
> >>>>>>>>>>
> >>>>>>>>>>    > Beam has an exactly-once model. If the data was consumed,
> state
> >>>>>>>>>> mutated, and outputs written downstream (these three are
> committed
> >>>>>>>>>> together atomically) it will not be replayed. That does not, of
> >>>>>>>>>> course,
> >>>>>>>>>> solve the non-determinism due to ordering (including the fact
> that two
> >>>>>>>>>> operations reading the same PCollection may view different
> ordering).
> >>>>>>>>>>
> >>>>>>>>>> I think what you describe is a property of a runner, not of the
> model,
> >>>>>>>>>> right? I think if I run my pipeline on Flink I will not get this
> >>>>>>>>>> atomicity, because although Flink also uses an exactly-once
> >>>>>>>>>> model, it might write outputs multiple times.
> >>>>>>>>> Actually, I think it is a larger (open) question whether exactly
> once
> >>>>>>>>> is guaranteed by the model or whether runners are allowed to
> relax
> >>>>>>>>> that. I would think, however, that sources correctly implemented
> >>>>>>>>> should be idempotent when run atop an exactly once
> infrastructure such
> >>>>>>>>> as Flink or Dataflow.
> >>>>>>>>>
> >>>>>>>>>>    > 1) Is it correct for a (Stateful)DoFn to assume elements
> are
> >>>>>>>>>> received
> >>>>>>>>>> in a specific order? In the current model, it is not. Being
> able to
> >>>>>>>>>> read, handle, and produce out-of-order data, including late
> data,
> >>>>>>>>>> is a
> >>>>>>>>>> pretty fundamental property of distributed systems.
> >>>>>>>>>>
> >>>>>>>>>> Yes, absolutely. The argument here is not that a stateful ParDo
> >>>>>>>>>> should presume to receive elements in any particular order, but
> >>>>>>>>>> that it should _present_ them as ordered to the user's
> >>>>>>>>>> @ProcessElement function.
> >>>>>>>>> Sounds like we should make this clearer.
> >>>>>>>>>
> >>>>>>>>>>    > 2) Given that some operations are easier (or possibly only
> >>>>>>>>>> possible)
> >>>>>>>>>> to write when operating on ordered data, and that different
> runners
> >>>>>>>>>> may
> >>>>>>>>>> have (significantly) cheaper ways to provide this ordering than
> can be
> >>>>>>>>>> done by the user themselves, should we elevate this to a
> property of
> >>>>>>>>>> (Stateful?)DoFns that the runner can provide? I think a
> compelling
> >>>>>>>>>> argument can be made here that we should.
> >>>>>>>>>>
> >>>>>>>>>> +1
> >>>>>>>>>>
> >>>>>>>>>> Jan
> >>>>>>>>>>
> >>>>>>>>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> >>>>>>>>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz>
> wrote:
> >>>>>>>>>>>>     > I don't see batch vs. streaming as part of the model.
> One
> >>>>>>>>>>>> can have
> >>>>>>>>>>>> microbatch, or even a runner that alternates between different
> >>>>>>>>>>>> modes.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Although I understand motivation of this statement, this
> project
> >>>>>>>>>>>> name is
> >>>>>>>>>>>> "Apache Beam: An advanced unified programming model". What
> does the
> >>>>>>>>>>>> model unify, if "streaming vs. batch" is not part of the
> model?
> >>>>>>>>>>> What I mean is that streaming vs. batch is no longer part of
> the
> >>>>>>>>>>> model
> >>>>>>>>>>> (or ideally API), but pushed down to be a concern of the runner
> >>>>>>>>>>> (executor) of the pipeline.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz
> >
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>> Hi Kenn,
> >>>>>>>>>>>>
> >>>>>>>>>>>> OK, so if we introduce an annotation, we can have stateful ParDo
> >>>>>>>>>>>> with sorting, that would perfectly resolve my issues. I still
> >>>>>>>>>>>> have some doubts, though. Let me explain. The current
> behavior of
> >>>>>>>>>>>> stateful ParDo has the following properties:
> >>>>>>>>>>>>
> >>>>>>>>>>>>     a) might fail in batch, although it runs fine in streaming
> (that
> >>>>>>>>>>>> is due to the buffering, and unbounded lateness in batch,
> which
> >>>>>>>>>>>> was discussed back and forth in this thread)
> >>>>>>>>>>>>
> >>>>>>>>>>>>     b) might be non-deterministic (this is because the
> >>>>>>>>>>>> elements arrive in somewhat random order, and even if you do
> >>>>>>>>>>>> the operation "assign unique ID to elements" this might
> >>>>>>>>>>>> produce different results when run multiple times)
> >>>>>>>>>>> PCollections are *explicitly* unordered. Any operations that
> >>>>>>>>>>> assume or
> >>>>>>>>>>> depend on a specific ordering for correctness (or determinism)
> must
> >>>>>>>>>>> provide that ordering themselves (i.e. tolerate "arbitrary
> shuffling
> >>>>>>>>>>> of inputs"). As you point out, that may be very expensive if
> you have
> >>>>>>>>>>> very hot keys with very large (unbounded) timestamp skew.
> >>>>>>>>>>>
> >>>>>>>>>>> StatefulDoFns are low-level operations that should be used
> with care;
> >>>>>>>>>>> the simpler windowing model gives determinism in the face of
> >>>>>>>>>>> unordered
> >>>>>>>>>>> data (though late data and non-end-of-window triggering
> introduces
> >>>>>>>>>>> some of the non-determinism back in).
> >>>>>>>>>>>
> >>>>>>>>>>>> What worries me most is property b), because it seems to me
> >>>>>>>>>>>> to have serious consequences - not only that if you run a
> >>>>>>>>>>>> batch pipeline twice you would get different results, but even
> >>>>>>>>>>>> in streaming, when a pipeline fails and gets restarted from a
> >>>>>>>>>>>> checkpoint, the produced output might differ from the previous
> >>>>>>>>>>>> run, and data from the first run might have already been
> >>>>>>>>>>>> persisted into the sink. That would create somewhat messy
> >>>>>>>>>>>> outputs.
> >>>>>>>>>>> Beam has an exactly-once model. If the data was consumed, state
> >>>>>>>>>>> mutated, and outputs written downstream (these three are
> committed
> >>>>>>>>>>> together atomically) it will not be replayed. That does not, of
> >>>>>>>>>>> course, solve the non-determinism due to ordering (including
> the fact
> >>>>>>>>>>> that two operations reading the same PCollection may view
> different
> >>>>>>>>>>> ordering).
> >>>>>>>>>>>
> >>>>>>>>>>>> These two properties make me think that the current
> >>>>>>>>>>>> implementation is more of a _special case_ than the general
> one.
> >>>>>>>>>>>> The general one would be that your state doesn't have the
> >>>>>>>>>>>> properties to be able to tolerate buffering problems and/or
> >>>>>>>>>>>> non-determinism. Which is the case where you need sorting in
> both
> >>>>>>>>>>>> streaming and batch to be part of the model.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me point out one more analogy - that is merging vs.
> >>>>>>>>>>>> non-merging windows. The general case (merging windows)
> implies
> >>>>>>>>>>>> sorting by timestamp in both batch case (explicit) and
> streaming
> >>>>>>>>>>>> (buffering). The special case (non-merging windows) doesn't
> rely
> >>>>>>>>>>>> on any timestamp ordering, so the sorting and buffering can be
> >>>>>>>>>>>> dropped. The underlying root cause of this is the same for
> both
> >>>>>>>>>>>> stateful ParDo and windowing (essentially, assigning window
> >>>>>>>>>>>> labels is a stateful operation when the windowing function is
> merging).
> >>>>>>>>>>>>
> >>>>>>>>>>>> The reason for the current behavior of stateful ParDo seems
> to be
> >>>>>>>>>>>> performance, but is it right to abandon correctness in favor
> of
> >>>>>>>>>>>> performance? Wouldn't it be more consistent to have the
> default
> >>>>>>>>>>>> behavior prefer correctness and when you have the specific
> >>>>>>>>>>>> conditions of the state function having special properties, then
> you
> >>>>>>>>>>>> can annotate your DoFn (with something like
> >>>>>>>>>>>> @TimeOrderingAgnostic), which would yield a better
> performance in
> >>>>>>>>>>>> that case?
> >>>>>>>>>>> There are two separable questions here.
> >>>>>>>>>>>
> >>>>>>>>>>> 1) Is it correct for a (Stateful)DoFn to assume elements are
> received
> >>>>>>>>>>> in a specific order? In the current model, it is not. Being
> able to
> >>>>>>>>>>> read, handle, and produced out-of-order data, including late
> data, is
> >>>>>>>>>>> a pretty fundamental property of distributed systems.
> >>>>>>>>>>>
> >>>>>>>>>>> 2) Given that some operations are easier (or possibly only
> possible)
> >>>>>>>>>>> to write when operating on ordered data, and that different
> runners
> >>>>>>>>>>> may have (significantly) cheaper ways to provide this ordering
> than
> >>>>>>>>>>> can be done by the user themselves, should we elevate this to a
> >>>>>>>>>>> property of (Stateful?)DoFns that the runner can provide? I
> think a
> >>>>>>>>>>> compelling argument can be made here that we should.
> >>>>>>>>>>>
> >>>>>>>>>>> - Robert
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the nice small example of a calculation that
> depends
> >>>>>>>>>>>> on order. You are right that many state machines have this
> >>>>>>>>>>>> property. I agree w/ you and Luke that it is convenient for
> batch
> >>>>>>>>>>>> processing to sort by event timestamp before running a
> stateful
> >>>>>>>>>>>> ParDo. In streaming you could also implement "sort by event
> >>>>>>>>>>>> timestamp" by buffering until you know all earlier data will
> be
> >>>>>>>>>>>> dropped - a slack buffer up to allowed lateness.
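> >>>>>>>>>>>>
> >>>>>>>>>>>> A much-simplified sketch of such a buffer, using Beam state
> >>>>>>>>>>>> and timers in Java: it flushes the whole buffer in timestamp
> >>>>>>>>>>>> order when the timer fires, whereas a real implementation
> >>>>>>>>>>>> would release only elements behind the watermark (plus
> >>>>>>>>>>>> allowed lateness) and re-set the timer.
> >>>>>>>>>>>>
> >>>>>>>>>>>>   import java.util.ArrayList;
> >>>>>>>>>>>>   import java.util.Comparator;
> >>>>>>>>>>>>   import java.util.List;
> >>>>>>>>>>>>   import org.apache.beam.sdk.state.*;
> >>>>>>>>>>>>   import org.apache.beam.sdk.transforms.DoFn;
> >>>>>>>>>>>>   import org.apache.beam.sdk.values.KV;
> >>>>>>>>>>>>   import org.apache.beam.sdk.values.TimestampedValue;
> >>>>>>>>>>>>
> >>>>>>>>>>>>   class SortByEventTime
> >>>>>>>>>>>>       extends DoFn<KV<String, String>, KV<String, String>> {
> >>>>>>>>>>>>     @StateId("buffer")
> >>>>>>>>>>>>     private final StateSpec<
> >>>>>>>>>>>>         BagState<TimestampedValue<KV<String, String>>>>
> >>>>>>>>>>>>         bufferSpec = StateSpecs.bag();
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @TimerId("flush")
> >>>>>>>>>>>>     private final TimerSpec flushSpec =
> >>>>>>>>>>>>         TimerSpecs.timer(TimeDomain.EVENT_TIME);
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @ProcessElement
> >>>>>>>>>>>>     public void process(ProcessContext c,
> >>>>>>>>>>>>         @StateId("buffer")
> >>>>>>>>>>>>             BagState<TimestampedValue<KV<String, String>>> buffer,
> >>>>>>>>>>>>         @TimerId("flush") Timer flush) {
> >>>>>>>>>>>>       buffer.add(TimestampedValue.of(c.element(), c.timestamp()));
> >>>>>>>>>>>>       // Wake up once the watermark passes this timestamp; no
> >>>>>>>>>>>>       // earlier (non-late) data can arrive after that.
> >>>>>>>>>>>>       flush.set(c.timestamp());
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>
> >>>>>>>>>>>>     @OnTimer("flush")
> >>>>>>>>>>>>     public void flush(OnTimerContext c,
> >>>>>>>>>>>>         @StateId("buffer")
> >>>>>>>>>>>>             BagState<TimestampedValue<KV<String, String>>> buffer) {
> >>>>>>>>>>>>       List<TimestampedValue<KV<String, String>>> sorted =
> >>>>>>>>>>>>           new ArrayList<>();
> >>>>>>>>>>>>       buffer.read().forEach(sorted::add);
> >>>>>>>>>>>>       buffer.clear();
> >>>>>>>>>>>>       sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
> >>>>>>>>>>>>       for (TimestampedValue<KV<String, String>> tv : sorted) {
> >>>>>>>>>>>>         c.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
> >>>>>>>>>>>>       }
> >>>>>>>>>>>>     }
> >>>>>>>>>>>>   }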
> >>>>>>>>>>>>
> >>>>>>>>>>>> I do not think that it is OK to sort in batch and not in
> >>>>>>>>>>>> streaming. Many state machines diverge very rapidly when
> things
> >>>>>>>>>>>> are out of order. So each runner if they see the
> >>>>>>>>>>>> "@OrderByTimestamp" annotation (or whatever) needs to deliver
> >>>>>>>>>>>> sorted data (by some mix of buffering and dropping), or to
> reject
> >>>>>>>>>>>> the pipeline as unsupported.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I also want to say that this is not the default case - many
> >>>>>>>>>>>> uses of state & timers in ParDo yield different results at the
> >>>>>>>>>>>> element level, but the results are equivalent in the big
> >>>>>>>>>>>> picture. In examples like "assign a unique sequence number
> >>>>>>>>>>>> to each element" or "group into batches" it doesn't matter
> >>>>>>>>>>>> exactly what the result is, only that it meets the spec. And
> >>>>>>>>>>>> other cases like user funnels are monotonic enough that you
> >>>>>>>>>>>> also don't actually need sorting.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kenn
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz
> >
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>> Yes, the problem will probably arise mostly when you have
> >>>>>>>>>>>>> poorly distributed keys (or too few keys). I'm really not
> >>>>>>>>>>>>> sure if a pure GBK with a trigger can solve this - it might
> >>>>>>>>>>>>> help to have a data-driven trigger. There would still be
> >>>>>>>>>>>>> some doubts, though. The main question is still here -
> >>>>>>>>>>>>> people say that sorting by timestamp before a stateful ParDo
> >>>>>>>>>>>>> would be prohibitively slow, but I don't really see why -
> >>>>>>>>>>>>> the sorting is very probably already there. And if not (hash
> >>>>>>>>>>>>> grouping instead of sorted grouping), then the sorting would
> >>>>>>>>>>>>> affect only user-defined StatefulParDos.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This would suggest that the best way out of this would be
> really
> >>>>>>>>>>>>> to add annotation, so that the author of the pipeline can
> decide.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If that would be acceptable, I think I can try to prepare
> >>>>>>>>>>>>> some basic functionality, but I'm not sure if I would be able
> >>>>>>>>>>>>> to cover all runners / SDKs.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It is read all per key and window and not just read all (this
> >>>>>>>>>>>>> still won't scale with hot keys in the global window). The
> GBK
> >>>>>>>>>>>>> preceding the StatefulParDo will guarantee that you are
> >>>>>>>>>>>>> processing all the values for a specific key and window at
> any
> >>>>>>>>>>>>> given time. Is there a specific window/trigger that is
> missing
> >>>>>>>>>>>>> that you feel would remove the need for you to use
> StatefulParDo?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <
> je...@seznam.cz>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>> Hi Lukasz,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee
> >>>>>>>>>>>>>>> that your StatefulParDo implements the necessary
> "buffering &
> >>>>>>>>>>>>>>> sorting" into state.
> >>>>>>>>>>>>>> Yes, no problem with that. But this whole discussion started
> >>>>>>>>>>>>>> because *this doesn't work in batch*. You simply cannot
> >>>>>>>>>>>>>> first read everything from distributed storage and then
> >>>>>>>>>>>>>> buffer it all into memory, just to read it again, but
> >>>>>>>>>>>>>> sorted. That will not work. And even if it did, it would be
> >>>>>>>>>>>>>> a terrible waste of resources.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Jan
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <
> je...@seznam.cz>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>> This discussion brings many really interesting questions
> for
> >>>>>>>>>>>>>>> me. :-)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     > I don't see batch vs. streaming as part of the
> model. One
> >>>>>>>>>>>>>>> can have
> >>>>>>>>>>>>>>> microbatch, or even a runner that alternates between
> different
> >>>>>>>>>>>>>>> modes.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Although I understand motivation of this statement, this
> >>>>>>>>>>>>>>> project name is
> >>>>>>>>>>>>>>> "Apache Beam: An advanced unified programming model". What
> >>>>>>>>>>>>>>> does the
> >>>>>>>>>>>>>>> model unify, if "streaming vs. batch" is not part of the
> model?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Using microbatching, chaining of batch jobs, or pure
> streaming
> >>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>> exactly the "runtime conditions/characteristics" I refer
> to.
> >>>>>>>>>>>>>>> All these
> >>>>>>>>>>>>>>> define several runtime parameters, which in turn define
> >>>>>>>>>>>>>>> how well or badly the pipeline will perform and how many
> >>>>>>>>>>>>>>> resources might be needed. From
> >>>>>>>>>>>>>>> my point of view, pure streaming should be the most
> resource
> >>>>>>>>>>>>>>> demanding
> >>>>>>>>>>>>>>> (if not, why bother with batch? why not run everything in
> >>>>>>>>>>>>>>> streaming
> >>>>>>>>>>>>>>> only? what will there remain to "unify"?).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     > Fortunately, for batch, only the state for a single
> key
> >>>>>>>>>>>>>>> needs to be
> >>>>>>>>>>>>>>> preserved at a time, rather than the state for all keys
> across
> >>>>>>>>>>>>>>> the range
> >>>>>>>>>>>>>>> of skew. Of course if you have few or hot keys, one can
> still
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>> issues (and this is not specific to StatefulDoFns).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yes, but there is still the presumption here that my stateful
> DoFn can
> >>>>>>>>>>>>>>> tolerate arbitrary shuffling of inputs. Let me explain the
> use
> >>>>>>>>>>>>>>> case in
> >>>>>>>>>>>>>>> more detail.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Suppose you have an input stream consisting of 1s and 0s
> >>>>>>>>>>>>>>> (and some key for each element, which is irrelevant for
> >>>>>>>>>>>>>>> the demonstration). Your task is to calculate, in a running
> >>>>>>>>>>>>>>> global window, the actual number of changes between state 0
> >>>>>>>>>>>>>>> and state 1 and vice versa. When the state doesn't change,
> >>>>>>>>>>>>>>> you don't calculate anything. If the input (for a given
> >>>>>>>>>>>>>>> key) were (tN denotes timestamp N):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t1: 1
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t2: 0
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t3: 0
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t4: 1
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t5: 1
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t6: 0
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> then the output should yield (supposing that default state
> is
> >>>>>>>>>>>>>>> zero):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t1: (one: 1, zero: 0)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t2: (one: 1, zero: 1)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t3: (one: 1, zero: 1)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t4: (one: 2, zero: 1)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t5: (one: 2, zero: 1)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>      t6: (one: 2, zero: 2)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> How would you implement this in current Beam semantics?
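> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For comparison, a sketch of the stateful-DoFn version
> >>>>>>>>>>>>>>> (Java), which is only correct if elements arrive per key
> >>>>>>>>>>>>>>> in timestamp order - exactly the guarantee under
> >>>>>>>>>>>>>>> discussion:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   import org.apache.beam.sdk.state.StateSpec;
> >>>>>>>>>>>>>>>   import org.apache.beam.sdk.state.StateSpecs;
> >>>>>>>>>>>>>>>   import org.apache.beam.sdk.state.ValueState;
> >>>>>>>>>>>>>>>   import org.apache.beam.sdk.transforms.DoFn;
> >>>>>>>>>>>>>>>   import org.apache.beam.sdk.values.KV;
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   class CountTransitions
> >>>>>>>>>>>>>>>       extends DoFn<KV<String, Integer>, KV<Long, Long>> {
> >>>>>>>>>>>>>>>     @StateId("last")
> >>>>>>>>>>>>>>>     private final StateSpec<ValueState<Integer>> lastSpec =
> >>>>>>>>>>>>>>>         StateSpecs.value();
> >>>>>>>>>>>>>>>     @StateId("counts")
> >>>>>>>>>>>>>>>     private final StateSpec<ValueState<KV<Long, Long>>>
> >>>>>>>>>>>>>>>         countsSpec = StateSpecs.value();
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     @ProcessElement
> >>>>>>>>>>>>>>>     public void process(ProcessContext c,
> >>>>>>>>>>>>>>>         @StateId("last") ValueState<Integer> last,
> >>>>>>>>>>>>>>>         @StateId("counts") ValueState<KV<Long, Long>> counts) {
> >>>>>>>>>>>>>>>       Integer prevOrNull = last.read();
> >>>>>>>>>>>>>>>       int prev = prevOrNull == null ? 0 : prevOrNull; // default is 0
> >>>>>>>>>>>>>>>       KV<Long, Long> old = counts.read();
> >>>>>>>>>>>>>>>       long ones = old == null ? 0L : old.getKey();
> >>>>>>>>>>>>>>>       long zeros = old == null ? 0L : old.getValue();
> >>>>>>>>>>>>>>>       int cur = c.element().getValue();
> >>>>>>>>>>>>>>>       if (cur == 1 && prev == 0) { ones++; }  // 0 -> 1
> >>>>>>>>>>>>>>>       if (cur == 0 && prev == 1) { zeros++; } // 1 -> 0
> >>>>>>>>>>>>>>>       last.write(cur);
> >>>>>>>>>>>>>>>       counts.write(KV.of(ones, zeros));
> >>>>>>>>>>>>>>>       c.output(KV.of(ones, zeros)); // (one: ..., zero: ...)
> >>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>   }
> >>>>>>>>>>>>>>>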
> >>>>>>>>>>>>>> I think you're saying here "I know that my input is ordered
> >>>>>>>>>>>>>> in a specific way, and since I assume the order when writing
> >>>>>>>>>>>>>> my pipeline, I can perform this optimization." But there is
> >>>>>>>>>>>>>> nothing preventing a runner from noticing that you're
> >>>>>>>>>>>>>> processing in the global window with a specific type of
> >>>>>>>>>>>>>> trigger and re-ordering your inputs/processing to get better
> >>>>>>>>>>>>>> performance (since you can't use an AfterWatermark trigger
> >>>>>>>>>>>>>> for your pipeline in streaming for the GlobalWindow).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee
> that
> >>>>>>>>>>>>>> your StatefulParDo implements the necessary "buffering &
> >>>>>>>>>>>>>> sorting" into state. I can see why you would want an
> annotation
> >>>>>>>>>>>>>> that says I must have timestamp ordered elements, since it
> >>>>>>>>>>>>>> makes writing certain StatefulParDos much easier.
> StatefulParDo
> >>>>>>>>>>>>>> is a low-level function, it really is the "here you go and
> do
> >>>>>>>>>>>>>> whatever you need to but here be dragons" function while
> >>>>>>>>>>>>>> windowing and triggering is meant to keep many people from
> >>>>>>>>>>>>>> writing StatefulParDo in the first place.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     > Pipelines that fail in the "worst case" batch
> scenario
> >>>>>>>>>>>>>>> are likely to
> >>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when the
> watermark
> >>>>>>>>>>>>>>> falls
> >>>>>>>>>>>>>>> behind in streaming mode as well.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> But the worst case is defined by an input of size
> >>>>>>>>>>>>>>> (available resources + a single byte) -> pipeline failure.
> >>>>>>>>>>>>>>> Although it could have finished, given the right
> >>>>>>>>>>>>>>> conditions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     > This might be reasonable, implemented by default by
> >>>>>>>>>>>>>>> buffering
> >>>>>>>>>>>>>>> everything and releasing elements as the watermark
> (+lateness)
> >>>>>>>>>>>>>>> advances,
> >>>>>>>>>>>>>>> but would likely lead to inefficient (though *maybe*
> easier to
> >>>>>>>>>>>>>>> reason
> >>>>>>>>>>>>>>> about) code.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Sure, the pipeline will be less efficient, because it would
> >>>>>>>>>>>>>>> have to
> >>>>>>>>>>>>>>> buffer and sort the inputs. But at least it will produce
> >>>>>>>>>>>>>>> correct results
> >>>>>>>>>>>>>>> in cases where updates to state are order-sensitive.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     > Would it be roughly equivalent to GBK +
> FlatMap(lambda
> >>>>>>>>>>>>>>> (key, values):
> >>>>>>>>>>>>>>> [(key, value) for value in values])?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'd say roughly yes, but the difference would be in the
> >>>>>>>>>>>>>>> trigger. The trigger should ideally fire as soon as the
> >>>>>>>>>>>>>>> watermark (+lateness) crosses the element with the lowest
> >>>>>>>>>>>>>>> timestamp in the buffer. Although this could be somehow
> >>>>>>>>>>>>>>> emulated by a fixed trigger firing every X millis.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     > Or is the underlying desire just to be able to hint
> to
> >>>>>>>>>>>>>>> the runner
> >>>>>>>>>>>>>>> that the code may perform better (e.g. require less
> resources)
> >>>>>>>>>>>>>>> as skew
> >>>>>>>>>>>>>>> is reduced (and hence to order by timestamp iff it's
> cheap)?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> No, the sorting would have to be done in the streaming
> >>>>>>>>>>>>>>> case as well. That is an imperative of the unified model. I
> >>>>>>>>>>>>>>> think it is possible to sort by timestamp only in the batch
> >>>>>>>>>>>>>>> case (and do it for *all* batch stateful pardos without an
> >>>>>>>>>>>>>>> annotation), or to introduce an annotation but then make
> >>>>>>>>>>>>>>> the same guarantees for the streaming case as well.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jan
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
> >>>>>>>>>>>>>>>> <je...@seznam.cz> wrote:
> >>>>>>>>>>>>>>>>> Hi Robert,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> yes, I think you rephrased my point - although no
> >>>>>>>>>>>>>>>>> *explicit* guarantees of ordering are given in either
> >>>>>>>>>>>>>>>>> mode, there is *implicit* ordering in the streaming case
> >>>>>>>>>>>>>>>>> that is due to the nature of the processing - the
> >>>>>>>>>>>>>>>>> difference between the watermark and the timestamps of
> >>>>>>>>>>>>>>>>> elements flowing through the pipeline is generally low
> >>>>>>>>>>>>>>>>> (too high a difference leads to the overbuffering
> >>>>>>>>>>>>>>>>> problem), but there is no such bound in batch.
> >>>>>>>>>>>>>>>> Fortunately, for batch, only the state for a single key
> needs
> >>>>>>>>>>>>>>>> to be
> >>>>>>>>>>>>>>>> preserved at a time, rather than the state for all keys
> >>>>>>>>>>>>>>>> across the
> >>>>>>>>>>>>>>>> range of skew. Of course if you have few or hot keys, one
> can
> >>>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>> have issues (and this is not specific to StatefulDoFns).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As a result, I see a few possible solutions:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>       - the best and most natural option seems to be an
> >>>>>>>>>>>>>>>>> extension of the model, so that it defines batch as not
> >>>>>>>>>>>>>>>>> only "a streaming pipeline executed in batch fashion",
> >>>>>>>>>>>>>>>>> but "a pipeline with at least as good runtime
> >>>>>>>>>>>>>>>>> characteristics as in the streaming case, executed in
> >>>>>>>>>>>>>>>>> batch fashion". I really don't think that there are any
> >>>>>>>>>>>>>>>>> conflicts with the current model, or that this could
> >>>>>>>>>>>>>>>>> affect performance, because the required sorting (as
> >>>>>>>>>>>>>>>>> pointed out by Aljoscha) is very probably already done
> >>>>>>>>>>>>>>>>> during translation of stateful pardos. Also note that
> >>>>>>>>>>>>>>>>> this definition only affects user-defined stateful pardos
> >>>>>>>>>>>>>>>> I don't see batch vs. streaming as part of the model. One
> can
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>> microbatch, or even a runner that alternates between
> >>>>>>>>>>>>>>>> different modes.
> >>>>>>>>>>>>>>>> The model describes what the valid outputs are given a
> >>>>>>>>>>>>>>>> (sometimes
> >>>>>>>>>>>>>>>> partial) set of inputs. It becomes really hard to define
> >>>>>>>>>>>>>>>> things like
> >>>>>>>>>>>>>>>> "as good runtime characteristics." Once you allow any
> >>>>>>>>>>>>>>>> out-of-orderedness, it is not very feasible to try and
> define
> >>>>>>>>>>>>>>>> (and
> >>>>>>>>>>>>>>>> more cheaply implement) an "upper bound" of acceptable
> >>>>>>>>>>>>>>>> out-of-orderedness.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Pipelines that fail in the "worst case" batch scenario are
> >>>>>>>>>>>>>>>> likely to
> >>>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when the
> watermark
> >>>>>>>>>>>>>>>> falls
> >>>>>>>>>>>>>>>> behind in streaming mode as well.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>       - another option would be to introduce an
> >>>>>>>>>>>>>>>>> annotation for DoFns (e.g.
> >>>>>>>>>>>>>>>>> @RequiresStableTimeCharacteristics), which would result
> >>>>>>>>>>>>>>>>> in the sorting in the batch case - but this extension
> >>>>>>>>>>>>>>>>> would have to ensure the sorting in streaming mode as
> >>>>>>>>>>>>>>>>> well - it would require a definition of allowed lateness
> >>>>>>>>>>>>>>>>> and trigger (essentially similar to a window)
> >>>>>>>>>>>>>>>> This might be reasonable, implemented by default by
> buffering
> >>>>>>>>>>>>>>>> everything and releasing elements as the watermark
> (+lateness)
> >>>>>>>>>>>>>>>> advances, but would likely lead to inefficient (though
> >>>>>>>>>>>>>>>> *maybe* easier
> >>>>>>>>>>>>>>>> to reason about) code. Not sure about the semantics of
> >>>>>>>>>>>>>>>> triggering
> >>>>>>>>>>>>>>>> here, especially data-driven triggers. Would it be roughly
> >>>>>>>>>>>>>>>> equivalent
> >>>>>>>>>>>>>>>> to GBK + FlatMap(lambda (key, values): [(key, value) for
> >>>>>>>>>>>>>>>> value in
> >>>>>>>>>>>>>>>> values])?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Or is the underlying desire just to be able to hint to the
> >>>>>>>>>>>>>>>> runner that
> >>>>>>>>>>>>>>>> the code may perform better (e.g. require less resources)
> as
> >>>>>>>>>>>>>>>> skew is
> >>>>>>>>>>>>>>>> reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>       - the last option would be to introduce these
> >>>>>>>>>>>>>>>>> "higher order guarantees" in some extension DSL (e.g.
> >>>>>>>>>>>>>>>>> Euphoria), but that seems to be the worst option to me
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I see the first two options as roughly equally good,
> >>>>>>>>>>>>>>>>> although the latter is probably more time-consuming to
> >>>>>>>>>>>>>>>>> implement. But it would bring an additional feature to
> >>>>>>>>>>>>>>>>> the streaming case as well.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for any thoughts.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>       Jan
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský
> >>>>>>>>>>>>>>>>>> <je...@seznam.cz> wrote:
> >>>>>>>>>>>>>>>>>>> Hi Reuven,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch
> >>>>>>>>>>>>>>>>>>>> runners.
> >>>>>>>>>>>>>>>>>>> Stateful ParDo works in batch as far as the logic
> >>>>>>>>>>>>>>>>>>> inside the state works for absolutely unbounded
> >>>>>>>>>>>>>>>>>>> out-of-orderness of elements. That basically
> >>>>>>>>>>>>>>>>>>> (practically) can work only for cases where the order
> >>>>>>>>>>>>>>>>>>> of input elements doesn't matter. But "state" can refer
> >>>>>>>>>>>>>>>>>>> to "state machine", and any time you have a state
> >>>>>>>>>>>>>>>>>>> machine involved, the ordering of elements matters.
> >>>>>>>>>>>>>>>>>> No guarantees on order are provided in *either*
> streaming
> >>>>>>>>>>>>>>>>>> or batch
> >>>>>>>>>>>>>>>>>> mode by the model. However, it is the case that in order
> >>>>>>>>>>>>>>>>>> to make forward progress most streaming runners attempt
> >>>>>>>>>>>>>>>>>> to limit the amount of out-of-orderedness of elements (in
> >>>>>>>>>>>>>>>>>> terms of event time vs. processing time), which in turn
> >>>>>>>>>>>>>>>>>> could help cap the
> >>>>>>>>>>>>>>>>>> amount of state that must be held concurrently, whereas
> a
> >>>>>>>>>>>>>>>>>> batch runner
> >>>>>>>>>>>>>>>>>> may not allow any state to be safely discarded until
> the whole
> >>>>>>>>>>>>>>>>>> timeline from infinite past to infinite future has been
> >>>>>>>>>>>>>>>>>> observed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Also, as pointed out, state is not preserved "batch to
> >>>>>>>>>>>>>>>>>> batch" in batch mode.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
> >>>>>>>>>>>>>>>>>> <m...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>       batch semantics and streaming semantics differ
> >>>>>>>>>>>>>>>>>>>> only in that I can have GlobalWindow with the default
> >>>>>>>>>>>>>>>>>>>> trigger in batch and cannot in streaming
> >>>>>>>>>>>>>>>>>>> You can have a GlobalWindow in streaming with a default
> >>>>>>>>>>>>>>>>>>> trigger. You
> >>>>>>>>>>>>>>>>>>> could define additional triggers that do early firings.
> >>>>>>>>>>>>>>>>>>> And you could
> >>>>>>>>>>>>>>>>>>> even trigger the global window by advancing the
> watermark
> >>>>>>>>>>>>>>>>>>> to +inf.
> >>>>>>>>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited global window
> with
> >>>>>>>>>>>>>>>>>> default
> >>>>>>>>>>>>>>>>>> trigger on unbounded PCollections in the SDK because
> this
> >>>>>>>>>>>>>>>>>> is more
> >>>>>>>>>>>>>>>>>> likely to be user error than an actual desire to have no
> >>>>>>>>>>>>>>>>>> output until
> >>>>>>>>>>>>>>>>>> drain. But it's semantically valid in the model.