I think there is more to it than that. You'll probably want retractions integrated into Beam's core triggering.
One example of where retractions are needed is with session windows. Early triggers are fairly broken with session windows because the windows themselves change as more data arrive. So an early trigger might generate data for two separate windows S1 and S2, but after more data arrives those two windows merge into a single S3. Retractions solve this by hooking into the window merging logic, retracting the outputs for S1 and S2 before outputting S3. I don't think this is possible today with a DSL. On Thu, Apr 25, 2024 at 5:46 AM Jan Lukavský <je...@seznam.cz> wrote: > > To implement retraction (at least to do so efficiently) I think you'll > want it integrated in the model. e.g. for combiners one would want to add > the option to subtract values, otherwise you would end up having to store > every element defeating the performance that combiners provide. > > I think we use different words for the same. :) This is what I meant by > "and define appropriate retraction functions to CombineFn and the like". > > On the other hand, you _could_ derive CombineFn with retraction > capabilities, if you have a DSL that provides "retraction accumulation" > function along with "addition accumulation", this should be possible to be > combined into a CombineFn as we have it today. This should only require > that the operation being combined is associative, commutative and have a > valid unary operator '-' (which will be the result of the "retraction > combine"). > On 4/23/24 18:08, Reuven Lax via dev wrote: > > > > On Tue, Apr 23, 2024 at 7:52 AM Jan Lukavský <je...@seznam.cz> wrote: > >> On 4/22/24 20:40, Kenneth Knowles wrote: >> >> I'll go ahead and advertise https://s.apache.org/beam-sink-triggers >> again for this thread. >> >> +1 >> >> >> There are a couple of difficult technical problems in there. One of them >> is backwards-propagating triggering to minimize extra latency. We can >> probably solve this as well as we solve forward-propagating without too >> much trouble. We also can/should leave it abstract so runners can implement >> either through back-propagating local triggering rules or using run-time >> communication to trigger upstream. These actually could interact well with >> stateful ParDo by sending a "trigger now please" message or some such. >> >> Yes, this was what I was referring to as the more "functional" style for >> stateful ParDo. At minimum, it requires adding new callback independent of >> @ProcessElement and @OnTimer - @OnTrigger? >> >> >> But we also probably need retractions that automatically flow through the >> pipeline and update aggregations. Why? Because currently triggers don't >> just control update frequency but actually create new elements each >> time, so they require user fix-up logic to do the right thing with the >> output. When we go to higher levels of abstraction we need this to "just >> work" without changing the pipeline. There have been two (nearly identical) >> propotypes of adding retractions to the DirectRunner as proof of concept. >> But there's also work in all the IOs since they are not retraction-aware. >> Also lots of work in many library transforms where a retraction should be >> computed by running the transform "like normal" but then negating the >> result, but that cannot be the default for ParDo because it is deliberately >> more flexible, we just have to annotate Map and the like. >> >> +1. I think retractions could be implemented as DSL on top of the current >> model. Retractions can be viewed as regular data elements with additional >> metadata (upsert, delete). For ParDo we could add something like >> @RetractElement (and define appropriate retraction functions to CombineFn >> and the like). We could introduce RetractedPCollection or similar for this >> purpose. >> > > To implement retraction (at least to do so efficiently) I think you'll > want it integrated in the model. e.g. for combiners one would want to add > the option to subtract values, otherwise you would end up having to store > every element defeating the performance that combiners provide. > >> >> Getting all this right is a lot of work but would result in a system that >> is simpler to use out-of-the-box and a more robust SQL implementation >> (because you can't use triggers with SQL unless you have retractions or >> some other "just works" mode of computation). It would essentially change >> Beam into a delta-processing engine, which it arguably should be, with >> whole append-only elements being a simplest degenerate case of a delta >> (which would be highly optimized in batch/archival processing). >> >> +1 >> >> >> Kenn >> >> On Tue, Apr 16, 2024 at 2:36 AM Reuven Lax via dev <dev@beam.apache.org> >> wrote: >> >>> Yes, but that's inevitable as stateful ParDo in a sense live outside of >>> most of the window/trigger semantics. Basically a stateful ParDo is the >>> user executing low-level control over these semantics, and controlling >>> output frequency themselves with timers. One could however still propagate >>> the trigger upstream of the stateful ParDo, though I'm not sure if that's >>> the best approach. >>> >>> On Mon, Apr 15, 2024 at 11:31 PM Jan Lukavský <je...@seznam.cz> wrote: >>> >>>> On 4/11/24 18:20, Reuven Lax via dev wrote: >>>> >>>> I'm not sure it would require all that. A "basic" implementation could >>>> be done on top of our existing model. Essentially the user would specify >>>> triggers at the sink ParDos, then the runner would walk backwards up the >>>> graph, reverse-propagating these triggers (with some resolution rules aimed >>>> at keeping the minimum trigger latency). The runner could under the covers >>>> simply just apply the appropriate trigger into the Window, using the >>>> current mechanism. Of course building this all into the framework from >>>> scratch would be cleaner, but we could also build this on top of what we >>>> have. >>>> >>>> Any propagation from sink to source would be blocked by any stateful >>>> ParDo, because that does not adhere to the concept of trigger, no? Hence, >>>> we could get the required downstream 'cadence' of outputs, but these would >>>> change only when the upstream ParDo emits any data. Yes, one can argue that >>>> stateful ParDo is supposed to emit data at fast as possible, then this >>>> seems to work. >>>> >>>> >>>> On Thu, Apr 11, 2024 at 5:10 AM Jan Lukavský <je...@seznam.cz> wrote: >>>> >>>>> I've probably heard about it, but I never read the proposal. Sounds >>>>> great, but that would require to change our ParDos from the 'directive' >>>>> style to something more functional, so that processing of elements, state >>>>> updates and outputting results can be decoupled and managed by the runner >>>>> independently. This goes exactly in the direction of unifying GBK and >>>>> Combine with stateful ParDo. Sounds like something worth exploring for >>>>> Beam >>>>> 3. :) >>>>> >>>>> Anyway, thanks for this discussion, helped me clarify some more white >>>>> spots. >>>>> >>>>> Jan >>>>> On 4/10/24 19:24, Reuven Lax via dev wrote: >>>>> >>>>> Are you familiar with the "sink triggers" proposal? >>>>> >>>>> Essentially while windowing is usually a property of the data, and >>>>> therefore flows downwards through the graph, triggering is usually a >>>>> property of output (i.e. sink) latency - how much are you willing to wait >>>>> to see data, and what semantics do you want for this early data. Ideally >>>>> triggers should be specified separately at the ParDo level (Beam has no >>>>> real notion of Sinks as a special object, so to allow for output >>>>> specification it has to be on the ParDo), and the triggers should >>>>> propagate >>>>> up the graph back to the source. This is in contrast to today where we >>>>> attach triggering to the windowing information. >>>>> >>>>> This was a proposal some years back and there was some effort made to >>>>> implement it, but the implementation never really got off the ground. >>>>> >>>>> On Wed, Apr 10, 2024 at 12:43 AM Jan Lukavský <je...@seznam.cz> wrote: >>>>> >>>>>> On 4/9/24 18:33, Kenneth Knowles wrote: >>>>>> >>>>>> At a top level `setWindowingStrategyInternal` exists to set up the >>>>>> metadata without actually assigning windows. If we were more clever we >>>>>> might have found a way for it to not be public... it is something that >>>>>> can >>>>>> easily lead to an invalid pipeline. >>>>>> >>>>>> Yes, that was what hit me about one minute after I started this >>>>>> thread. :) >>>>>> >>>>>> >>>>>> I think "compatible windows" today in Beam doesn't have very good >>>>>> uses anyhow. I do see how when you are flattening PCollections you might >>>>>> also want to explicitly have a function that says "and here is how to >>>>>> reconcile their different metadata". But is it not reasonable to use >>>>>> Window.into(global window)? It doesn't seem like boilerplate to me >>>>>> actually, but something you really want to know is happening. >>>>>> >>>>>> :) >>>>>> >>>>>> Of course this was the way out, but I was somewhat intuitively >>>>>> seeking something that could go this autonomously. >>>>>> >>>>>> Generally speaking, we might have some room for improvement in the >>>>>> way we handle windows and triggers - windows relate only to GBK and >>>>>> stateful ParDo, triggers relate to GBK only. They have no semantics if >>>>>> downstream processing does not use any of these. There could be a >>>>>> pipeline >>>>>> preprocessing stage that would discard (replace with meaningful defaults) >>>>>> any of these metadata that is unused, but can cause Pipeline to fail at >>>>>> construction time. It is also (to me) somewhat questionable if triggers >>>>>> are >>>>>> really a property of a PCollection or a property of a specific transform >>>>>> (GBK - ehm, actually (stateless) 'key by' + 'reduce by key', but that is >>>>>> completely different story :)) because (non-default) triggers are likely >>>>>> not preserved across multiple transforms. Maybe the correct subject of >>>>>> this >>>>>> thread could be "are we sure our windowing and triggering semantics is >>>>>> 100% >>>>>> correct"? Probably the - wrong - expectations at the beginning of this >>>>>> thread were due to conflict in my mental model of how things 'could' work >>>>>> as opposed to how they actually work. :) >>>>>> >>>>>> Jan >>>>>> >>>>>> >>>>>> Kenn >>>>>> >>>>>> On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský <je...@seznam.cz> wrote: >>>>>> >>>>>>> On 4/6/24 21:23, Reuven Lax via dev wrote: >>>>>>> >>>>>>> So the problem here is that windowFn is a property of the >>>>>>> PCollection, not the element, and the result of Flatten is a single >>>>>>> PCollection. >>>>>>> >>>>>>> Yes. That is the cause of why Flatten.pCollections() needs the same >>>>>>> windowFn. >>>>>>> >>>>>>> >>>>>>> In various cases, there is a notion of "compatible" windows. >>>>>>> Basically given window functions W1 and W2, provide a W3 that "works" >>>>>>> with >>>>>>> both. >>>>>>> >>>>>>> Exactly this would be a nice feature for Flatten, something like >>>>>>> 'windowFn resolve strategy', so that if use does not know the windowFn >>>>>>> of >>>>>>> upstream PCollections this can be somehow resolved at pipeline >>>>>>> construction >>>>>>> time. Alternatively only as a small syntactic sugar, something like: >>>>>>> >>>>>>> >>>>>>> Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy())) >>>>>>> >>>>>>> or anything similar. This can be done in user code, so it is not >>>>>>> something deeper, but might help in some cases. It would be cool if we >>>>>>> could reuse concepts from other cases where such mechanism is needed. >>>>>>> >>>>>>> >>>>>>> Note that Beam already has something similar with side inputs, since >>>>>>> the side input often is in a different window than the main input. >>>>>>> However >>>>>>> main input elements are supposed to see side input elements in the same >>>>>>> window (and in fact main inputs are blocked until the side-input window >>>>>>> is >>>>>>> ready), so we must do a mapping. If for example (and very commonly!) the >>>>>>> side input is in the global window and the main input is in a fixed >>>>>>> window, >>>>>>> by default we will remap the global-window elements into the >>>>>>> main-input's >>>>>>> fixed window. >>>>>>> >>>>>>> This is a one-sided merge function, there is a 'main' and 'side' >>>>>>> input, but the generic symmetric merge might be possible as well. E.g. >>>>>>> if >>>>>>> one PCollection of Flatten is in GlobalWindow, I wonder if there are >>>>>>> cases >>>>>>> where users would actually want to do anything else then apply the same >>>>>>> global windowing strategy to all input PCollections. >>>>>>> >>>>>>> Jan >>>>>>> >>>>>>> >>>>>>> In Side input we also allow the user to control this mapping, so for >>>>>>> example side input elements could always map to the previous fixed >>>>>>> window >>>>>>> (e.g. while processing window 12-1, you want to see summary data of all >>>>>>> records in the previous window 11-12). Users can do this by providing a >>>>>>> WindowMappingFunction to the View - essentially a function from window >>>>>>> to >>>>>>> window. Unfortunately this is hard to use (one must create their own >>>>>>> PCollectionView class) and very poorly documented, so I doubt many users >>>>>>> know about this! >>>>>>> >>>>>>> Reuven >>>>>>> >>>>>>> On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský <je...@seznam.cz> wrote: >>>>>>> >>>>>>>> Immediate self-correction, although setting the strategy directly >>>>>>>> via >>>>>>>> setWindowingStrategyInternal() *seemed* to be working during >>>>>>>> Pipeline >>>>>>>> construction time, during runtime it obviously does not work, >>>>>>>> because >>>>>>>> the PCollection was still windowed using the old windowFn. Make >>>>>>>> sense to >>>>>>>> me, but there remains the other question if we can make flattening >>>>>>>> PCollections with incompatible windowFns more user-friendly. The >>>>>>>> current >>>>>>>> approach where we require the same windowFn for all input >>>>>>>> PCollections >>>>>>>> creates some unnecessary boilerplate code needed on user side. >>>>>>>> >>>>>>>> Jan >>>>>>>> >>>>>>>> On 4/6/24 15:45, Jan Lukavský wrote: >>>>>>>> > Hi, >>>>>>>> > >>>>>>>> > I came across a case where using >>>>>>>> > PCollection#applyWindowingStrategyInternal seems legit in user >>>>>>>> core. >>>>>>>> > The case is roughly as follows: >>>>>>>> > >>>>>>>> > a) compute some streaming statistics >>>>>>>> > >>>>>>>> > b) apply the same transform (say ComputeWindowedAggregation) >>>>>>>> with >>>>>>>> > different parameters on these statistics yielding two windowed >>>>>>>> > PCollections - first is global with early trigger, the other is >>>>>>>> > sliding window, the specific parameters of the windowFns are >>>>>>>> > encapsulated in the ComputeWindowedAggregation transform >>>>>>>> > >>>>>>>> > c) apply the same transform on both of the above PCollections, >>>>>>>> > yielding two PCollections with the same types, but different >>>>>>>> windowFns >>>>>>>> > >>>>>>>> > d) flatten these PCollections into single one (e.g. for >>>>>>>> downstream >>>>>>>> > processing - joining - or flushing to sink) >>>>>>>> > >>>>>>>> > Now, the flatten will not work, because these PCollections have >>>>>>>> > different windowFns. It would be possible to restore the >>>>>>>> windowing for >>>>>>>> > either of them, but it requires to somewhat break the >>>>>>>> encapsulation of >>>>>>>> > the transforms that produce the windowed outputs. A more natural >>>>>>>> > solution is to take the WindowingStrategy from the global >>>>>>>> aggregation >>>>>>>> > and set it via setWindowingStrategyInternal() to the other >>>>>>>> > PCollection. This works, but it uses API that is marked as >>>>>>>> @Internal >>>>>>>> > (and obviously, the name as well suggests it is not intended for >>>>>>>> > client-code usage). >>>>>>>> > >>>>>>>> > The question is, should we make a legitimate version of this >>>>>>>> call? Or >>>>>>>> > should we introduce a way for Flatten.pCollections() to re-window >>>>>>>> the >>>>>>>> > input PCollections appropriately? In the case of conflicting >>>>>>>> > WindowFns, where one of them is GlobalWindowing strategy, it >>>>>>>> seems to >>>>>>>> > me that the user's intention is quite well-defined (this might >>>>>>>> extend >>>>>>>> > to some 'flatten windowFn resolution strategy', maybe). >>>>>>>> > >>>>>>>> > WDYT? >>>>>>>> > >>>>>>>> > Jan >>>>>>>> > >>>>>>>> >>>>>>>