riteshghorse commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r748369532



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
                // TODO: Change to call UnixMilli() once we move to only supporting a go version > 1.17.
                offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
        }
-       tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+       tr.timestampTransforms = append(tr.timestampTransforms, AlignToTransform{
                Period: int64(period / time.Millisecond),
                Offset: offsetMillis,
        })
        return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+       subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+       return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-       return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+       return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late firing.
+type AfterEndOfWindowTrigger struct {
+       earlyFiring Trigger
+       lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {

Review comment:
       Makes sense.

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -65,59 +119,13 @@ type AlignToTransform struct {
 
 func (AlignToTransform) timestampTransform() {}
 
-const (
-       DefaultTrigger                         string = "Trigger_Default_"
-       AlwaysTrigger                          string = "Trigger_Always_"
-       AfterAnyTrigger                        string = "Trigger_AfterAny_"
-       AfterAllTrigger                        string = "Trigger_AfterAll_"
-       AfterProcessingTimeTrigger             string = "Trigger_AfterProcessing_Time_"
-       ElementCountTrigger                    string = "Trigger_ElementCount_"
-       AfterEndOfWindowTrigger                string = "Trigger_AfterEndOfWindow_"
-       RepeatTrigger                          string = "Trigger_Repeat_"
-       OrFinallyTrigger                       string = "Trigger_OrFinally_"
-       NeverTrigger                           string = "Trigger_Never_"
-       AfterSynchronizedProcessingTimeTrigger string = "Trigger_AfterSynchronizedProcessingTime_"
-)
-
-// Default constructs a default trigger that fires once after the end of window.
-// Late Data is discarded.
-func Default() Trigger {
-       return Trigger{Kind: DefaultTrigger}
-}
-
-// Always constructs a trigger that fires immediately
-// whenever an element is received.
-//
-// Equivalent to trigger.Repeat(trigger.AfterCount(1))
-func Always() Trigger {
-       return Trigger{Kind: AlwaysTrigger}
-}
-
-// AfterCount constructs a trigger that fires after
-// at least `count` number of elements are processed.
-func AfterCount(count int32) Trigger {
-       return Trigger{Kind: ElementCountTrigger, ElementCount: count}
-}
-
-// AfterProcessingTime constructs a trigger that fires relative to
-// when input first arrives.
-//
-// Must be configured with calls to PlusDelay, or AlignedTo. May be
-// configured with additional delay.
-func AfterProcessingTime() Trigger {
-       return Trigger{Kind: AfterProcessingTimeTrigger}
-}
-
 // PlusDelay configures an AfterProcessingTime trigger to fire after a specified delay,
 // no smaller than a millisecond.
-func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
-       if tr.Kind != AfterProcessingTimeTrigger {
-               panic(fmt.Errorf("can't apply processing delay to %s, want: AfterProcessingTimeTrigger", tr.Kind))
-       }
+func (tr *AfterProcessingTimeTrigger) PlusDelay(delay time.Duration) Trigger {

Review comment:
       Got it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to