kennknowles commented on code in PR #33763:
URL: https://github.com/apache/beam/pull/33763#discussion_r1940349411
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,553 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
+// This file is intended to consolidate handling of WindowingStrategy and trigger
+// logic independently from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vacuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
+
+ Trigger Trigger // Evaluated during execution.
+}
+
+// IsTriggerReady updates the trigger state with the given input, and returns
+// if the trigger is ready to fire.
Review Comment:
This description is inaccurate: it does not mention that the method also runs
the trigger's `onFire` logic, so afterwards the state will no longer be in the
ready state.
Perhaps
```
// IsTriggerReady updates the trigger state with the given input, checks if the trigger is ready and, if so,
// advances state according to the trigger's semantics. Returns `true` if the trigger was fired.
```
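A minimal sketch of the call sequence the suggested doc comment describes, assuming heavily simplified, hypothetical stand-ins for the PR's `StateData`, `triggerInput`, and trigger types (state is a plain map keyed by trigger pointer, and only element counts are tracked). It shows why the original wording is misleading: once `isTriggerReady` returns `true`, `onFire` has already run, so the trigger no longer reports itself as ready.

```go
package main

// Hypothetical, simplified stand-ins for the PR's types: per-trigger state
// lives in a map keyed by trigger pointer, and only element counts are tracked.
type triggerInput struct {
	newElementCount int
}

type triggerState struct {
	finished bool
	count    int
}

type stateData map[trigger]triggerState

type trigger interface {
	onElement(in triggerInput, s stateData)
	shouldFire(s stateData) bool
	onFire(s stateData)
}

// elementCount becomes ready once n elements have arrived and is finished
// once it has fired.
type elementCount struct{ n int }

func (t *elementCount) onElement(in triggerInput, s stateData) {
	ts := s[t]
	if ts.finished {
		return
	}
	ts.count += in.newElementCount
	s[t] = ts
}

func (t *elementCount) shouldFire(s stateData) bool {
	ts := s[t]
	return !ts.finished && ts.count >= t.n
}

func (t *elementCount) onFire(s stateData) {
	s[t] = triggerState{finished: true}
}

// isTriggerReady feeds the input to the trigger, checks whether it is ready
// and, if so, advances its state via onFire. It returns true if the trigger
// fired; afterwards the trigger is no longer in the ready state.
func isTriggerReady(t trigger, in triggerInput, s stateData) bool {
	t.onElement(in, s)
	if t.shouldFire(s) {
		t.onFire(s)
		return true
	}
	return false
}
```

Because the firing is committed inside the call, a caller that checks `shouldFire` again after a `true` result will see `false`.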
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,553 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
+// This file is intended to consolidate handling of WindowingStrategy and trigger
+// logic independently from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vacuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
+
+ Trigger Trigger // Evaluated during execution.
+}
+
+// IsTriggerReady updates the trigger state with the given input, and returns
+// if the trigger is ready to fire.
+func (ws WinStrat) IsTriggerReady(input triggerInput, state *StateData) bool {
+ ws.Trigger.onElement(input, state)
+
+ if ws.Trigger.shouldFire(state) {
+ ws.Trigger.onFire(state)
+ return true
+ }
+ return false
}
+// EarliestCompletion marks when we can close a window.
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}
+func (ws WinStrat) IsNeverTrigger() bool {
+ _, ok := ws.Trigger.(*TriggerNever)
+ return ok
+}
+
func (ws WinStrat) String() string {
- return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
+ return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", ws.AllowedLateness, ws.Trigger)
+}
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+ newElementCount int // The number of new elements since the last check.
+ endOfWindowReached bool // Whether or not the end of the window has been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy. A trigger determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+ reset(state *StateData)
+
+ // onElement updates the trigger state based on the provided input. This may
+ // transition triggers into a fireable state, but will never make them "finished".
+ onElement(input triggerInput, state *StateData)
+ // shouldFire returns whether the trigger is able to fire or not.
+ shouldFire(state *StateData) bool
+ // onFire commits that the trigger has fired, so triggers may transition to
+ // a finished state.
+ onFire(state *StateData)
+
+ // TODO handle https://github.com/apache/beam/issues/31438 merging triggers and state for merging windows (sessions, but also custom merging windows)
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining its own state as needed.
+type triggerState struct {
+ // finished indicates if the trigger has already fired or not.
+ finished bool
+ // extra is where additional data can be stored.
+ extra any
+}
+
+func (ts triggerState) String() string {
+ return fmt.Sprintf("triggerState[finished: %v; state: %v]", ts.finished, ts.extra)
+}
+
+// nullTrigger is a 0 size object that exists to be embedded in triggers that
+// perform no action on trigger method calls. Triggers with this embedded will
+// gain an implementation of the trigger methods that do nothing, and behavior
+// must be overridden by the trigger for correct evaluation.
+type nullTrigger struct{}
+
+func (nullTrigger) onElement(triggerInput, *StateData) {}
+func (nullTrigger) onFire(*StateData) {}
+func (nullTrigger) reset(*StateData) {}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window expiration.
+type TriggerNever struct{ nullTrigger }
+
+func (*TriggerNever) shouldFire(*StateData) bool {
+ return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+func (t *TriggerNever) String() string {
+ return "Never"
+}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{ nullTrigger }
+
+func (*TriggerAlways) shouldFire(*StateData) bool {
+ return true
+}
+
+func subTriggersOnElement(t Trigger, input triggerInput, state *StateData, subTriggers []Trigger) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+
+ for _, sub := range subTriggers {
+ sub.onElement(input, state)
+ }
+}
+
+func subTriggersReset(t Trigger, state *StateData, subTriggers []Trigger) {
+ for _, sub := range subTriggers {
+ sub.reset(state)
+ }
+ delete(state.Trigger, t)
+}
+
+func triggerClearAndFinish(t Trigger, state *StateData) {
+ t.reset(state)
+ ts := state.getTriggerState(t)
+ ts.finished = true
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerAlways) String() string {
+ return "Always"
+}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAll) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ shouldFire := true
+ for _, sub := range t.SubTriggers {
+ shouldFire = shouldFire && sub.shouldFire(state)
+ }
+ return shouldFire
+}
+
+func (t *TriggerAfterAll) onFire(state *StateData) {
+ unfinished := false
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ sub.onFire(state)
+ }
+ if !state.getTriggerState(sub).finished {
+ unfinished = true
+ }
+ }
+ if unfinished {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAll) String() string {
+ return fmt.Sprintf("AfterAll[%v]", t.SubTriggers)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAny) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ return true
+ }
+ }
+ return false
+}
+
+func (t *TriggerAfterAny) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAny) String() string {
+ return fmt.Sprintf("AfterAny[%v]", t.SubTriggers)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+ // Only update the first unfinished sub trigger.
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ sub.onElement(input, state)
+ return
+ }
+}
+
+func (t *TriggerAfterEach) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ return sub.shouldFire(state)
+ }
+ return false
+}
+
+func (t *TriggerAfterEach) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ sub.onFire(state)
+ if !state.getTriggerState(sub).finished {
+ return
+ }
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterEach) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterEach) String() string {
+ return fmt.Sprintf("AfterEach[%v]", t.SubTriggers)
+}
+
+// TriggerElementCount triggers once at least the required number
+// of elements have arrived.
+//
+// TriggerElementCount stores the current element count in its extra state field.
+type TriggerElementCount struct {
+ ElementCount int
+}
+
+func (t *TriggerElementCount) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+
+ if ts.extra == nil {
+ ts.extra = int(0)
+ }
+ count := ts.extra.(int) + input.newElementCount
+ ts.extra = count
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerElementCount) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
Review Comment:
Noting that the Java SDK factored this so that an overarching structure
tracked whether each trigger was finished, saving the copy/paste of this check
into every `shouldFire`. But again, this check should be moot, because the
overarching structure should never call `shouldFire` on a trigger that is finished.
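A rough sketch of that factoring, in Go rather than Java. The names `leaf`, `executable`, and `count` are hypothetical, and state is held inline on the structs for brevity instead of in the PR's per key+window `StateData`. The wrapper owns the finished flag, so leaf triggers carry no `finished` check of their own and `shouldFire` is never consulted once a trigger is finished.

```go
package main

// leaf is a hypothetical trigger with no "finished" bookkeeping of its own.
type leaf interface {
	onElement(newElements int)
	shouldFire() bool
	// onFire reports whether the trigger is finished after this firing.
	onFire() bool
}

// executable owns the finished flag for a leaf, so the check lives in one
// place and a finished leaf's shouldFire is never consulted.
type executable struct {
	t        leaf
	finished bool
}

func (e *executable) onElement(newElements int) {
	if e.finished {
		return
	}
	e.t.onElement(newElements)
}

func (e *executable) shouldFire() bool {
	return !e.finished && e.t.shouldFire()
}

func (e *executable) onFire() {
	if e.shouldFire() {
		e.finished = e.t.onFire()
	}
}

// count is a leaf that is ready once n elements have been seen; it does not
// check whether it has already fired.
type count struct{ n, seen int }

func (c *count) onElement(newElements int) { c.seen += newElements }

func (c *count) shouldFire() bool { return c.seen >= c.n }

func (c *count) onFire() bool { return true }
```

Composite triggers would wrap each of their subtriggers the same way; that part is omitted here.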
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
+// This file is intended to consolidate handling of WindowingStrategy and trigger
+// logic independently from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vacuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
}
+// EarliestCompletion marks when we can close a window.
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}
func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
}
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+ newElementCount int // The number of new elements since the last check.
+ endOfWindowReached bool // Whether or not the end of the window has been reached.
Review Comment:
Plumbing an int and a bool is equivalent. The real question is which piece
of code "knows" what the trigger's semantics are relative to the state. This is
fine, though not how I would think of it, since it erases information early and
the context of the trigger has to hold knowledge that rightfully belongs to the
trigger, in my view.
This will change when you have processing time triggers, in which the
trigger can target processing time via transformations like rounding and
offsets. Actually the watermark trigger used to also have that capability but
happily it was removed in
https://github.com/apache/beam/commit/cff1652fb6afbc362704f3d4702a569049ef164a
No action required, just FYIs and bikeshed.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,552 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
+// This file is intended to consolidate handling of WindowingStrategy and trigger
+// logic independently from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vacuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
+
+ Trigger Trigger // Evaluated during execution.
+}
+
+// IsTriggerReady updates the trigger state with the given input, and returns
+// if the trigger is ready to fire.
+func (ws WinStrat) IsTriggerReady(input triggerInput, state *StateData) bool {
+ ws.Trigger.onElement(input, state)
+
+ if ws.Trigger.shouldFire(state) {
+ ws.Trigger.onFire(state)
+ return true
+ }
+ return false
}
+// EarliestCompletion marks when we can close a window.
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}
+func (ws WinStrat) IsNeverTrigger() bool {
+ _, ok := ws.Trigger.(*TriggerNever)
+ return ok
+}
+
func (ws WinStrat) String() string {
- return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
+ return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", ws.AllowedLateness, ws.Trigger)
+}
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+ newElementCount int // The number of new elements since the last check.
+ endOfWindowReached bool // Whether or not the end of the window has been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy. A trigger determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+ reset(state *StateData)
+
+ // onElement updates the trigger state based on the provided input. This may
+ // transition triggers into a fireable state, but will never make them "finished".
+ onElement(input triggerInput, state *StateData)
+ // shouldFire returns whether the trigger is able to fire or not.
+ shouldFire(state *StateData) bool
+ // onFire commits that the trigger has fired, so triggers may transition to
+ // a finished state.
+ onFire(state *StateData)
+
+ // TODO merging triggers and state for merging windows
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining its own state as needed.
+type triggerState struct {
+ // finished indicates if the trigger has already fired or not.
+ finished bool
+ // extra is where additional data can be stored.
+ extra any
+}
+
+func (ts triggerState) String() string {
+ return fmt.Sprintf("triggerState[finished: %v; state: %v]", ts.finished, ts.extra)
+}
+
+// nullTrigger is a 0 size object that exists to be embedded in triggers that
+// perform no action on trigger method calls. Triggers with this embedded will
+// gain an implementation of the trigger methods that do nothing, and behavior
+// must be overridden by the trigger for correct evaluation.
+type nullTrigger struct{}
+
+func (nullTrigger) onElement(triggerInput, *StateData) {}
+func (nullTrigger) onFire(*StateData) {}
+func (nullTrigger) reset(*StateData) {}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window expiration.
+type TriggerNever struct{ nullTrigger }
+
+func (*TriggerNever) shouldFire(*StateData) bool {
+ return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+func (t *TriggerNever) String() string {
+ return "Never"
+}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{ nullTrigger }
+
+func (*TriggerAlways) shouldFire(*StateData) bool {
+ return true
+}
+
+func subTriggersOnElement(t Trigger, input triggerInput, state *StateData, subTriggers []Trigger) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+
+ for _, sub := range subTriggers {
+ sub.onElement(input, state)
+ }
+}
+
+func subTriggersReset(t Trigger, state *StateData, subTriggers []Trigger) {
+ for _, sub := range subTriggers {
+ sub.reset(state)
+ }
+ delete(state.Trigger, t)
+}
+
+func triggerClearAndFinish(t Trigger, state *StateData) {
+ t.reset(state)
+ ts := state.getTriggerState(t)
+ ts.finished = true
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerAlways) String() string {
+ return "Always"
+}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAll) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ shouldFire := true
+ for _, sub := range t.SubTriggers {
+ shouldFire = shouldFire && sub.shouldFire(state)
+ }
+ return shouldFire
+}
+
+func (t *TriggerAfterAll) onFire(state *StateData) {
+ unfinished := false
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ sub.onFire(state)
+ }
+ if !state.getTriggerState(sub).finished {
+ unfinished = true
+ }
+ }
+ if unfinished {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAll) String() string {
+ return fmt.Sprintf("AfterAll[%v]", t.SubTriggers)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAny) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ return true
+ }
+ }
+ return false
+}
+
+func (t *TriggerAfterAny) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAny) String() string {
+ return fmt.Sprintf("AfterAny[%v]", t.SubTriggers)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+ // Only update the first unfinished sub trigger.
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ sub.onElement(input, state)
+ return
+ }
+}
+
+func (t *TriggerAfterEach) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ return sub.shouldFire(state)
+ }
+ return false
+}
+
+func (t *TriggerAfterEach) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ sub.onFire(state)
+ if !state.getTriggerState(sub).finished {
+ return
+ }
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterEach) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterEach) String() string {
+ return fmt.Sprintf("AfterEach[%v]", t.SubTriggers)
+}
+
+// TriggerElementCount triggers once at least the required number
+// of elements have arrived.
+//
+// TriggerElementCount stores the current element count in its extra state field.
+type TriggerElementCount struct {
+ ElementCount int
+}
+
+func (t *TriggerElementCount) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+
+ if ts.extra == nil {
+ ts.extra = int(0)
+ }
+ count := ts.extra.(int) + input.newElementCount
+ ts.extra = count
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerElementCount) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ if ts.extra == nil {
+ return false
+ }
+ return ts.extra.(int) >= t.ElementCount
+}
+
+func (t *TriggerElementCount) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ ts := state.getTriggerState(t)
+ ts.finished = true
+ ts.extra = nil
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerElementCount) reset(state *StateData) {
+ delete(state.Trigger, t)
+}
+
+func (t *TriggerElementCount) String() string {
+ return fmt.Sprintf("ElementCount[%v]", t.ElementCount)
+}
+
+// TriggerOrFinally is ready whenever either of its subtriggers fires.
+// Ceases to be ready after the Finally trigger shouldFire.
+type TriggerOrFinally struct {
+ Main Trigger // repeated
+ Finally Trigger // terminates execution.
+}
+
+func (t *TriggerOrFinally) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+ t.Main.onElement(input, state)
+ t.Finally.onElement(input, state)
+}
+
+func (t *TriggerOrFinally) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ return t.Main.shouldFire(state) || t.Finally.shouldFire(state)
}
+
+func (t *TriggerOrFinally) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ if t.Finally.shouldFire(state) {
+ t.Finally.onFire(state)
+ ts := state.getTriggerState(t)
+ ts.finished = true
+ state.setTriggerState(t, ts)
+ } else {
+ t.Main.onFire(state)
+ if state.getTriggerState(t.Main).finished {
+ t.Main.reset(state)
+ }
+ }
+}
+
+func (t *TriggerOrFinally) reset(state *StateData) {
+ t.Main.reset(state)
+ t.Finally.reset(state)
+ delete(state.Trigger, t)
+}
+
+func (t *TriggerOrFinally) String() string {
+ return fmt.Sprintf("OrFinally[Repeat:%v Until:%v]", t.Main, t.Finally)
+}
+
+// TriggerRepeatedly is a composite trigger that will fire whenever the Repeated trigger is ready.
+// If the Repeated trigger is finished, its state will be reset.
+type TriggerRepeatedly struct {
+ Repeated Trigger
+}
+
+func (t *TriggerRepeatedly) onElement(input triggerInput, state *StateData) {
+ t.Repeated.onElement(input, state)
+}
+
+func (t *TriggerRepeatedly) shouldFire(state *StateData) bool {
+ return t.Repeated.shouldFire(state)
+}
+
+func (t *TriggerRepeatedly) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ t.Repeated.onFire(state)
+ // If the subtrigger is finished, reset it.
+ if repeatedTs := state.getTriggerState(t.Repeated); repeatedTs.finished {
+ t.Repeated.reset(state)
+ }
+}
+
+func (t *TriggerRepeatedly) reset(state *StateData) {
+ t.Repeated.reset(state)
+ delete(state.Trigger, t)
+}
+
+func (t *TriggerRepeatedly) String() string {
+ return fmt.Sprintf("Repeat[%v]", t.Repeated)
+}
+
+// TriggerAfterEndOfWindow is a composite trigger that will fire whenever the
+// early Triggers are ready prior to the end of window, implicitly repeated.
+// After the end of window, the Late trigger will be implicitly repeated.
+//
+// Uses the extra state field to track if the end of the window has been reached.
+type TriggerAfterEndOfWindow struct {
+ Early, Late Trigger
+}
+
+func (t *TriggerAfterEndOfWindow) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+ if ts.extra == nil {
+ ts.extra = false
+ }
+ previouslyEndOfWindow := ts.extra.(bool)
+ if !previouslyEndOfWindow && input.endOfWindowReached {
+ // We have transitioned. Clear early state and mark it finished
+ triggerClearAndFinish(t.Early, state)
+ if t.Late == nil {
+ triggerClearAndFinish(t, state)
+ return
+ }
+ }
+ ts.extra = input.endOfWindowReached
+ state.setTriggerState(t, ts)
+
+ if !state.getTriggerState(t.Early).finished {
+ t.Early.onElement(input, state)
+ return
+ } else if t.Late != nil {
+ t.Late.onElement(input, state)
+ }
+}
+
+func (t *TriggerAfterEndOfWindow) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ if !state.getTriggerState(t.Early).finished {
+ return t.Early.shouldFire(state) || ts.extra.(bool)
+ } else if t.Late == nil {
+ return false
+ }
+ return t.Late.shouldFire(state)
+}
+
+func (t *TriggerAfterEndOfWindow) onFire(state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+ if !state.getTriggerState(t.Early).finished {
+ if t.Early.shouldFire(state) {
Review Comment:
What you describe could cause weirdness for a corner case that is rare but I
got called out for: if someone has windowing but also the trigger is
`Repeat(ElementCount(n))` or `Repeat(ProcessingTime(30s))` then they are not
expecting an extra firing when the watermark crosses the end of the window.
Presumably Prism would emit at that point. I honestly doubt any reasonable
pipeline would notice the difference, but there you have it. (note that for a
trigger `Repeat(ProcessingTime(30s))` we still mark the final non-late pane as
"on time")
In the Java implementation the end-of-window firing is still governed by the
trigger (but _also_ governed by `OnTimeBehavior`), but the expiry firing is
governed only by `ClosingBehavior`.
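A sketch of the Java-style split described above, under simplifying assumptions: `behavior` is a hypothetical Go mirror of the `FIRE_ALWAYS` and `FIRE_IF_NON_EMPTY` values of Beam's `Window.OnTimeBehavior` and `Window.ClosingBehavior`, and a pane is reduced to a single "is it empty" flag. With a trigger like `Repeat(ElementCount(n))` that is not ready when the watermark passes the end of the window, no on-time pane is emitted; buffered data is flushed at expiry according to `ClosingBehavior` alone.

```go
package main

// behavior is a hypothetical Go mirror of the two values shared by Beam's
// Window.OnTimeBehavior and Window.ClosingBehavior.
type behavior int

const (
	fireIfNonEmpty behavior = iota
	fireAlways
)

// emitAtEndOfWindow: the trigger decides whether there is an on-time firing
// at all; OnTimeBehavior only decides whether an empty on-time pane is emitted.
func emitAtEndOfWindow(triggerReady, paneEmpty bool, onTime behavior) bool {
	if !triggerReady {
		return false
	}
	return !paneEmpty || onTime == fireAlways
}

// emitAtExpiry: at window expiry the trigger is not consulted; only
// ClosingBehavior and whether there is unemitted data matter.
func emitAtExpiry(paneEmpty bool, closing behavior) bool {
	return !paneEmpty || closing == fireAlways
}
```

Under this model, an extra end-of-window firing for `Repeat(ElementCount(n))` only happens if the runner emits without consulting the trigger, which is the corner case noted above.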
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,496 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
+// This file is intended to consolidate handling of WindowingStrategy and trigger
+// logic independently from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vacuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
+
+ Trigger Trigger // Evaluated during execution.
}
+// IsTriggerReady updates and returns whether the trigger is able to fire or not.
+func (ws WinStrat) IsTriggerReady(input triggerInput, state *StateData) bool {
+ ws.Trigger.onElement(input, state)
+
+ if ws.Trigger.shouldFire(state) {
+ ws.Trigger.onFire(state)
+ return true
+ }
+ return false
+}
+
+// EarliestCompletion marks when we can close a window.
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}
func (ws WinStrat) String() string {
return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
}
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+ newElementCount int // The number of new elements since the last check.
+ endOfWindowReached bool // Whether or not the end of the window has been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy. A trigger determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+ reset(state *StateData)
+
+ // onElement updates the trigger state based on the provided input. This may
+ // transition triggers into a fireable state, but will never make them "finished".
+ onElement(input triggerInput, state *StateData)
+ // shouldFire returns whether the trigger is able to fire or not.
+ shouldFire(state *StateData) bool
+ // onFire commits that the trigger has fired, so triggers may transition to
+ // a finished state.
+ onFire(state *StateData)
+
+ // TODO merging triggers and state for merging windows
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining its own state as needed.
+type triggerState struct {
+ // finished indicates if the trigger has already fired or not.
+ finished bool
+ // extra is where additional data can be stored.
+ extra any
+}
+
+// nullTrigger is a 0 size object that exists to be embedded in triggers that
+// perform no action on trigger method calls. Triggers with this embedded will
+// gain an implementation of the trigger methods that do nothing, and behavior
+// must be overridden by the trigger for correct evaluation.
+type nullTrigger struct{}
+
+func (nullTrigger) onElement(triggerInput, *StateData) {}
+func (nullTrigger) onFire(*StateData) {}
+func (nullTrigger) reset(*StateData) {}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window expiration.
+type TriggerNever struct{ nullTrigger }
+
+func (*TriggerNever) shouldFire(*StateData) bool {
+ return false
+}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{ nullTrigger }
+
+func (*TriggerAlways) shouldFire(*StateData) bool {
+ return true
+}
+
+func subTriggersOnElement(t Trigger, input triggerInput, state *StateData, subTriggers []Trigger) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+
+ for _, sub := range subTriggers {
+ sub.onElement(input, state)
+ }
+}
+
+func subTriggersReset(t Trigger, state *StateData, subTriggers []Trigger) {
+ for _, sub := range subTriggers {
+ sub.reset(state)
+ }
+ delete(state.Trigger, t)
+}
+
+func triggerClearAndFinish(t Trigger, state *StateData) {
+ t.reset(state)
+ ts := state.getTriggerState(t)
+ ts.finished = true
+ state.setTriggerState(t, ts)
+}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAll) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ shouldFire := true
+ for _, sub := range t.SubTriggers {
+ shouldFire = shouldFire && sub.shouldFire(state)
+ }
+ return shouldFire
+}
+
+func (t *TriggerAfterAll) onFire(state *StateData) {
+ unfinished := false
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ sub.onFire(state)
+ }
+ if !state.getTriggerState(sub).finished {
+ unfinished = true
+ }
+ }
+ if unfinished {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAny) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ return true
+ }
+ }
+ return false
+}
+
+func (t *TriggerAfterAny) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+ // Only update the first unfinished sub trigger.
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ sub.onElement(input, state)
+ return
+ }
+}
+
+func (t *TriggerAfterEach) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ return sub.shouldFire(state)
+ }
+ return false
+}
+
+func (t *TriggerAfterEach) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ sub.onFire(state)
+ if !state.getTriggerState(sub).finished {
+ return
+ }
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterEach) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+// TriggerElementCount triggers once at least the required number
+// of elements have arrived.
+//
+// TriggerElementCount stores the current element count in its extra state field.
+type TriggerElementCount struct {
+ ElementCount int
+}
+
+func (t *TriggerElementCount) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+
+ if ts.extra == nil {
+ ts.extra = int(0)
+ }
+ count := ts.extra.(int) + input.newElementCount
+ ts.extra = count
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerElementCount) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ if ts.extra == nil {
+ return false
+ }
+ return ts.extra.(int) >= t.ElementCount
Review Comment:
I think the `shouldFire` method can be called willy-nilly before, during, or
after the `onElement` method without panicking. It is just that the reaction
to the elements you've presented may transition it from `false` to `true`.
I believe `ElementCount(0)` and `ElementCount(1)` have the same pragmatics.
There's no reason to make a corner case where we don't need one. Every integer
<= 1 just has the same behavior.
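A minimal sketch of the reviewer's point, not the PR's code. `StateData`, `getTriggerState`, and `setTriggerState` here are simplified, hypothetical stand-ins for prism's types, keyed by trigger pointer as the file comment describes. A comma-ok type assertion treats a missing count as zero, so `shouldFire` is safe to call at any time and needs no `nil` special case. Because `IsTriggerReady` calls `onElement` before `shouldFire`, `ElementCount(0)` and `ElementCount(1)` both report ready once at least one element has been counted. (Before any element, `ElementCount(0)` would report ready, but the caller never asks at that point.)

```go
package main

import "fmt"

// triggerInput is a simplified version of the struct in the diff.
type triggerInput struct {
	newElementCount int
}

// triggerState mirrors the diff: a finished flag plus trigger-specific extra data.
type triggerState struct {
	finished bool
	extra    any
}

// StateData is a hypothetical, minimal stand-in for prism's StateData:
// per-trigger state looked up by the trigger's pointer identity.
type StateData struct {
	Trigger map[any]triggerState
}

func (s *StateData) getTriggerState(key any) triggerState {
	return s.Trigger[key] // zero value (extra == nil) when absent
}

func (s *StateData) setTriggerState(key any, ts triggerState) {
	if s.Trigger == nil {
		s.Trigger = map[any]triggerState{}
	}
	s.Trigger[key] = ts
}

type TriggerElementCount struct {
	ElementCount int
}

func (t *TriggerElementCount) onElement(input triggerInput, state *StateData) {
	ts := state.getTriggerState(t)
	if ts.finished {
		return
	}
	// The comma-ok assertion yields 0 when extra is nil, so no special case.
	count, _ := ts.extra.(int)
	ts.extra = count + input.newElementCount
	state.setTriggerState(t, ts)
}

// shouldFire is safe to call before, during, or after onElement.
func (t *TriggerElementCount) shouldFire(state *StateData) bool {
	ts := state.getTriggerState(t)
	if ts.finished {
		return false
	}
	count, _ := ts.extra.(int)
	return count >= t.ElementCount
}

func main() {
	for _, n := range []int{0, 1} {
		t := &TriggerElementCount{ElementCount: n}
		state := &StateData{}
		t.onElement(triggerInput{newElementCount: 1}, state)
		fmt.Println(n, t.shouldFire(state))
	}
}
```

Both iterations print `true`: after one element, every `ElementCount` of 1 or less is satisfied.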
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,553 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
+// This file is intended to consolidate handling of WindowingStrategy and trigger
+// logic independently from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vacuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
// WinStrat configures the windowing strategy for the stage, based on the
// stage's input PCollection.
type WinStrat struct {
AllowedLateness time.Duration // Used to extend duration
+
+ Trigger Trigger // Evaluated during execution.
+}
+
+// IsTriggerReady updates the trigger state with the given input, and returns
+// if the trigger is ready to fire.
+func (ws WinStrat) IsTriggerReady(input triggerInput, state *StateData) bool {
+ ws.Trigger.onElement(input, state)
+
+ if ws.Trigger.shouldFire(state) {
+ ws.Trigger.onFire(state)
+ return true
+ }
+ return false
}
+// EarliestCompletion marks when we can close a window.
func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.AllowedLateness)
}
+func (ws WinStrat) IsNeverTrigger() bool {
+ _, ok := ws.Trigger.(*TriggerNever)
+ return ok
+}
+
func (ws WinStrat) String() string {
- return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
+ return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", ws.AllowedLateness, ws.Trigger)
+}
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+ newElementCount int // The number of new elements since the last check.
+ endOfWindowReached bool // Whether or not the end of the window has been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy. A trigger determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+ reset(state *StateData)
+
+ // onElement updates the trigger state based on the provided input. This may
+ // transition triggers into a fireable state, but will never make them "finished".
+ onElement(input triggerInput, state *StateData)
+ // shouldFire returns whether the trigger is able to fire or not.
+ shouldFire(state *StateData) bool
+ // onFire commits that the trigger has fired, so triggers may transition to
+ // a finished state.
+ onFire(state *StateData)
+
+ // TODO handle https://github.com/apache/beam/issues/31438 merging triggers and state for merging windows (sessions, but also custom merging windows)
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining its own state as needed.
+type triggerState struct {
+ // finished indicates if the trigger has already fired or not.
+ finished bool
+ // extra is where additional data can be stored.
+ extra any
+}
+
+func (ts triggerState) String() string {
+ return fmt.Sprintf("triggerState[finished: %v; state: %v]", ts.finished, ts.extra)
+}
+
+// nullTrigger is a 0 size object that exists to be embedded in triggers that
+// perform no action on trigger method calls. Triggers with this embedded will
+// gain an implementation of the trigger methods that do nothing, and behavior
+// must be overridden by the trigger for correct evaluation.
+type nullTrigger struct{}
+
+func (nullTrigger) onElement(triggerInput, *StateData) {}
+func (nullTrigger) onFire(*StateData) {}
+func (nullTrigger) reset(*StateData) {}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window expiration.
+type TriggerNever struct{ nullTrigger }
+
+func (*TriggerNever) shouldFire(*StateData) bool {
+ return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+func (t *TriggerNever) String() string {
+ return "Never"
+}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{ nullTrigger }
+
+func (*TriggerAlways) shouldFire(*StateData) bool {
+ return true
+}
+
+func subTriggersOnElement(t Trigger, input triggerInput, state *StateData, subTriggers []Trigger) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+
+ for _, sub := range subTriggers {
+ sub.onElement(input, state)
+ }
+}
+
+func subTriggersReset(t Trigger, state *StateData, subTriggers []Trigger) {
+ for _, sub := range subTriggers {
+ sub.reset(state)
+ }
+ delete(state.Trigger, t)
+}
+
+func triggerClearAndFinish(t Trigger, state *StateData) {
+ t.reset(state)
+ ts := state.getTriggerState(t)
+ ts.finished = true
+ state.setTriggerState(t, ts)
+}
+
+func (t *TriggerAlways) String() string {
+ return "Always"
+}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAll) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ shouldFire := true
+ for _, sub := range t.SubTriggers {
+ shouldFire = shouldFire && sub.shouldFire(state)
+ }
+ return shouldFire
+}
+
+func (t *TriggerAfterAll) onFire(state *StateData) {
+ unfinished := false
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ sub.onFire(state)
+ }
+ if !state.getTriggerState(sub).finished {
+ unfinished = true
+ }
+ }
+ if unfinished {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAll) String() string {
+ return fmt.Sprintf("AfterAll[%v]", t.SubTriggers)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) onElement(input triggerInput, state *StateData) {
+ subTriggersOnElement(t, input, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAny) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if sub.shouldFire(state) {
+ return true
+ }
+ }
+ return false
+}
+
+func (t *TriggerAfterAny) onFire(state *StateData) {
+ if !t.shouldFire(state) {
+ return
+ }
+ triggerClearAndFinish(t, state)
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+ subTriggersReset(t, state, t.SubTriggers)
+}
+
+func (t *TriggerAfterAny) String() string {
+ return fmt.Sprintf("AfterAny[%v]", t.SubTriggers)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+ SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) onElement(input triggerInput, state *StateData) {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return
+ }
+ // Only update the first unfinished sub trigger.
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ sub.onElement(input, state)
+ return
+ }
+}
+
+func (t *TriggerAfterEach) shouldFire(state *StateData) bool {
+ ts := state.getTriggerState(t)
+ if ts.finished {
+ return false
+ }
+ for _, sub := range t.SubTriggers {
+ if state.getTriggerState(sub).finished {
+ continue
+ }
+ return sub.shouldFire(state)
+ }
+ return false
+}
+
+func (t *TriggerAfterEach) onFire(state *StateData) {
+ if !t.shouldFire(state) {
Review Comment:
This is interesting. In general I don't think each `onFire` needs to check
`shouldFire` and exit early. Even if the call is erroneous in that state,
exiting early will probably cause spookier behavior than just doing what the
caller says.
It is probably fine either way. The Java codebase does it differently: the
only such check is in `AfterFirstStateMachine`'s `onFire`, which checks which
of its subtriggers are eligible to fire and tells them to fire. TBH these are
all corners of trigger semantics with no user value and probably no users.
I'm OK with it the way it is, too. Python simply didn't implement any of the
triggers that would be problematic.
Incidentally, this is a reason to combine `shouldFire`, `onFire`, and
`onBundleMetadata` into one call like you had before :-)
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go:
##########
@@ -44,3 +44,391 @@ func TestEarliestCompletion(t *testing.T) {
}
}
}
+
+func TestTriggers_isReady(t *testing.T) {
Review Comment:
Aha I found the file:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
There are fewer cases in here than I would expect/hope. But this is where
Robert started to gather them in an SDK-independent way.