lostluck commented on issue #31438: URL: https://github.com/apache/beam/issues/31438#issuecomment-2651982022
At this point aggregation triggers are implemented in Prism, with the following caveats:

* No support for ProcessingTime and SynchronizedProcessingTime triggers.
* No support for triggered side inputs.
* No support for triggers with merging windows (and, by extension, using a merging-window PCollection as a side input, though that isn't common or easy to reason about).

ProcessingTime triggers aren't conceptually difficult, but they do require recording the first time (in ProcessingTime) an element was seen in a pane, and allowing processing time to be part of the shouldFire evaluation. Such triggers must be able to fire based on a processing time timer, which isn't currently plumbed in for trigger use in Prism.

Triggered side inputs allow downstream transforms that consume the side input to begin processing on signals other than the side input window closing. There will be some adjustments necessary for AddPendingSide, since the view of the data will need to be consistent and stable.

Merging windows first require moving where the aggregation occurs. Right now it happens in the internal package, as implemented in handlerunner.go. This should migrate to the Engine package. Care needs to be taken, though: merging windows generally require bundles to be sent to and returned from the SDK in order to perform the merge for custom windows. The Java and Python SDKs may be able to do this by default for Session windows as a first obtuse pass, but Session windows, as a known function, should just be merged Prism-side.

Once we know which windows must merge for a key, the state can then be merged for the trigger tree. This means the triggers should be extended with an onMerge handler to ultimately merge the state for the triggers. It makes sense to me to keep the State lookup handling for the merged windows A + B = C external to the triggers.
That keeps the core logic of the trigger merges unit-testable, since we're just passing in three StateData values and operating on them, e.g. `onMerge(A, B, C *StateData)`.
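
To make the `onMerge(A, B, C *StateData)` shape concrete, here is a minimal sketch of what a unit-testable merge could look like. Everything in it is an assumption for illustration: the `StateData` fields, the element-count semantics, and the rule that the merged window is finished only if both sources were. It is not Prism's actual type or merge policy.

```go
package main

import "fmt"

// StateData is a hypothetical stand-in for per-window trigger state.
// The real Prism type may carry different fields.
type StateData struct {
	ElementCount int  // elements seen in the pane so far (assumed field)
	Finished     bool // whether the trigger already finished (assumed field)
}

// onMerge folds the trigger state of source windows a and b into the
// merged window c. Looking up a, b, and c is left to the caller, so this
// function only operates on the three values it is handed.
func onMerge(a, b, c *StateData) {
	// Assumed policy: counts add up across the merged windows.
	c.ElementCount = a.ElementCount + b.ElementCount
	// Assumed policy: the merged window is finished only if both sources were.
	c.Finished = a.Finished && b.Finished
}

func main() {
	a := &StateData{ElementCount: 2}
	b := &StateData{ElementCount: 3, Finished: true}
	c := &StateData{}
	onMerge(a, b, c)
	fmt.Printf("%+v\n", *c) // prints {ElementCount:5 Finished:false}
}
```

Because the function takes plain values and performs no state lookups, a test only needs to construct three `StateData` values and compare the result.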
