riteshghorse commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748369532
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
// TODO: Change to call UnixMilli() once we move to only
supporting a go version > 1.17.
offsetMillis = offset.Unix()*1e3 +
int64(offset.Nanosecond())/1e6
}
- tr.TimestampTransforms = append(tr.TimestampTransforms,
AlignToTransform{
+ tr.timestampTransforms = append(tr.timestampTransforms,
AlignToTransform{
Period: int64(period / time.Millisecond),
Offset: offsetMillis,
})
return tr
}
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+ subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+ return t.subTrigger
+}
+
// Repeat constructs a trigger that fires a trigger repeatedly
// once the condition has been met.
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(tr Trigger) Trigger {
- return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+ return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late
firing.
+type AfterEndOfWindowTrigger struct {
+ earlyFiring Trigger
+ lateFiring Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
Review comment:
Makes sense.
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -65,59 +119,13 @@ type AlignToTransform struct {
func (AlignToTransform) timestampTransform() {}
-const (
- DefaultTrigger string = "Trigger_Default_"
- AlwaysTrigger string = "Trigger_Always_"
- AfterAnyTrigger string = "Trigger_AfterAny_"
- AfterAllTrigger string = "Trigger_AfterAll_"
- AfterProcessingTimeTrigger string =
"Trigger_AfterProcessing_Time_"
- ElementCountTrigger string = "Trigger_ElementCount_"
- AfterEndOfWindowTrigger string =
"Trigger_AfterEndOfWindow_"
- RepeatTrigger string = "Trigger_Repeat_"
- OrFinallyTrigger string = "Trigger_OrFinally_"
- NeverTrigger string = "Trigger_Never_"
- AfterSynchronizedProcessingTimeTrigger string =
"Trigger_AfterSynchronizedProcessingTime_"
-)
-
-// Default constructs a default trigger that fires once after the end of
window.
-// Late Data is discarded.
-func Default() Trigger {
- return Trigger{Kind: DefaultTrigger}
-}
-
-// Always constructs a trigger that fires immediately
-// whenever an element is received.
-//
-// Equivalent to trigger.Repeat(trigger.AfterCount(1))
-func Always() Trigger {
- return Trigger{Kind: AlwaysTrigger}
-}
-
-// AfterCount constructs a trigger that fires after
-// at least `count` number of elements are processed.
-func AfterCount(count int32) Trigger {
- return Trigger{Kind: ElementCountTrigger, ElementCount: count}
-}
-
-// AfterProcessingTime constructs a trigger that fires relative to
-// when input first arrives.
-//
-// Must be configured with calls to PlusDelay, or AlignedTo. May be
-// configured with additional delay.
-func AfterProcessingTime() Trigger {
- return Trigger{Kind: AfterProcessingTimeTrigger}
-}
-
// PlusDelay configures an AfterProcessingTime trigger to fire after a
specified delay,
// no smaller than a millisecond.
-func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
- if tr.Kind != AfterProcessingTimeTrigger {
- panic(fmt.Errorf("can't apply processing delay to %s, want:
AfterProcessingTimeTrigger", tr.Kind))
- }
+func (tr *AfterProcessingTimeTrigger) PlusDelay(delay time.Duration) Trigger {
Review comment:
Got it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]