Sequence metadata does have the disadvantage that users can no longer use the types coming from the source. You must create a new type that contains a sequence number (unless Beam provides this). It also gets awkward with Flatten - the sequence number is no longer enough, you must also encode which side of the flatten each element came from.
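A minimal sketch of such a wrapper type (hypothetical - Beam does not provide it today, which is exactly the point above; the names Sequenced and flattenSide are illustrative):

  // import java.io.Serializable;
  // Hypothetical wrapper carrying explicit sequence metadata alongside the
  // original element; users must define something like this themselves.
  public class Sequenced<T> implements Serializable {
    public final long sequenceNumber; // e.g. a Kafka offset
    public final int flattenSide;     // which input of a Flatten this came from
    public final T value;

    public Sequenced(long sequenceNumber, int flattenSide, T value) {
      this.sequenceNumber = sequenceNumber;
      this.flattenSide = flattenSide;
      this.value = value;
    }
  }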
On Tue, May 28, 2019 at 3:18 AM Jan Lukavský <je...@seznam.cz> wrote:
> As I understood it, Kenn was supporting the idea that sequence metadata is preferable over FIFO. I was trying to point out that it should even provide the same functionality as FIFO, plus one more important property - reproducibility and the ability to be persisted and reused the same way in batch and streaming.
>
> There is no doubt that sequence metadata can be stored in every storage. But, regarding some implicit ordering that sources might have - yes, of course, data written into HDFS or Cloud Storage has an ordering, but only a partial one - inside some bulk (e.g. a file) - and the ordering is not well defined on the boundaries of these bulks (between files). That is why I'd say that the ordering of sources is relevant only for (partitioned!) streaming sources and generally always reduces to sequence metadata (e.g. offsets).
>
> Jan
>
> On 5/28/19 11:43 AM, Robert Bradshaw wrote:
> > Huge +1 to all Kenn said.
> >
> > Jan, batch sources can have orderings too, just like Kafka. I think it's reasonable (for both batch and streaming) that if a source has an ordering that is an important part of the data, it should preserve this ordering in the data itself (e.g. as sequence numbers, offsets, etc.).
> >
> > On Fri, May 24, 2019 at 10:35 PM Kenneth Knowles <k...@apache.org> wrote:
> >> I strongly prefer explicit sequence metadata over FIFO requirements, because:
> >>
> >> - FIFO is complex to specify: for example, Dataflow has "per stage key-to-key" FIFO today, but it is not guaranteed to remain so (plus "stage" is not a portable concept, nor even guaranteed to remain a Dataflow concept)
> >> - complex specifications are by definition poor usability (if necessary, then it is what it is)
> >> - it overly restricts the runner and reduces parallelism; for example, any non-stateful ParDo has per-element parallelism, not per-"key" parallelism
> >> - another perspective on that: FIFO makes everyone pay, rather than just the transform that requires exact sequencing
> >> - previous implementation details like reshuffles become part of the model
> >> - I'm not even convinced the use cases involved are addressed by some careful FIFO restrictions; many sinks re-key, and they would all have to become aware of how the keying of a sequence of "stages" affects the end-to-end FIFO
> >>
> >> A noop becoming a non-noop is essentially the mathematical definition of moving from a higher-level to a lower-level abstraction.
> >>
> >> So this strikes at the core question of what level of abstraction Beam aims to represent. Lower-level means there are fewer possible implementations and it is more tied to the underlying architecture, and anything that is not a near-exact match pays a huge penalty. Higher-level means there are more implementations possible with different tradeoffs, though they may all pay a minor penalty.
> >>
> >> I could be convinced to change my mind, but it needs some extensive design, examples, etc. I think it is probably about the most consequential design decision in the whole Beam model, around the same level as the decision to use ParDo and GBK as the primitives, IMO.
> >>
> >> Kenn
> >>
> >> On Thu, May 23, 2019 at 10:17 AM Reuven Lax <re...@google.com> wrote:
> >>> Not really. I'm suggesting that some variant of FIFO ordering is necessary, which requires either that runners natively support FIFO ordering or that transforms add some extra sequence number to each record to sort by.
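A sketch of "reducing source ordering to sequence metadata" with the Beam Java SDK and KafkaIO (KafkaIO and KafkaRecord are real APIs; the Sequenced wrapper is the hypothetical type above, and pipeline, broker, and topic names are placeholders; a Coder for Sequenced would also need to be registered, e.g. SerializableCoder):

  // Materialize Kafka's per-partition ordering as explicit sequence
  // metadata (the record offset), so it survives downstream reshuffling.
  PCollection<KV<String, Sequenced<String>>> sequenced =
      pipeline
          .apply(KafkaIO.<String, String>read()
              .withBootstrapServers("broker:9092")
              .withTopic("events")
              .withKeyDeserializer(StringDeserializer.class)
              .withValueDeserializer(StringDeserializer.class))
          .apply(ParDo.of(
              new DoFn<KafkaRecord<String, String>, KV<String, Sequenced<String>>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                  KafkaRecord<String, String> rec = c.element();
                  c.output(KV.of(
                      rec.getKV().getKey(),
                      new Sequenced<>(rec.getOffset(), 0, rec.getKV().getValue())));
                }
              }));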
> >>>
> >>> I still think your proposal is very useful, by the way. I'm merely pointing out that to solve the state-machine problem we probably need something more.
> >>>
> >>> Reuven
> >>>
> >>> On Thu, May 23, 2019 at 9:50 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>> Hi,
> >>>> yes. It seems that ordering by a user-supplied UDF makes sense, and I will update the design proposal accordingly.
> >>>> Would that solve the issues you mention?
> >>>> Jan
> >>>> ---------- Original e-mail ----------
> >>>> From: Reuven Lax <re...@google.com>
> >>>> To: dev <dev@beam.apache.org>
> >>>> Date: 23. 5. 2019 18:44:38
> >>>> Subject: Re: Definition of Unified model
> >>>>
> >>>> I'm simply saying that timestamp ordering is insufficient for state machines. I wasn't proposing Kafka as a solution - that was simply an example of how people solve this problem in other scenarios.
> >>>>
> >>>> BTW another example of ordering: imagine today that you have a triggered Sum aggregation writing out to a key-value sink. In theory we provide no ordering, so the sink might write the triggered sums in the wrong order, ending up with an incorrect value in the sink. In this case you probably want values ordered by trigger pane index.
> >>>>
> >>>> Reuven
> >>>>
> >>>> On Thu, May 23, 2019 at 8:59 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>
> >>>> Hi Reuven,
> >>>> I share Robert's viewpoint. I think the issue you refer to is not in reality related to timestamps, but to the fact that the ordering of events in time is observer dependent (whether caused by relativity or by time skew, it essentially has the same consequences). And the resolution in fact isn't Kafka, but generally an authoritative observer that tells you "I saw the events in this order". You either have one (and have the outcome of its observation persisted in the data - e.g. as an offset in a Kafka partition), in which case you should be able to use it (maybe that suggests after all that sorting by some user-supplied UDF might make sense), or you do not have one, and then any interpretation of the data seems to be equally valid. Although determinism is still desirable, of course.
> >>>> Jan
> >>>> ---------- Original e-mail ----------
> >>>> From: Reuven Lax <re...@google.com>
> >>>> To: dev <dev@beam.apache.org>
> >>>> Date: 23. 5. 2019 17:39:12
> >>>> Subject: Re: Definition of Unified model
> >>>>
> >>>> So an example would be elements of type "startUserSession" and "endUserSession" (website sessions, not Beam sessions). Logically you may need to process them in the correct order if you have any sort of state-machine logic. However, timestamp ordering is never guaranteed to match the logical ordering. Not only might you have several elements with the same timestamp, but in reality time skew across backend servers can cause the events to have timestamps in the reverse of the actual causality order.
> >>>>
> >>>> People do solve this problem today, though. Publish the events to Kafka, making sure that events for the same user end up in the same Kafka partition. This ensures that the events appear in the Kafka partitions in causality order, even if the timestamp order doesn't match. Then your Kafka subscriber simply processes the elements in each partition in order.
> >>>>
> >>>> I think the ability to impose FIFO causality ordering is what's needed for any state-machine work. Timestamp ordering has advantages (though often I think the advantage is in state), but does not solve this problem.
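A sketch of the pane-index idea (PaneInfo and its getIndex() are part of the Beam Java SDK; the surrounding pipeline, including the sums collection, is illustrative): tag each triggered sum with its pane index so the key-value sink can keep a write only if the stored pane index is lower.

  // Attach the trigger pane index to each triggered sum so a downstream
  // key-value sink can detect and drop out-of-order writes.
  PCollection<KV<String, KV<Long, Long>>> indexedSums =
      sums.apply(ParDo.of(
          new DoFn<KV<String, Long>, KV<String, KV<Long, Long>>>() {
            @ProcessElement
            public void process(ProcessContext c) {
              long paneIndex = c.pane().getIndex(); // 0, 1, 2, ... per key/window
              c.output(KV.of(
                  c.element().getKey(),
                  KV.of(paneIndex, c.element().getValue())));
            }
          }));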
> >>>>
> >>>> Reuven
> >>>>
> >>>> On Thu, May 23, 2019 at 7:48 AM Robert Bradshaw <rober...@google.com> wrote:
> >>>>
> >>>> Good point.
> >>>>
> >>>> The "implementation-specific" way I would do this is window-by-instant, followed by a DoFn that gets all the elements with the same timestamp and sorts/acts accordingly, but this counts on the runner producing windows in timestamp order (likely?) and also the subsequent DoFn getting them in this order (also likely, due to fusion).
> >>>>
> >>>> One could make the argument that, though it does not provide deterministic behavior, getting elements of the same timestamp in different orders should produce equally valid interpretations of the data. (After all, due to relativity, timestamps are not technically well ordered across space.) I can see how data-dependent tiebreakers could be useful, or promises of preservation of order between operations.
> >>>>
> >>>> - Robert
> >>>>
> >>>> On Thu, May 23, 2019 at 4:18 PM Reuven Lax <re...@google.com> wrote:
> >>>>> So Jan's example of state machines is quite a valid use case for ordering. However, in my experience, timestamp ordering is insufficient for state machines. Elements that cause state transitions might come in with the exact same timestamp, yet still have a necessary ordering. Especially given Beam's decision to use millisecond timestamps this is possible, but even at microsecond or nanosecond precision this can happen at scale. To handle state machines you usually need some sort of FIFO ordering along with an ordered source, such as Kafka, not timestamp ordering.
> >>>>>
> >>>>> Reuven
> >>>>>
> >>>>> On Thu, May 23, 2019 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>> Hi all,
> >>>>>>
> >>>>>> thanks everyone for this discussion. I think I have gathered enough feedback to be able to put down a proposition for changes, which I will do and send to this list for further discussion. There are still doubts remaining about the non-determinism and its relation to output stability vs. latency. But I will try to clarify all this in the design document.
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Jan
> >>>>>>
> >>>>>> On 5/22/19 3:49 PM, Maximilian Michels wrote:
> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my current understanding.
> >>>>>>> In essence your description of how exactly-once works in Flink is correct. The general assumption in Flink is that pipelines must be deterministic and thus produce idempotent writes in the case of failures. However, that doesn't mean Beam sinks can't guarantee a bit more with what Flink has to offer.
> >>>>>>>
> >>>>>>> Luke already mentioned the design discussions for @RequiresStableInput, which ensures idempotent writes for non-deterministic pipelines. This is not part of the model but an optional Beam feature.
> >>>>>>>
> >>>>>>> We recently implemented support for @RequiresStableInput in the Flink Runner. Reuven mentioned the Flink checkpoint confirmation, which allows us to buffer (and checkpoint) processed data and only emit it once a Flink checkpoint has completed.
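For reference, a minimal sketch of how @RequiresStableInput is used from the Java SDK (the annotation is real; the sink logic and writeToExternalStore are illustrative placeholders):

  // A DoFn whose @ProcessElement is marked @RequiresStableInput: the runner
  // must replay identical input on retry (the Flink runner achieves this by
  // buffering elements until a checkpoint completes), making blind
  // re-writes to an external store safe.
  static class StableWriteFn extends DoFn<KV<String, Long>, Void> {
    @RequiresStableInput
    @ProcessElement
    public void process(ProcessContext c) {
      // Hypothetical external write; with stable input, every retry writes
      // exactly the same elements, so the write is effectively idempotent.
      writeToExternalStore(c.element().getKey(), c.element().getValue());
    }

    private void writeToExternalStore(String key, Long value) {
      // ... user-supplied client call would go here ...
    }
  }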
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On 21.05.19 16:49, Jan Lukavský wrote:
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> > Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly-once infrastructure such as Flink or Dataflow.
> >>>>>>>>
> >>>>>>>> I would assume that the model basically inherits the guarantees of the underlying infrastructure. Because Flink does not work as you described (atomic commit of inputs, state, and outputs), but rather a checkpoint mark flows through the DAG much like a watermark and on failures operators are restored and data reprocessed, it (IMHO) implies that you have exactly-once everywhere in the DAG *but* sinks. That is because sinks cannot be restored to a previous state; instead, sinks are supposed to be idempotent in order for exactly-once to really work (or at least be able to commit outputs on checkpoint in the sink). That implies that if you don't have a sink that is able to commit outputs atomically on checkpoint, the pipeline execution should be deterministic upon retries; otherwise shadow writes from failed paths of the pipeline might appear.
> >>>>>>>>
> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my current understanding.
> >>>>>>>>
> >>>>>>>> > Sounds like we should make this clearer.
> >>>>>>>>
> >>>>>>>> I meant that you are right that we must not, in any of our thinking, forget that streams are by definition out-of-order. That is a property that we cannot change. But - that doesn't limit us from creating an operator that presents the data to the UDF as if the stream were ideally sorted. It can do that by introducing latency, of course.
> >>>>>>>>
> >>>>>>>> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
> >>>>>>>>> Reza: One could provide something like this as a utility class, but one downside is that it is not scale invariant. It requires a tuning parameter that, if too small, won't mitigate the problem, but if too big, greatly increases latency. (Possibly one could define a dynamic session-like window to solve this though...) It also might be harder for runners that *can* cheaply present stuff in timestamp order to optimize. (That and, in practice, our annotation-style process methods don't lend themselves to easy composition.) I think it could work in specific cases though.
> >>>>>>>>>
> >>>>>>>>> More inline below.
> >>>>>>>>>
> >>>>>>>>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>> Hi Robert,
> >>>>>>>>>>
> >>>>>>>>>> > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
> >>>>>>>>>>
> >>>>>>>>>> I think what you describe is a property of a runner, not of the model, right? I think if I run my pipeline on Flink I will not get this atomicity, because although Flink also uses an exactly-once model, it might write outputs multiple times.
> >>>>>>>>> Actually, I think it is a larger (open) question whether exactly-once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly-once infrastructure such as Flink or Dataflow.
> >>>>>>>>>
> >>>>>>>>>> > 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
> >>>>>>>>>>
> >>>>>>>>>> Yes, absolutely. The argument here is not that a stateful ParDo should presume to receive elements in any order, but that it should _present_ them as such to the user's @ProcessElement function.
> >>>>>>>>> Sounds like we should make this clearer.
> >>>>>>>>>
> >>>>>>>>>> > 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
> >>>>>>>>>>
> >>>>>>>>>> +1
> >>>>>>>>>>
> >>>>>>>>>> Jan
> >>>>>>>>>>
> >>>>>>>>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> >>>>>>>>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
> >>>>>>>>>>> What I mean is that streaming vs. batch is no longer part of the model (or ideally the API), but pushed down to be a concern of the runner (executor) of the pipeline.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>>> Hi Kenn,
> >>>>>>>>>>>>
> >>>>>>>>>>>> OK, so if we introduce an annotation, we can have stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain.
> >>>>>>>>>>>> The current behavior of stateful ParDo has the following properties:
> >>>>>>>>>>>>
> >>>>>>>>>>>> a) it might fail in batch, although it runs fine in streaming (that is due to the buffering, and the unbounded lateness in batch, which was discussed back and forth in this thread)
> >>>>>>>>>>>>
> >>>>>>>>>>>> b) it might be non-deterministic (this is because the elements arrive in somewhat random order, and even if you do the operation "assign unique ID to elements", this might produce different results when run multiple times)
> >>>>>>>>>>> PCollections are *explicitly* unordered. Any operations that assume or depend on a specific ordering for correctness (or determinism) must provide that ordering themselves (i.e. tolerate "arbitrary shuffling of inputs"). As you point out, that may be very expensive if you have very hot keys with very large (unbounded) timestamp skew.
> >>>>>>>>>>>
> >>>>>>>>>>> StatefulDoFns are low-level operations that should be used with care; the simpler windowing model gives determinism in the face of unordered data (though late data and non-end-of-window triggering introduce some of the non-determinism back in).
> >>>>>>>>>>>
> >>>>>>>>>>>> What worries me most is property b), because it seems to me to have serious consequences - not only would you get different results if you ran a batch pipeline twice, but even in streaming, when a pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might have already been persisted into the sink. That would create somewhat messy outputs.
> >>>>>>>>>>> Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
> >>>>>>>>>>>
> >>>>>>>>>>>> These two properties make me think that the current implementation is more of a _special case_ than the general one. The general one would be that your state doesn't have the properties needed to tolerate buffering problems and/or non-determinism. Which is the case where you need sorting in both streaming and batch to be part of the model.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Let me point out one more analogy - that is merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicitly) and streaming (by buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause of this is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).
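To make the analogy concrete, the two kinds of WindowFn side by side (Beam Java API; the input PCollection is assumed):

  // A merging WindowFn: session windows can merge as elements arrive, so
  // the runner must buffer and consult timestamps - a stateful assignment.
  PCollection<KV<String, Long>> sessioned =
      input.apply(Window.<KV<String, Long>>into(
          Sessions.withGapDuration(Duration.standardMinutes(10))));

  // A non-merging WindowFn: fixed windows are assigned statelessly from
  // each element's timestamp alone; no buffering or sorting is implied.
  PCollection<KV<String, Long>> fixed =
      input.apply(Window.<KV<String, Long>>into(
          FixedWindows.of(Duration.standardMinutes(10))));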
> >>>>>>>>>>>>
> >>>>>>>>>>>> The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when the specific conditions hold of the state function having special properties, you can annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
> >>>>>>>>>>> There are two separable questions here.
> >>>>>>>>>>>
> >>>>>>>>>>> 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
> >>>>>>>>>>>
> >>>>>>>>>>> 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
> >>>>>>>>>>>
> >>>>>>>>>>> - Robert
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if it sees the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
> >>>>>>>>>>>>
> >>>>>>>>>>>> And I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. In examples such as "assign a unique sequence number to each element" or "group into batches", it doesn't matter exactly what the result is, only that it meets the spec. And other cases, like user funnels, are monotonic enough that you also don't actually need sorting.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kenn
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>>>> Yes, the problem will probably arise mostly when you have poorly distributed keys (or too few keys).
> >>>>>>>>>>>>> I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined StatefulParDos.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This would suggest that the best way out of this would really be to add an annotation, so that the author of the pipeline can decide.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If that would be acceptable, I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / SDKs.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It is "read all per key and window" and not just "read all" (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>>>>> Hi Lukasz,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
> >>>>>>>>>>>>>> Yes, no problem with that. But this whole discussion started because *this doesn't work in batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Jan
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>>>>>> This discussion brings up many really interesting questions for me. :-)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Using microbatching, chaining of batch jobs, or pure streaming are exactly the "runtime conditions/characteristics" I refer to.
> >>>>>>>>>>>>>>> All of these define several runtime parameters, which in turn define how well/badly the pipeline will perform and how many resources might be needed. From my point of view, pure streaming should be the most resource-demanding (if not, why bother with batch? why not run everything in streaming only? what would there remain to "unify"?).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> > Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Yes, but here there is still the presumption that my stateful DoFn can tolerate arbitrary shuffling of inputs. Let me explain the use case in more detail.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Suppose you have an input stream consisting of 1s and 0s (and some key for each element, which is irrelevant for the demonstration). Your task is to calculate, in a running global window, the actual number of changes between state 0 and state 1 and vice versa. When the state doesn't change, you don't calculate anything. If the input (for a given key) were (tN denotes timestamp N):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> t1: 1
> >>>>>>>>>>>>>>> t2: 0
> >>>>>>>>>>>>>>> t3: 0
> >>>>>>>>>>>>>>> t4: 1
> >>>>>>>>>>>>>>> t5: 1
> >>>>>>>>>>>>>>> t6: 0
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> then the output should yield (supposing that the default state is zero):
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> t1: (one: 1, zero: 0)
> >>>>>>>>>>>>>>> t2: (one: 1, zero: 1)
> >>>>>>>>>>>>>>> t3: (one: 1, zero: 1)
> >>>>>>>>>>>>>>> t4: (one: 2, zero: 1)
> >>>>>>>>>>>>>>> t5: (one: 2, zero: 1)
> >>>>>>>>>>>>>>> t6: (one: 2, zero: 2)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> How would you implement this in current Beam semantics?
> >>>>>>>>>>>>>> I think you're saying here that you know your input is ordered in a specific way and, since you assume the order when writing your pipeline, you can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
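A sketch of what that "buffering & sorting into state" could look like for the 0/1 transition counter above, using the Beam Java state and timers API (the API calls are real; the DoFn itself is illustrative and simplified - for example, it flushes the whole buffer whenever the timer fires and does not handle late data):

  // imports assumed from org.apache.beam.sdk.coders.*, org.apache.beam.sdk.state.*,
  // org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.values.*,
  // org.joda.time.Duration, java.util.*
  //
  // Buffer elements with their timestamps in a BagState, re-arm an
  // event-time timer, and when the watermark passes it, replay the
  // buffered elements in timestamp order through the state machine.
  static class TransitionCountFn
      extends DoFn<KV<String, Integer>, KV<String, KV<Long, Long>>> {

    @StateId("buffer")
    private final StateSpec<BagState<TimestampedValue<Integer>>> bufferSpec =
        StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()));

    @StateId("key")
    private final StateSpec<ValueState<String>> keySpec =
        StateSpecs.value(StringUtf8Coder.of());

    @StateId("last")
    private final StateSpec<ValueState<Integer>> lastSpec =
        StateSpecs.value(VarIntCoder.of());

    @StateId("counts")
    private final StateSpec<ValueState<KV<Long, Long>>> countsSpec =
        StateSpecs.value(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()));

    @TimerId("flush")
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(
        ProcessContext c,
        @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
        @StateId("key") ValueState<String> key,
        @TimerId("flush") Timer flush) {
      key.write(c.element().getKey());
      buffer.add(TimestampedValue.of(c.element().getValue(), c.timestamp()));
      // Fires once the watermark passes this element's timestamp, i.e. when
      // no earlier on-time element can still arrive. (Simplified: each new
      // element overwrites the single timer.)
      flush.set(c.timestamp().plus(Duration.millis(1)));
    }

    @OnTimer("flush")
    public void flush(
        OnTimerContext c,
        @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
        @StateId("key") ValueState<String> key,
        @StateId("last") ValueState<Integer> last,
        @StateId("counts") ValueState<KV<Long, Long>> counts) {
      List<TimestampedValue<Integer>> sorted = new ArrayList<>();
      buffer.read().forEach(sorted::add);
      sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
      Integer lastRead = last.read();
      int prev = lastRead == null ? 0 : lastRead; // default state is zero
      KV<Long, Long> acc = counts.read();
      long ones = acc == null ? 0 : acc.getKey();
      long zeros = acc == null ? 0 : acc.getValue();
      for (TimestampedValue<Integer> tv : sorted) {
        if (tv.getValue() != prev) {              // a state transition
          if (tv.getValue() == 1) { ones++; } else { zeros++; }
        }
        prev = tv.getValue();
        c.output(KV.of(key.read(), KV.of(ones, zeros)));
      }
      last.write(prev);
      counts.write(KV.of(ones, zeros));
      buffer.clear();
    }
  }

Note that all of the buffering, sorting, and replay logic lives in user code here - which is exactly the burden (and the batch-mode memory problem) being debated in this thread.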
> >>>>>>>>>>>>>> I can see why you would want an annotation that says "I must have timestamp-ordered elements", since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to, but here be dragons" function, while windowing and triggering are meant to keep many people from writing StatefulParDo in the first place.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> But the worst case is defined by an input of size (available resources + a single byte) -> pipeline failure. Although it could have finished, given the right conditions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> > This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Sure, the pipeline will be less efficient, because it would have to buffer and sort the inputs. But at least it will produce correct results in cases where updates to state are order-sensitive.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'd say roughly yes, but the difference would be in the trigger. The trigger should ideally fire as soon as the watermark (+lateness) crosses the element with the lowest timestamp in the buffer. Although this could be somehow emulated by a fixed trigger every X millis.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> > Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require fewer resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> No, the sorting would have to be done in the streaming case as well. That is an imperative of the unified model. I think it is possible to sort by timestamp only in the batch case (and do it for *all* batch stateful pardos without annotation), or to introduce an annotation, but then make the same guarantees for the streaming case as well.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jan
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>>>>>>>> Hi Robert,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamps of elements flowing through the pipeline is generally low (too high a difference leads to the overbuffering problem), but there is no such bound in batch.
> >>>>>>>>>>>>>>>> Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As a result, I see a few possible solutions:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> - the best and most natural seems to be an extension of the model, so that it defines batch not only as "a streaming pipeline executed in batch fashion", but as "a pipeline with at least as good runtime characteristics as in the streaming case, executed in batch fashion". I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed out by Aljoscha) is very probably already done during translation of stateful pardos. Also note that this definition only affects user-defined stateful pardos.
> >>>>>>>>>>>>>>>> I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. The model describes what the valid outputs are given a (sometimes partial) set of inputs. It becomes really hard to define things like "as good runtime characteristics." Once you allow any out-of-orderedness, it is not very feasible to try to define (and more cheaply implement) an "upper bound" of acceptable out-of-orderedness.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> - another option would be to introduce an annotation for DoFns (e.g.
> >>>>>>>>>>>>>>>>> @RequiresStableTimeCharacteristics), which would result in the sorting in the batch case - but - this extension would have to ensure the sorting in streaming mode also - it would require a definition of allowed lateness, and a trigger (essentially similar to a window)
> >>>>>>>>>>>>>>>> This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code. Not sure about the semantics of triggering here, especially data-driven triggers. Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require fewer resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> - the last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I see the first two options as about equally good, although the latter one is probably more time-consuming to implement. But it would bring an additional feature to the streaming case as well.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for any thoughts.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Jan
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> >>>>>>>>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
> >>>>>>>>>>>>>>>>>>> Hi Reuven,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
> >>>>>>>>>>>>>>>>>>> Stateful ParDo works in batch only as far as the logic inside the state works for absolutely unbounded out-of-orderness of elements. That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to a "state machine", and any time you have a state machine involved, the ordering of elements matters.
> >>>>>>>>>>>>>>>>>> No guarantees on order are provided in *either* streaming or batch mode by the model. However, it is the case that in order to make forward progress most streaming runners attempt to limit the amount of out-of-orderedness of elements (in terms of event time vs.
> >>>>>>>>>>>>>>>>>> processing time), which in turn could help cap the amount of state that must be held concurrently, whereas a batch runner may not allow any state to be safely discarded until the whole timeline from the infinite past to the infinite future has been observed.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Also, as pointed out, state is not preserved "batch to batch" in batch mode.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream
> >>>>>>>>>>>>>>>>>>> You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
> >>>>>>>>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited the global window with the default trigger on unbounded PCollections in the SDK because this is more likely to be user error than an actual desire to have no output until drain. But it's semantically valid in the model.
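As an aside, a sketch of what a non-default trigger on the global window looks like with the Beam Java API (the transforms and triggers are real; the events collection and the surrounding pipeline are illustrative):

  // A GlobalWindow over an unbounded PCollection becomes valid once a
  // non-default trigger is supplied, e.g. repeated early firings based on
  // processing time.
  PCollection<KV<String, Long>> runningSums =
      events
          .apply(Window.<KV<String, Long>>into(new GlobalWindows())
              .triggering(Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardMinutes(1))))
              .withAllowedLateness(Duration.ZERO)
              .accumulatingFiredPanes())
          .apply(Sum.longsPerKey());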