kennknowles commented on code in PR #33763:
URL: https://github.com/apache/beam/pull/33763#discussion_r1930853432


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1395,6 +1408,8 @@ keysPerBundle:
                                holdsInBundle[e.holdTimestamp] += 1
                                // Clear the "fired" timer so subsequent 
matches can be ignored.
                                delete(dnt.timers, timerKey{family: e.family, 
tag: e.tag, window: e.window})
+                       } else {
+                               dataInBundle = true

Review Comment:
   style nit: prefer to make the shorter block the `then` clause of an if 
statement



##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 )
 
+// This file is intended to consolidate handling of WindowingStrategy and 
trigger
+// logic independantly from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in 
particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vaccuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per 
trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
 // WinStrat configures the windowing strategy for the stage, based on the
 // stage's input PCollection.
 type WinStrat struct {
        AllowedLateness time.Duration // Used to extend duration
 }
 
+// EarliestCompletion marks when we can close a window.
 func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
        return w.MaxTimestamp().Add(ws.AllowedLateness)
 }
 
 func (ws WinStrat) String() string {
        return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
 }
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+       newElementCount    int  // The number of new elements since the last 
check.
+       endOfWindowReached bool // Whether or not the end of the window has 
been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy.  A trigger 
determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+       // IsReady determines if the trigger is ready to fire based on the 
current state.
+       isReady(input triggerInput, state *StateData) bool
+       reset(state *StateData)
+
+       // TODO merging triggers and state for merging windows
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining it's own state as needed.
+type triggerState struct {
+       // finished indicates if the trigger has already fired or not.
+       finished bool
+       // extra is where additional data can be stored.
+       extra any
+}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window 
expiration.
+type TriggerNever struct{}
+
+func (*TriggerNever) isReady(triggerInput, *StateData) bool {
+       return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window 
expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{}
+
+func (*TriggerAlways) isReady(triggerInput, *StateData) bool {
+       return true
+}
+
+func (t *TriggerAlways) reset(state *StateData) {}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       // Track fired states.
+       if ts.extra == nil {
+               ts.extra = map[Trigger]bool{}
+       }
+       isFinished := ts.extra.(map[Trigger]bool)
+       for _, t := range t.SubTriggers {
+               // Don't re-evaluate.
+               if isFinished[t] {
+                       continue
+               }
+               if t.isReady(input, state) {
+                       isFinished[t] = true
+               }
+       }
+       ready := len(isFinished) == len(t.SubTriggers)
+       if ready {
+               ts.finished = true
+       }
+       state.setTriggerState(t, ts)
+       return ready
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       anyReady := false
+       for _, t := range t.SubTriggers {
+               anyReady = t.isReady(input, state)
+               if anyReady {
+                       break
+               }
+       }
+       if anyReady {
+               ts.finished = true
+               for _, sub := range t.SubTriggers {
+                       // clear any subtrigger state, as we're fireing anyway.
+                       sub.reset(state)
+               }
+       }
+       state.setTriggerState(t, ts)
+       return anyReady
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       // Use extra for the current trigger index.
+       if ts.extra == nil {
+               ts.extra = int(0)
+       }
+       current := ts.extra.(int)
+       ready := false
+       if t.SubTriggers[current].isReady(input, state) {
+               current++

Review Comment:
   `isReady` should be stateless. A surrounding trigger (or other context) 
could inquire if a trigger is ready but then choose not to fire it. For 
example, in `AfterAll(AfterEach(...), Never())` the outer trigger never fires, 
yet querying the inner `AfterEach` would still advance its current index even 
though nothing fired.
   
   I think you probably do need to add an `onFire` method to do the state 
advancement. It is also where `Repeat` should reset its subtrigger.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 )
 
+// This file is intended to consolidate handling of WindowingStrategy and 
trigger
+// logic independantly from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in 
particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vaccuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per 
trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
 // WinStrat configures the windowing strategy for the stage, based on the
 // stage's input PCollection.
 type WinStrat struct {
        AllowedLateness time.Duration // Used to extend duration
 }
 
+// EarliestCompletion marks when we can close a window.
 func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
        return w.MaxTimestamp().Add(ws.AllowedLateness)
 }
 
 func (ws WinStrat) String() string {
        return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
 }
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+       newElementCount    int  // The number of new elements since the last 
check.
+       endOfWindowReached bool // Whether or not the end of the window has 
been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy.  A trigger 
determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+       // IsReady determines if the trigger is ready to fire based on the 
current state.
+       isReady(input triggerInput, state *StateData) bool
+       reset(state *StateData)
+
+       // TODO merging triggers and state for merging windows
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining it's own state as needed.
+type triggerState struct {
+       // finished indicates if the trigger has already fired or not.
+       finished bool
+       // extra is where additional data can be stored.
+       extra any
+}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window 
expiration.
+type TriggerNever struct{}
+
+func (*TriggerNever) isReady(triggerInput, *StateData) bool {
+       return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window 
expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{}
+
+func (*TriggerAlways) isReady(triggerInput, *StateData) bool {
+       return true
+}
+
+func (t *TriggerAlways) reset(state *StateData) {}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       // Track fired states.

Review Comment:
   We don't need to track fired states. We just don't fire the subtriggers 
until they are all ready. And since this trigger is "finished" after a single 
firing, it doesn't matter what the subtriggers say about being finished. 
Incidentally, I suggest explicitly documenting the type stored in `extra` for 
each trigger's state machine.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 )
 
+// This file is intended to consolidate handling of WindowingStrategy and 
trigger
+// logic independantly from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in 
particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vaccuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per 
trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
 // WinStrat configures the windowing strategy for the stage, based on the
 // stage's input PCollection.
 type WinStrat struct {
        AllowedLateness time.Duration // Used to extend duration
 }
 
+// EarliestCompletion marks when we can close a window.
 func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
        return w.MaxTimestamp().Add(ws.AllowedLateness)
 }
 
 func (ws WinStrat) String() string {
        return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
 }
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+       newElementCount    int  // The number of new elements since the last 
check.
+       endOfWindowReached bool // Whether or not the end of the window has 
been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy.  A trigger 
determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+       // IsReady determines if the trigger is ready to fire based on the 
current state.
+       isReady(input triggerInput, state *StateData) bool

Review Comment:
   `isReady` should be stateless and separated from methods that advance the 
state (`onElement`, `onFire` etc). See example below in `AfterEach`. This 
factoring is actually among the most important benefits of moving to the 
`isReady` model.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 )
 
+// This file is intended to consolidate handling of WindowingStrategy and 
trigger
+// logic independantly from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in 
particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vaccuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per 
trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
 // WinStrat configures the windowing strategy for the stage, based on the
 // stage's input PCollection.
 type WinStrat struct {
        AllowedLateness time.Duration // Used to extend duration
 }
 
+// EarliestCompletion marks when we can close a window.
 func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
        return w.MaxTimestamp().Add(ws.AllowedLateness)
 }
 
 func (ws WinStrat) String() string {
        return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
 }
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+       newElementCount    int  // The number of new elements since the last 
check.
+       endOfWindowReached bool // Whether or not the end of the window has 
been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy.  A trigger 
determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+       // IsReady determines if the trigger is ready to fire based on the 
current state.
+       isReady(input triggerInput, state *StateData) bool
+       reset(state *StateData)
+
+       // TODO merging triggers and state for merging windows
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining it's own state as needed.
+type triggerState struct {
+       // finished indicates if the trigger has already fired or not.
+       finished bool
+       // extra is where additional data can be stored.
+       extra any
+}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window 
expiration.
+type TriggerNever struct{}
+
+func (*TriggerNever) isReady(triggerInput, *StateData) bool {
+       return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window 
expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{}
+
+func (*TriggerAlways) isReady(triggerInput, *StateData) bool {
+       return true
+}
+
+func (t *TriggerAlways) reset(state *StateData) {}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       // Track fired states.
+       if ts.extra == nil {
+               ts.extra = map[Trigger]bool{}
+       }
+       isFinished := ts.extra.(map[Trigger]bool)
+       for _, t := range t.SubTriggers {
+               // Don't re-evaluate.
+               if isFinished[t] {
+                       continue
+               }
+               if t.isReady(input, state) {
+                       isFinished[t] = true
+               }
+       }
+       ready := len(isFinished) == len(t.SubTriggers)
+       if ready {
+               ts.finished = true
+       }
+       state.setTriggerState(t, ts)
+       return ready
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       anyReady := false
+       for _, t := range t.SubTriggers {
+               anyReady = t.isReady(input, state)
+               if anyReady {
+                       break
+               }
+       }
+       if anyReady {
+               ts.finished = true
+               for _, sub := range t.SubTriggers {
+                       // clear any subtrigger state, as we're fireing anyway.
+                       sub.reset(state)
+               }
+       }
+       state.setTriggerState(t, ts)
+       return anyReady
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       // Use extra for the current trigger index.
+       if ts.extra == nil {
+               ts.extra = int(0)
+       }
+       current := ts.extra.(int)
+       ready := false
+       if t.SubTriggers[current].isReady(input, state) {
+               current++
+               ready = true
+       }
+       if current >= len(t.SubTriggers) {
+               ts.finished = true
+       }
+       ts.extra = current
+       state.setTriggerState(t, ts)
+       return ready
+}
+
+func (t *TriggerAfterEach) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerElementCount triggers when there have been at least the required 
number
+// of elements have arrived.
+type TriggerElementCount struct {
+       ElementCount int
+}
+
+func (t *TriggerElementCount) isReady(input triggerInput, state *StateData) 
bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+
+       if ts.extra == nil {
+               ts.extra = int(0)
+       }
+       count := ts.extra.(int) + input.newElementCount
+       ts.extra = count
+       ready := count >= t.ElementCount
+       if ready {
+               ts.finished = true
+       }
+       state.setTriggerState(t, ts)

Review Comment:
   This is benign but also not necessary. Just default it to 0 in `onElement`.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1195,6 +1195,7 @@ func (ss *stageState) AddPending(newPending []element) 
int {
                }
                newPending = origPending
        }
+       //slog.Warn("AddPending", "stage", ss.ID, "pending", newPending)

Review Comment:
   Just noting that this commented-out `slog.Warn` debug line is still here.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 )
 
+// This file is intended to consolidate handling of WindowingStrategy and 
trigger
+// logic independantly from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in 
particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vaccuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per 
trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
 // WinStrat configures the windowing strategy for the stage, based on the
 // stage's input PCollection.
 type WinStrat struct {
        AllowedLateness time.Duration // Used to extend duration
 }
 
+// EarliestCompletion marks when we can close a window.
 func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
        return w.MaxTimestamp().Add(ws.AllowedLateness)
 }
 
 func (ws WinStrat) String() string {
        return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
 }
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+       newElementCount    int  // The number of new elements since the last 
check.
+       endOfWindowReached bool // Whether or not the end of the window has 
been reached.

Review Comment:
   I think a trigger can, more generally, target a particular watermark moment 
or event, not only the end of the window. I'm not endorsing that; it's just 
the way things are.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1526,6 +1541,7 @@ func (ss *stageState) makeInProgressBundle(genBundID 
func() string, toProcess []
        ss.inprogressKeysByBundle[bundID] = newKeys
        ss.inprogressKeys.merge(newKeys)
        ss.inprogressHoldsByBundle[bundID] = holdsInBundle
+       //slog.Warn("makeInProgressBundle", "stage", ss.ID, "toProcess", 
toProcess)

Review Comment:
   Ditto: this commented-out `slog.Warn` debug line is still here as well.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -23,16 +23,351 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 )
 
+// This file is intended to consolidate handling of WindowingStrategy and 
trigger
+// logic independantly from the bulk of the ElementManager.
+//
+// Triggers by their nature only apply to Aggregation transforms, in 
particular:
+// GroupByKeys and CoGroupByKeys are aggregations.
+// Triggers also affect downstream side inputs. That is, a side input consumer
+// is vaccuously an aggregation.
+//
+// Triggers are PerKey+PerWindow, and they may or may not use state per 
trigger.
+//
+// The unique state per trigger is the trickiest bit to handle. In principle
+// it could just be handled by the existing state system, which might be
+// sufficient, but it's not prepared for unique tagging per trigger itself.
+// It would also add additional overhead since state is kept as the serialized
+// bytes, instead of in a manipulatable form.
+//
+// Instead, each key+window state cell contains a trigger specific state map,
+// handled via pointer equality from the trigger itself.
+
 // WinStrat configures the windowing strategy for the stage, based on the
 // stage's input PCollection.
 type WinStrat struct {
        AllowedLateness time.Duration // Used to extend duration
 }
 
+// EarliestCompletion marks when we can close a window.
 func (ws WinStrat) EarliestCompletion(w typex.Window) mtime.Time {
        return w.MaxTimestamp().Add(ws.AllowedLateness)
 }
 
 func (ws WinStrat) String() string {
        return fmt.Sprintf("WinStrat[AllowedLateness:%v]", ws.AllowedLateness)
 }
+
+// triggerInput represents a Key + window + stage's trigger conditions.
+type triggerInput struct {
+       newElementCount    int  // The number of new elements since the last 
check.
+       endOfWindowReached bool // Whether or not the end of the window has 
been reached.
+}
+
+// Trigger represents a trigger for a windowing strategy.  A trigger 
determines when
+// to fire a window based on the arrival of elements and the passage of time.
+//
+// See https://s.apache.org/beam-triggers for a more detailed look at triggers.
+type Trigger interface {
+       // IsReady determines if the trigger is ready to fire based on the 
current state.
+       isReady(input triggerInput, state *StateData) bool
+       reset(state *StateData)
+
+       // TODO merging triggers and state for merging windows
+}
+
+// triggerState retains additional state for a given trigger execution.
+// Each trigger is responsible for maintaining it's own state as needed.
+type triggerState struct {
+       // finished indicates if the trigger has already fired or not.
+       finished bool
+       // extra is where additional data can be stored.
+       extra any
+}
+
+// TriggerNever is never ready.
+// There will only be an ON_TIME output and a final output at window 
expiration.
+type TriggerNever struct{}
+
+func (*TriggerNever) isReady(triggerInput, *StateData) bool {
+       return false
+}
+
+func (t *TriggerNever) reset(state *StateData) {}
+
+// TriggerAlways is always ready.
+// There will be an output for every element, and a final output at window 
expiration.
+// Equivalent to TriggerRepeatedly {TriggerElementCount{1}}
+type TriggerAlways struct{}
+
+func (*TriggerAlways) isReady(triggerInput, *StateData) bool {
+       return true
+}
+
+func (t *TriggerAlways) reset(state *StateData) {}
+
+// TriggerAfterAll is ready when all subTriggers are ready.
+// There will be an output when all subTriggers are ready.
+// Logically, an "AND" trigger.
+type TriggerAfterAll struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAll) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       // Track fired states.
+       if ts.extra == nil {
+               ts.extra = map[Trigger]bool{}
+       }
+       isFinished := ts.extra.(map[Trigger]bool)
+       for _, t := range t.SubTriggers {
+               // Don't re-evaluate.
+               if isFinished[t] {
+                       continue
+               }
+               if t.isReady(input, state) {
+                       isFinished[t] = true
+               }
+       }
+       ready := len(isFinished) == len(t.SubTriggers)
+       if ready {
+               ts.finished = true
+       }
+       state.setTriggerState(t, ts)
+       return ready
+}
+
+func (t *TriggerAfterAll) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerAfterAny is ready the first time any of the subTriggers are ready.
+// Logically, an "OR" trigger.
+type TriggerAfterAny struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterAny) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       anyReady := false
+       for _, t := range t.SubTriggers {
+               anyReady = t.isReady(input, state)
+               if anyReady {
+                       break
+               }
+       }
+       if anyReady {
+               ts.finished = true
+               for _, sub := range t.SubTriggers {
+                       // clear any subtrigger state, as we're fireing anyway.
+                       sub.reset(state)
+               }
+       }
+       state.setTriggerState(t, ts)
+       return anyReady
+}
+
+func (t *TriggerAfterAny) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerAfterEach processes each trigger before executing the next.
+// Starting with the first subtrigger, ready when the _current_ subtrigger
+// is ready. After output, advances the current trigger by one.
+type TriggerAfterEach struct {
+       SubTriggers []Trigger
+}
+
+func (t *TriggerAfterEach) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+       // Use extra for the current trigger index.
+       if ts.extra == nil {
+               ts.extra = int(0)
+       }
+       current := ts.extra.(int)
+       ready := false
+       if t.SubTriggers[current].isReady(input, state) {
+               current++
+               ready = true
+       }
+       if current >= len(t.SubTriggers) {
+               ts.finished = true
+       }
+       ts.extra = current
+       state.setTriggerState(t, ts)
+       return ready
+}
+
+func (t *TriggerAfterEach) reset(state *StateData) {
+       for _, sub := range t.SubTriggers {
+               sub.reset(state)
+       }
+       delete(state.Trigger, t)
+}
+
+// TriggerElementCount triggers when there have been at least the required 
number
+// of elements have arrived.
+type TriggerElementCount struct {
+       ElementCount int
+}
+
+func (t *TriggerElementCount) isReady(input triggerInput, state *StateData) 
bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+
+       if ts.extra == nil {
+               ts.extra = int(0)
+       }
+       count := ts.extra.(int) + input.newElementCount
+       ts.extra = count
+       ready := count >= t.ElementCount
+       if ready {
+               ts.finished = true
+       }
+       state.setTriggerState(t, ts)
+       return ready
+}
+
+func (t *TriggerElementCount) reset(state *StateData) {
+       delete(state.Trigger, t)
+}
+
+// TriggerOrFinally is ready whenever either of it's subtriggers fire.
+// Ceases to be ready after the Finally trigger isReady.
+type TriggerOrFinally struct {
+       Main    Trigger // repeated
+       Finally Trigger // terminates execution.
+}
+
+func (t *TriggerOrFinally) isReady(input triggerInput, state *StateData) bool {
+       ts := state.getTriggerState(t)
+       if ts.finished {
+               return false
+       }
+
+       mainReady := t.Main.isReady(input, state)
+       finallyReady := t.Finally.isReady(input, state)
+
+       if mainReady || finallyReady {
+               t.Main.reset(state)
+       }
+
+       if finallyReady {
+               t.Finally.reset(state)
+               ts.finished = true
+       }
+
+       state.setTriggerState(t, ts)
+       return mainReady || finallyReady
+}
+
+func (t *TriggerOrFinally) reset(state *StateData) {
+       t.Main.reset(state)
+       t.Finally.reset(state)
+       delete(state.Trigger, t)
+}
+
+// TriggerRepeatedly is a composite trigger that will fire whenever the 
Repeated trigger is ready.
+// If the Repeated trigger is finished, it's state will be reset.
+type TriggerRepeatedly struct {
+       Repeated Trigger
+}
+
+func (t *TriggerRepeatedly) isReady(input triggerInput, state *StateData) bool 
{
+       mainReady := t.Repeated.isReady(input, state)
+
+       // If the subtrigger is finished, reset it.
+       if repeatedTs := state.getTriggerState(t.Repeated); repeatedTs.finished 
{
+               t.Repeated.reset(state)

Review Comment:
   At this point I'm repeating myself, but just to add one more example: this 
reset should happen in `onFire`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to