A slightly larger concern: it will also force users to create stateful DoFns everywhere to generate these sequence numbers. If I have a ParDo that is not a simple 1:1 transform (i.e. not MapElements), then the ParDo will need to generate its own sequence numbers for ordering, and the only safe way to do so is to use a stateful DoFn. This turns what used to be a simple in-memory DoFn into one that has to access state. I also believe many runners will not fuse stateful DoFns. While none of this poses a problem for the model, it could make ordering extremely expensive to achieve.

Reuven
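A minimal sketch of the kind of stateful DoFn this implies (Beam Java SDK; element types and names are illustrative):

  import org.apache.beam.sdk.coders.VarLongCoder;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Assigns a per-key sequence number to each element. Note the input must
  // already be keyed, and the DoFn now reads and writes state per element.
  class AssignSequenceNumbers extends DoFn<KV<String, String>, KV<String, KV<Long, String>>> {
    @StateId("seq")
    private final StateSpec<ValueState<Long>> seqSpec = StateSpecs.value(VarLongCoder.of());

    @ProcessElement
    public void process(ProcessContext c, @StateId("seq") ValueState<Long> seq) {
      Long current = seq.read();
      long next = (current == null) ? 0L : current;
      seq.write(next + 1);
      c.output(KV.of(c.element().getKey(), KV.of(next, c.element().getValue())));
    }
  }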
On Tue, May 28, 2019 at 6:09 AM Jan Lukavský <[email protected]> wrote:

> Hi Reuven,
>
> > It also gets awkward with Flatten - the sequence number is no longer enough, you must also encode which side of the flatten each element came from.
>
> That is a generic need. Even if you read data from Kafka, the offsets are comparable only inside a single partition. So, for Kafka to work as a FIFO for ordering, elements with the same key have to be pushed to the same partition (otherwise Kafka cannot act as FIFO, because different partitions can be handled by different brokers, which means different observers, and they therefore might not agree on the order of events). So if we want to emulate FIFO per key, then the sequence IDs also have to be per key.
>
> On 5/28/19 2:33 PM, Reuven Lax wrote:
>
> Sequence metadata does have the disadvantage that users can no longer use the types coming from the source. You must create a new type that contains a sequence number (unless Beam provides this). It also gets awkward with Flatten - the sequence number is no longer enough, you must also encode which side of the flatten each element came from.
>
> On Tue, May 28, 2019 at 3:18 AM Jan Lukavský <[email protected]> wrote:
>
>> As I understood it, Kenn was supporting the idea that sequence metadata is preferable over FIFO. I was trying to point out that it should even provide the same functionality as FIFO, plus one more important property - reproducibility and the ability to be persisted and reused the same way in batch and streaming.
>>
>> There is no doubt that sequence metadata can be stored in every storage. But, regarding some implicit ordering that sources might have - yes, of course, data written into HDFS or Cloud Storage has ordering, but only partial - inside some bulk (e.g. a file) - and the ordering is not defined correctly on the boundaries of these bulks (between files). That is why I'd say that ordering of sources is relevant only for (partitioned!) streaming sources and generally always reduces to sequence metadata (e.g. offsets).
>>
>> Jan
>>
>> On 5/28/19 11:43 AM, Robert Bradshaw wrote:
>> > Huge +1 to all Kenn said.
>> >
>> > Jan, batch sources can have orderings too, just like Kafka. I think it's reasonable (for both batch and streaming) that if a source has an ordering that is an important part of the data, it should preserve this ordering into the data itself (e.g. as sequence numbers, offsets, etc.)
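Robert's suggestion - preserving a source's ordering into the data itself - might look like this with KafkaIO (Java sketch; the output shape is illustrative):

  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Carries the Kafka (partition, offset) along with the payload so that
  // downstream transforms can re-establish order explicitly. Offsets are
  // comparable only within a single partition, hence the partition-based key.
  class AttachOffset extends DoFn<KafkaRecord<String, String>, KV<String, KV<Long, String>>> {
    @ProcessElement
    public void process(ProcessContext c) {
      KafkaRecord<String, String> r = c.element();
      String partitionKey = r.getTopic() + "/" + r.getPartition();
      c.output(KV.of(partitionKey, KV.of(r.getOffset(), r.getKV().getValue())));
    }
  }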
>> >
>> > On Fri, May 24, 2019 at 10:35 PM Kenneth Knowles <[email protected]> wrote:
>> >> I strongly prefer explicit sequence metadata over FIFO requirements, because:
>> >>
>> >> - FIFO is complex to specify: for example Dataflow has "per stage key-to-key" FIFO today, but it is not guaranteed to remain so (plus "stage" is not a portable concept, nor even guaranteed to remain a Dataflow concept)
>> >> - complex specifications are by definition poor usability (if necessary, then it is what it is)
>> >> - it overly restricts the runner and reduces parallelism; for example any non-stateful ParDo has per-element parallelism, not per-"key" parallelism
>> >> - another perspective on that: FIFO makes everyone pay, rather than just the transform that requires exact sequencing
>> >> - previous implementation details like reshuffles become part of the model
>> >> - I'm not even convinced the use cases involved are addressed by some careful FIFO restrictions; many sinks re-key and they would all have to become aware of how keying of a sequence of "stages" affects the end-to-end FIFO
>> >>
>> >> A no-op becoming a non-no-op is essentially the mathematical definition of moving from a higher-level to a lower-level abstraction.
>> >>
>> >> So this strikes at the core question of what level of abstraction Beam aims to represent. Lower-level means there are fewer possible implementations and it is more tied to the underlying architecture, and anything not a near-exact match pays a huge penalty. Higher-level means there are more implementations possible with different tradeoffs, though they may all pay a minor penalty.
>> >>
>> >> I could be convinced to change my mind, but it needs some extensive design, examples, etc. I think it is probably about the most consequential design decision in the whole Beam model, around the same level as the decision to use ParDo and GBK as the primitives, IMO.
>> >>
>> >> Kenn
>> >>
>> >> On Thu, May 23, 2019 at 10:17 AM Reuven Lax <[email protected]> wrote:
>> >>> Not really. I'm suggesting that some variant of FIFO ordering is necessary, which requires either that runners natively support FIFO ordering or that transforms add some extra sequence number to each record to sort by.
>> >>>
>> >>> I still think your proposal is very useful, by the way. I'm merely pointing out that to solve the state-machine problem we probably need something more.
>> >>>
>> >>> Reuven
>> >>>
>> >>> On Thu, May 23, 2019 at 9:50 AM Jan Lukavský <[email protected]> wrote:
>> >>>> Hi,
>> >>>> yes. It seems that ordering by a user-supplied UDF makes sense and I will update the design proposal accordingly.
>> >>>> Would that solve the issues you mention?
>> >>>> Jan
>> >>>> ---------- Original e-mail ----------
>> >>>> From: Reuven Lax <[email protected]>
>> >>>> To: dev <[email protected]>
>> >>>> Date: 23. 5. 2019 18:44:38
>> >>>> Subject: Re: Definition of Unified model
>> >>>>
>> >>>> I'm simply saying that timestamp ordering is insufficient for state machines. I wasn't proposing Kafka as a solution - that was simply an example of how people solve this problem in other scenarios.
>> >>>>
>> >>>> BTW another example of ordering: Imagine today that you have a triggered Sum aggregation writing out to a key-value sink. In theory we provide no ordering, so the sink might write the triggered sums in the wrong order, ending up with an incorrect value in the sink. In this case you probably want values ordered by trigger pane index.
>> >>>>
>> >>>> Reuven
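A sketch of a pane-index-ordered write (Java; KvStore and writeIfNewer are hypothetical sink APIs - the point is only that the sink needs a compare-and-set on the pane index):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Writes a triggered sum, but only if this pane is newer than the pane last
  // written for the key, so a late-arriving earlier pane cannot clobber a
  // newer value.
  class PaneOrderedWriteFn extends DoFn<KV<String, Long>, Void> {
    @ProcessElement
    public void process(ProcessContext c) {
      long paneIndex = c.pane().getIndex();
      // Hypothetical API: atomically compares the stored pane index and
      // writes only when paneIndex is larger than what is already stored.
      KvStore.writeIfNewer(c.element().getKey(), c.element().getValue(), paneIndex);
    }
  }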
>> >>>> On Thu, May 23, 2019 at 8:59 AM Jan Lukavský <[email protected]> wrote:
>> >>>>
>> >>>> Hi Reuven,
>> >>>> I share the viewpoint of Robert. I think the issue you refer to is not in reality related to timestamps, but to the fact that ordering of events in time is observer dependent (either caused by relativity or by time skew; essentially this has the same consequences). And the resolution in fact isn't Kafka, but generally an authoritative observer that tells you "I saw the events in this order". And you either have one (and have the outcome of his observation persisted in the data - e.g. as an offset in a Kafka partition), then you should be able to use it (maybe that suggests after all that sorting by some user-supplied UDF might make sense), or you do not have it, and then any interpretation of the data seems to be equally valid. Although determinism is fine, of course.
>> >>>> Jan
>> >>>> ---------- Original e-mail ----------
>> >>>> From: Reuven Lax <[email protected]>
>> >>>> To: dev <[email protected]>
>> >>>> Date: 23. 5. 2019 17:39:12
>> >>>> Subject: Re: Definition of Unified model
>> >>>>
>> >>>> So an example would be elements of type "startUserSession" and "endUserSession" (website sessions, not Beam sessions). Logically you may need to process them in the correct order if you have any sort of state-machine logic. However timestamp ordering is never guaranteed to match the logical ordering. Not only might you have several elements with the same timestamp, but in reality time skew across backend servers can cause the events to have timestamps in reverse order of the actual causality order.
>> >>>>
>> >>>> People do solve this problem today, though. Publish the events to Kafka, making sure that events for the same user end up in the same Kafka partition. This ensures that the events appear in the Kafka partitions in causality order, even if the timestamp order doesn't match. Then your Kafka subscriber simply processes the elements in each partition in order.
>> >>>>
>> >>>> I think the ability to impose FIFO causality ordering is what's needed for any state-machine work. Timestamp ordering has advantages (though often I think the advantage is in state), but does not solve this problem.
>> >>>>
>> >>>> Reuven
>> >>>>
>> >>>> On Thu, May 23, 2019 at 7:48 AM Robert Bradshaw <[email protected]> wrote:
>> >>>>
>> >>>> Good point.
>> >>>>
>> >>>> The "implementation-specific" way I would do this is window-by-instant, followed by a DoFn that gets all the elements with the same timestamp and sorts/acts accordingly, but this counts on the runner producing windows in timestamp order (likely?) and also the subsequent DoFn getting them in this order (also likely, due to fusion).
>> >>>>
>> >>>> One could make the argument that, though it does not provide deterministic behavior, getting elements of the same timestamp in different orders should produce equally valid interpretations of the data. (After all, due to relativity, timestamps are not technically well ordered across space.) I can see how data-dependent tiebreakers could be useful, or promises of preservation of order between operations.
>> >>>>
>> >>>> - Robert
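The Kafka pattern Reuven describes above, sketched with the standard Kafka producer API (Java; topic name illustrative):

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  // Keyed publish: Kafka routes all records with the same key to the same
  // partition, so events for one user are stored in publish (causality)
  // order regardless of their timestamps.
  void publish(KafkaProducer<String, String> producer, String userId, String event) {
    producer.send(new ProducerRecord<>("user-events", userId, event));
  }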
>> >>>> On Thu, May 23, 2019 at 4:18 PM Reuven Lax <[email protected]> wrote:
>> >>>>> So Jan's example of state machines is quite a valid use case for ordering. However in my experience, timestamp ordering is insufficient for state machines. Elements that cause state transitions might come in with the exact same timestamp, yet still have a necessary ordering. Especially given Beam's decision to use millisecond timestamps this is possible, but even at microsecond or nanosecond precision this can happen at scale. To handle state machines you usually need some sort of FIFO ordering along with an ordered source, such as Kafka, not timestamp ordering.
>> >>>>>
>> >>>>> Reuven
>> >>>>>
>> >>>>> On Thu, May 23, 2019 at 12:32 AM Jan Lukavský <[email protected]> wrote:
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> thanks everyone for this discussion. I think I have gathered enough feedback to be able to put down a proposition for changes, which I will do and send to this list for further discussion. There are still doubts remaining about the non-determinism and its relation to output stability vs. latency. But I will try to clarify all this in the design document.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>>
>> >>>>>> Jan
>> >>>>>>
>> >>>>>> On 5/22/19 3:49 PM, Maximilian Michels wrote:
>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my current understanding.
>> >>>>>>> In essence your description of how exactly-once works in Flink is correct. The general assumption in Flink is that pipelines must be deterministic and thus produce idempotent writes in the case of failures. However, that doesn't mean Beam sinks can't guarantee a bit more with what Flink has to offer.
>> >>>>>>>
>> >>>>>>> Luke already mentioned the design discussions for @RequiresStableInput, which ensures idempotent writes for non-deterministic pipelines. This is not part of the model but an optional Beam feature.
>> >>>>>>>
>> >>>>>>> We recently implemented support for @RequiresStableInput in the Flink Runner. Reuven mentioned the Flink checkpoint confirmation, which allows us to buffer (and checkpoint) processed data and only emit it once a Flink checkpoint has completed.
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Max
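Using @RequiresStableInput is just an annotation on the processing method (Beam Java SDK; the external sink client is hypothetical):

  import org.apache.beam.sdk.transforms.DoFn;

  // Asks the runner to only deliver input that will be replayed identically
  // on retry, so the (otherwise non-deterministic) write becomes effectively
  // idempotent.
  class StableWriteFn extends DoFn<String, Void> {
    @RequiresStableInput
    @ProcessElement
    public void process(ProcessContext c) {
      ExternalSink.write(c.element()); // hypothetical external sink client
    }
  }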
>> >>>>>>> On 21.05.19 16:49, Jan Lukavský wrote:
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> > Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly-once infrastructure such as Flink or Dataflow.
>> >>>>>>>>
>> >>>>>>>> I would assume that the model basically inherits the guarantees of the underlying infrastructure. Because Flink does not work as you described (atomic commit of inputs, state and outputs), but rather a checkpoint mark flows through the DAG much like a watermark and on failures operators are restored and data reprocessed, it (IMHO) implies that you have exactly-once everywhere in the DAG *but* sinks. That is because sinks cannot be restored to a previous state; instead sinks are supposed to be idempotent in order for the exactly-once to really work (or at least be able to commit outputs on checkpoint in the sink). That implies that if you don't have a sink that is able to commit outputs atomically on checkpoint, the pipeline execution should be deterministic upon retries, otherwise shadow writes from failed paths of the pipeline might appear.
>> >>>>>>>>
>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my current understanding.
>> >>>>>>>>
>> >>>>>>>> > Sounds like we should make this clearer.
>> >>>>>>>>
>> >>>>>>>> I meant that you are right that we must not, in any thoughts we are having, forget that streams are by definition out-of-order. That is a property that we cannot change. But - that doesn't limit us from creating an operator that presents the data to the UDF as if the stream were ideally sorted. It can do that by introducing latency, of course.
>> >>>>>>>>
>> >>>>>>>> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
>> >>>>>>>>> Reza: One could provide something like this as a utility class, but one downside is that it is not scale invariant. It requires a tuning parameter that, if too small, won't mitigate the problem, but if too big, greatly increases latency. (Possibly one could define a dynamic session-like window to solve this though...) It also might be harder for runners that *can* cheaply present stuff in timestamp order to optimize. (That and, in practice, our annotation-style process methods don't lend themselves to easy composition.) I think it could work in specific cases though.
>> >>>>>>>>>
>> >>>>>>>>> More inline below.
>> >>>>>>>>>
>> >>>>>>>>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>> Hi Robert,
>> >>>>>>>>>>
>> >>>>>>>>>> > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
>> >>>>>>>>>>
>> >>>>>>>>>> I think what you describe is a property of a runner, not of the model, right? I think if I run my pipeline on Flink I will not get this atomicity, because although Flink also uses an exactly-once model, it might write outputs multiple times.
>> >>>>>>>>> Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly-once infrastructure such as Flink or Dataflow.
>> >>>>>>>>>
>> >>>>>>>>>> > 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
>> >>>>>>>>>>
>> >>>>>>>>>> Yes, absolutely. The argument here is not that a stateful ParDo should presume to receive elements in any particular order, but that it should _present_ them as such to the user's @ProcessElement function.
>> >>>>>>>>> Sounds like we should make this clearer.
>> >>>>>>>>>
>> >>>>>>>>>> > 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
>> >>>>>>>>>>
>> >>>>>>>>>> +1
>> >>>>>>>>>>
>> >>>>>>>>>> Jan
>> >>>>>>>>>>
>> >>>>>>>>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
>> >>>>>>>>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
>> >>>>>>>>>>> What I mean is that streaming vs. batch is no longer part of the model (or ideally API), but pushed down to be a concern of the runner (executor) of the pipeline.
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>>>> Hi Kenn,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> OK, so if we introduce an annotation, we can have stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> a) it might fail in batch, although it runs fine in streaming (that is due to the buffering, and unbounded lateness in batch, which was discussed back and forth in this thread)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> b) it might be non-deterministic (this is because the elements arrive in somewhat random order, and even if you do the operation "assign unique ID to elements" this might produce different results when run multiple times)
>> >>>>>>>>>>> PCollections are *explicitly* unordered. Any operations that assume or depend on a specific ordering for correctness (or determinism) must provide that ordering themselves (i.e. tolerate "arbitrary shuffling of inputs"). As you point out, that may be very expensive if you have very hot keys with very large (unbounded) timestamp skew.
>> >>>>>>>>>>>
>> >>>>>>>>>>> StatefulDoFns are low-level operations that should be used with care; the simpler windowing model gives determinism in the face of unordered data (though late data and non-end-of-window triggering introduce some of the non-determinism back in).
>> >>>>>>>>>>>
>> >>>>>>>>>>>> What worries me most is property b), because it seems to me to have serious consequences - not only that if you run a batch pipeline twice you would get different results, but even in streaming, when a pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might have already been persisted into the sink. That would create somewhat messy outputs.
>> >>>>>>>>>>> Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
>> >>>>>>>>>>>
>> >>>>>>>>>>>> These two properties make me think that the current implementation is more of a _special case_ than the general one. The general one would be that your state doesn't have the properties needed to tolerate buffering problems and/or non-determinism. Which is the case where you need sorting in both streaming and batch to be part of the model.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Let me point out one more analogy - that is merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicit) and streaming (buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause of this is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when you have the specific conditions of the state function having special properties, then you can annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
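The annotation Jan proposes does not exist in Beam; as a purely hypothetical sketch it might be no more than a marker (Java):

  import java.lang.annotation.ElementType;
  import java.lang.annotation.Retention;
  import java.lang.annotation.RetentionPolicy;
  import java.lang.annotation.Target;

  // Hypothetical marker: declares that this DoFn's state logic tolerates
  // arbitrary input order, allowing the runner to skip any sorting/buffering
  // that a correctness-first default would otherwise impose.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface TimeOrderingAgnostic {}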
>> >>>>>>>>>>> There are two separable questions here.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
>> >>>>>>>>>>>
>> >>>>>>>>>>> - Robert
>> >>>>>>>>>>>
>> >>>>>>>>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if they see the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> And I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. In examples such as "assign a unique sequence number to each element" or "group into batches", it doesn't matter exactly what the result is, only that it meets the spec. And other cases like user funnels are monotonic enough that you also don't actually need sorting.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Kenn
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>>>>> Yes, the problem will arise probably mostly when you do not have well-distributed keys (or have too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there. And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined StatefulParDos.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> This would suggest that the best way out of this would really be to add the annotation, so that the author of the pipeline can decide.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> If that would be acceptable I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / SDKs.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> It is read-all per key and window, and not just read-all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>>>>>> Hi Lukasz,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Yes, no problem with that. But this whole discussion started because *this doesn't work on batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Jan
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>>>>>>> This discussion brings many really interesting questions for me. :-)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Using microbatching, chaining of batch jobs, or pure streaming are exactly the "runtime conditions/characteristics" I refer to. All these define several runtime parameters, which in turn define how well/badly the pipeline will perform and how many resources might be needed. From my point of view, pure streaming should be the most resource demanding (if not, why bother with batch? why not run everything in streaming only? what will there remain to "unify"?).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> > Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Yes, but here is still the presumption that my stateful DoFn can tolerate arbitrary shuffling of inputs. Let me explain the use case in more detail.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Suppose you have an input stream consisting of 1s and 0s (and some key for each element, which is irrelevant for the demonstration). Your task is to calculate, in a running global window, the actual number of changes between state 0 and state 1 and vice versa. When the state doesn't change, you don't calculate anything. If the input (for a given key) were (tN denotes timestamp N):
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> t1: 1
>> >>>>>>>>>>>>>>> t2: 0
>> >>>>>>>>>>>>>>> t3: 0
>> >>>>>>>>>>>>>>> t4: 1
>> >>>>>>>>>>>>>>> t5: 1
>> >>>>>>>>>>>>>>> t6: 0
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> then the output should yield (supposing that the default state is zero):
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> t1: (one: 1, zero: 0)
>> >>>>>>>>>>>>>>> t2: (one: 1, zero: 1)
>> >>>>>>>>>>>>>>> t3: (one: 1, zero: 1)
>> >>>>>>>>>>>>>>> t4: (one: 2, zero: 1)
>> >>>>>>>>>>>>>>> t5: (one: 2, zero: 1)
>> >>>>>>>>>>>>>>> t6: (one: 2, zero: 2)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> How would you implement this in current Beam semantics?
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I think you're saying here that you know your input is ordered in a specific way, and since you assume the order when writing your pipeline, you can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says "I must have timestamp-ordered elements", since it makes writing certain StatefulParDos much easier. StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to, but here be dragons" function, while windowing and triggering are meant to keep many people from writing StatefulParDo in the first place.
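For illustration, Jan's transition counter as a stateful DoFn (Java sketch; it produces the expected output only if elements arrive per key in timestamp order, which is exactly the guarantee under discussion):

  import org.apache.beam.sdk.coders.VarIntCoder;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Counts 0->1 and 1->0 transitions. The state machine diverges immediately
  // if the input is shuffled, which is Jan's point.
  class CountTransitions extends DoFn<KV<String, Integer>, KV<String, String>> {
    @StateId("last") private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value(VarIntCoder.of());
    @StateId("ones") private final StateSpec<ValueState<Integer>> onesSpec = StateSpecs.value(VarIntCoder.of());
    @StateId("zeros") private final StateSpec<ValueState<Integer>> zerosSpec = StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void process(ProcessContext c,
        @StateId("last") ValueState<Integer> last,
        @StateId("ones") ValueState<Integer> ones,
        @StateId("zeros") ValueState<Integer> zeros) {
      int prev = last.read() == null ? 0 : last.read();  // default state is zero
      int cur = c.element().getValue();
      int one = ones.read() == null ? 0 : ones.read();
      int zero = zeros.read() == null ? 0 : zeros.read();
      if (cur == 1 && prev == 0) { one++; ones.write(one); }
      if (cur == 0 && prev == 1) { zero++; zeros.write(zero); }
      last.write(cur);
      c.output(KV.of(c.element().getKey(), "(one: " + one + ", zero: " + zero + ")"));
    }
  }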
>> >>>>>>>>>>>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> But the worst case is defined by an input of size (available resources + single byte) -> pipeline fail. Although it could have finished, given the right conditions.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> > This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Sure, the pipeline will be less efficient, because it would have to buffer and sort the inputs. But at least it will produce correct results in cases where updates to state are order-sensitive.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> I'd say roughly yes, but the difference would be in the trigger. The trigger should ideally fire as soon as the watermark (+lateness) crosses the element with the lowest timestamp in the buffer. Although this could be somehow emulated by a fixed trigger each X millis.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> > Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> No, the sorting would have to be done in the streaming case as well. That is an imperative of the unified model. I think it is possible to sort by timestamp only in the batch case (and do it for *all* batch stateful pardos without an annotation), or introduce an annotation, but then make the same guarantees for the streaming case as well.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Jan
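A sketch of the "buffer everything and release as the watermark advances" default, as a stateful DoFn with an event-time timer (Java; deliberately simplified - see the comments):

  import java.util.ArrayList;
  import java.util.Comparator;
  import java.util.List;
  import org.apache.beam.sdk.coders.VarIntCoder;
  import org.apache.beam.sdk.state.BagState;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.TimeDomain;
  import org.apache.beam.sdk.state.Timer;
  import org.apache.beam.sdk.state.TimerSpec;
  import org.apache.beam.sdk.state.TimerSpecs;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.TimestampedValue;

  // Buffers elements in state and flushes them sorted by timestamp when the
  // event-time timer fires. Simplification: each element resets the timer and
  // the flush emits the whole buffer; a production version would only release
  // elements at or before the firing timestamp and keep the rest buffered.
  class BufferAndSortFn extends DoFn<KV<String, Integer>, Integer> {
    @StateId("buffer")
    private final StateSpec<BagState<TimestampedValue<Integer>>> bufferSpec =
        StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()));

    @TimerId("flush")
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void process(ProcessContext c,
        @StateId("buffer") BagState<TimestampedValue<Integer>> buffer,
        @TimerId("flush") Timer flush) {
      buffer.add(TimestampedValue.of(c.element().getValue(), c.timestamp()));
      flush.set(c.timestamp()); // fires once the watermark passes this element
    }

    @OnTimer("flush")
    public void onFlush(OnTimerContext c,
        @StateId("buffer") BagState<TimestampedValue<Integer>> buffer) {
      List<TimestampedValue<Integer>> sorted = new ArrayList<>();
      buffer.read().forEach(sorted::add);
      sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
      for (TimestampedValue<Integer> tv : sorted) {
        c.output(tv.getValue());
      }
      buffer.clear();
    }
  }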
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
>> >>>>>>>>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>>>>>>>>> Hi Robert,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamps of elements flowing through the pipeline is generally low (too high a difference leads to the overbuffering problem), but there is no such bound on batch.
>> >>>>>>>>>>>>>>>> Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> As a result, I see a few possible solutions:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> - the best and most natural seems to be extension of the model, so that it defines batch not only as "streaming pipeline executed in batch fashion", but as "pipeline with at least as good runtime characteristics as in the streaming case, executed in batch fashion"; I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed out by Aljoscha) is very probably already done during translation of stateful pardos. Also note that this definition only affects user-defined stateful pardos
>> >>>>>>>>>>>>>>>> I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. The model describes what the valid outputs are given a (sometimes partial) set of inputs. It becomes really hard to define things like "as good runtime characteristics." Once you allow any out-of-orderedness, it is not very feasible to try and define (and more cheaply implement) an "upper bound" of acceptable out-of-orderedness.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> - another option would be to introduce an annotation for DoFns (e.g. @RequiresStableTimeCharacteristics), which would result in the sorting in the batch case - but - this extension would have to ensure the sorting in streaming mode also - it would require a definition of allowed lateness, and a trigger (essentially similar to a window)
>> >>>>>>>>>>>>>>>> This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code. Not sure about the semantics of triggering here, especially data-driven triggers. Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> - the last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I see the first two options as quite equally good, although the latter one is probably more time consuming to implement. But it would bring an additional feature to the streaming case as well.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Thanks for any thoughts.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Jan
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>> >>>>>>>>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <[email protected]> wrote:
>> >>>>>>>>>>>>>>>>>>> Hi Reuven,
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Stateful ParDo works in batch only as far as the logic inside the state works for absolutely unbounded out-of-orderness of elements. That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to a "state machine", and any time you have a state machine involved, the ordering of elements matters.
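For completeness, the GBK + FlatMap shape Robert mentions above, written in Java like the other sketches (types illustrative):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.FlatMapElements;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.SimpleFunction;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Group values per key, then re-emit each (key, value) pair individually.
  // Triggering configured upstream of the GBK controls when values are
  // released downstream.
  static PCollection<KV<String, Integer>> regroup(PCollection<KV<String, Integer>> input) {
    return input
        .apply(GroupByKey.create())
        .apply(FlatMapElements.via(
            new SimpleFunction<KV<String, Iterable<Integer>>, List<KV<String, Integer>>>() {
              @Override
              public List<KV<String, Integer>> apply(KV<String, Iterable<Integer>> kv) {
                List<KV<String, Integer>> out = new ArrayList<>();
                for (Integer v : kv.getValue()) {
                  out.add(KV.of(kv.getKey(), v));
                }
                return out;
              }
            }));
  }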
>> >>>>>>>>>>>>>>>>>> No guarantees on order are provided in *either* streaming or batch mode by the model. However, it is the case that in order to make forward progress most streaming runners attempt to limit the amount of out-of-orderedness of elements (in terms of event time vs. processing time), which in turn could help cap the amount of state that must be held concurrently, whereas a batch runner may not allow any state to be safely discarded until the whole timeline from the infinite past to the infinite future has been observed.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Also, as pointed out, state is not preserved "batch to batch" in batch mode.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <[email protected]> wrote:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
>> >>>>>>>>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited the global window with the default trigger on unbounded PCollections in the SDK because this is more likely to be user error than an actual desire to have no output until drain. But it's semantically valid in the model.
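For reference, a global window with early firings in streaming looks roughly like this (Java; assumes an unbounded PCollection<String> named input, and the trigger choice is illustrative):

  import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
  import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
  import org.apache.beam.sdk.transforms.windowing.Repeatedly;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.PCollection;
  import org.joda.time.Duration;

  // Early firings every minute; without a non-default trigger, the SDK
  // rejects GlobalWindows on unbounded PCollections, as Robert notes.
  PCollection<String> windowed = input.apply(
      Window.<String>into(new GlobalWindows())
          .triggering(Repeatedly.forever(
              AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(1))))
          .withAllowedLateness(Duration.ZERO)
          .accumulatingFiredPanes());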
