lostluck commented on code in PR #33763:
URL: https://github.com/apache/beam/pull/33763#discussion_r1931051241
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
+// This file is intended to consolidate handling of WindowingStrategy and trigger
+// logic independently from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vacuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
}
+// EarliestCompletion marks when we can close a window.
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}
func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
}
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+ newElementCount int // The number of new elements since the last check.
+ endOfWindowReached bool // Whether or not the end of the window has been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy. A trigger determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+ // isReady determines if the trigger is ready to fire based on the current state.
+ isReady(input triggerInput, state *StateData) bool
+ reset(state *StateData)
+
+ // TODO merging triggers and state for merging windows
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining its own state as needed.
+type triggerState struct {
+ // finished indicates if the trigger has already fired or not.
+ finished bool
+ // extra is where additional data can be stored.
+ extra any
+}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window expiration.
+type TriggerNever struct{}
+
+func (*TriggerNever) isReady(triggerInput, *StateData) bool {
+ return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{}
+
+func (*TriggerAlways) isReady(triggerInput, *StateData) bool {
+ return true
+}
+
+func (t *TriggerAlways) reset(state *StateData) {}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) isReady(input triggerInput, state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ // Track fired states.
+ if ts.extra == nil {
+ ts.extra = map[Trigger]bool{}
+ }
+ isFinished := ts.extra.(map[Trigger]bool)
+ for _, t := range t.SubTriggers {
+ // Don't re-evaluate.
+ if isFinished[t] {
+ continue
+ }
+ if t.isReady(input, state) {
+ isFinished[t] = true
+ }
+ }
+ ready := len(isFinished) == len(t.SubTriggers)
+ if ready {
+ ts.finished = true
+ }
+ state.setTriggerState(t, ts)
+ return ready
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+ for _, sub := range t.SubTriggers {
+ sub.reset(state)
+ }
+ delete(state.Trigger, t)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) isReady(input triggerInput, state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ anyReady := false
+ for _, t := range t.SubTriggers {
+ anyReady = t.isReady(input, state)
+ if anyReady {
+ break
+ }
+ }
+ if anyReady {
+ ts.finished = true
+ for _, sub := range t.SubTriggers {
+ // clear any subtrigger state, as we're firing anyway.
+ sub.reset(state)
+ }
+ }
+ state.setTriggerState(t, ts)
+ return anyReady
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+ for _, sub := range t.SubTriggers {
+ sub.reset(state)
+ }
+ delete(state.Trigger, t)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) isReady(input triggerInput, state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ // Use extra for the current trigger index.
+ if ts.extra == nil {
+ ts.extra = int(0)
+ }
+ current := ts.extra.(int)
+ ready := false
+ if t.SubTriggers[current].isReady(input, state) {
+ current++
+ ready = true
+ }
+ if current >= len(t.SubTriggers) {
+ ts.finished = true
+ }
+ ts.extra = current
+ state.setTriggerState(t, ts)
+ return ready
+}
+
+func (t *TriggerAfterEach) reset(state *StateData) {
+ for _, sub := range t.SubTriggers {
+ sub.reset(state)
+ }
+ delete(state.Trigger, t)
+}
+
+// TriggerElementCount triggers when at least the required number
+// of elements have arrived.
+type TriggerElementCount struct {
+ ElementCount int
+}
+
+func (t *TriggerElementCount) isReady(input triggerInput, state *StateData)
bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+
+ if ts.extra == nil {
+ ts.extra = int(0)
+ }
+ count := ts.extra.(int) + input.newElementCount
+ ts.extra = count
+ ready := count >= t.ElementCount
+ if ready {
+ ts.finished = true
+ }
+ state.setTriggerState(t, ts)
Review Comment:
The explicit `state.setTriggerState(t, ts)` call is necessary since triggerState
is passed by value and not by pointer at present. Passing by value gains more
reasonable zero value semantics, since we don't have to do both a single value
initialization, *and* any extra value initialization. It also makes the state
handling explicit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]