lostluck commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r747922760



##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -65,59 +119,13 @@ type AlignToTransform struct {
 
 func (AlignToTransform) timestampTransform() {}
 
-const (
-       DefaultTrigger                         string = "Trigger_Default_"
-       AlwaysTrigger                          string = "Trigger_Always_"
-       AfterAnyTrigger                        string = "Trigger_AfterAny_"
-       AfterAllTrigger                        string = "Trigger_AfterAll_"
-       AfterProcessingTimeTrigger             string = 
"Trigger_AfterProcessing_Time_"
-       ElementCountTrigger                    string = "Trigger_ElementCount_"
-       AfterEndOfWindowTrigger                string = 
"Trigger_AfterEndOfWindow_"
-       RepeatTrigger                          string = "Trigger_Repeat_"
-       OrFinallyTrigger                       string = "Trigger_OrFinally_"
-       NeverTrigger                           string = "Trigger_Never_"
-       AfterSynchronizedProcessingTimeTrigger string = 
"Trigger_AfterSynchronizedProcessingTime_"
-)
-
-// Default constructs a default trigger that fires once after the end of 
window.
-// Late Data is discarded.
-func Default() Trigger {
-       return Trigger{Kind: DefaultTrigger}
-}
-
-// Always constructs a trigger that fires immediately
-// whenever an element is received.
-//
-// Equivalent to trigger.Repeat(trigger.AfterCount(1))
-func Always() Trigger {
-       return Trigger{Kind: AlwaysTrigger}
-}
-
-// AfterCount constructs a trigger that fires after
-// at least `count` number of elements are processed.
-func AfterCount(count int32) Trigger {
-       return Trigger{Kind: ElementCountTrigger, ElementCount: count}
-}
-
-// AfterProcessingTime constructs a trigger that fires relative to
-// when input first arrives.
-//
-// Must be configured with calls to PlusDelay, or AlignedTo. May be
-// configured with additional delay.
-func AfterProcessingTime() Trigger {
-       return Trigger{Kind: AfterProcessingTimeTrigger}
-}
-
 // PlusDelay configures an AfterProcessingTime trigger to fire after a 
specified delay,
 // no smaller than a millisecond.
-func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
-       if tr.Kind != AfterProcessingTimeTrigger {
-               panic(fmt.Errorf("can't apply processing delay to %s, want: 
AfterProcessingTimeTrigger", tr.Kind))
-       }
+func (tr *AfterProcessingTimeTrigger) PlusDelay(delay time.Duration) Trigger {

Review comment:
       A Go idiom that applies here is "Accept interfaces, return concrete 
types". In this case, don't return the Trigger interface; return the 
`*AfterProcessingTimeTrigger` type. If it needs to be used as the Trigger 
interface, it will be, but returning the interface otherwise prevents the 
chaining.
   

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset 
time.Time) Trigger {
                // TODO: Change to call UnixMilli() once we move to only 
supporting a go version > 1.17.
                offsetMillis = offset.Unix()*1e3 + 
int64(offset.Nanosecond())/1e6
        }
-       tr.TimestampTransforms = append(tr.TimestampTransforms, 
AlignToTransform{
+       tr.timestampTransforms = append(tr.timestampTransforms, 
AlignToTransform{
                Period: int64(period / time.Millisecond),
                Offset: offsetMillis,
        })
        return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+       subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+       return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-       return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+       return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late 
firing.
+type AfterEndOfWindowTrigger struct {
+       earlyFiring Trigger
+       lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+       return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+       return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-       defaultEarly := Default()
-       return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: 
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of 
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+       return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-       if tr.Kind != AfterEndOfWindowTrigger {
-               panic(fmt.Errorf("can't apply early firing to %s, want: 
AfterEndOfWindowTrigger", tr.Kind))
-       }
-       tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
+       tr.earlyFiring = early
        return tr
 }
 
 // LateFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies after the end of the window.
 //
 // Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
-       if tr.Kind != AfterEndOfWindowTrigger {
-               panic(fmt.Errorf("can't apply late firing to %s, want: 
AfterEndOfWindowTrigger", tr.Kind))
-       }
-       tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) Trigger {

Review comment:
       Similarly, return `*AfterEndOfWindowTrigger`

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset 
time.Time) Trigger {
                // TODO: Change to call UnixMilli() once we move to only 
supporting a go version > 1.17.
                offsetMillis = offset.Unix()*1e3 + 
int64(offset.Nanosecond())/1e6
        }
-       tr.TimestampTransforms = append(tr.TimestampTransforms, 
AlignToTransform{
+       tr.timestampTransforms = append(tr.timestampTransforms, 
AlignToTransform{
                Period: int64(period / time.Millisecond),
                Offset: offsetMillis,
        })
        return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+       subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+       return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-       return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+       return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late 
firing.
+type AfterEndOfWindowTrigger struct {
+       earlyFiring Trigger
+       lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {

Review comment:
       Note that for `EarlyTrigger` and `LateTrigger`, we *must* return the 
`Trigger` interface: we don't know anything more precise, since the user 
could have set anything. It's OK in this case.

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -126,10 +134,7 @@ func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
 //
 // * Period may not be smaller than a millisecond.
 // * Offset may be a zero time (time.Time{}).
-func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
-       if tr.Kind != AfterProcessingTimeTrigger {
-               panic(fmt.Errorf("can't apply processing delay to %s, want: 
AfterProcessingTimeTrigger", tr.Kind))
-       }
+func (tr *AfterProcessingTimeTrigger) AlignedTo(period time.Duration, offset 
time.Time) Trigger {

Review comment:
       Similarly, return `*AfterProcessingTimeTrigger`

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset 
time.Time) Trigger {
                // TODO: Change to call UnixMilli() once we move to only 
supporting a go version > 1.17.
                offsetMillis = offset.Unix()*1e3 + 
int64(offset.Nanosecond())/1e6
        }
-       tr.TimestampTransforms = append(tr.TimestampTransforms, 
AlignToTransform{
+       tr.timestampTransforms = append(tr.timestampTransforms, 
AlignToTransform{
                Period: int64(period / time.Millisecond),
                Offset: offsetMillis,
        })
        return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+       subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+       return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-       return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+       return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late 
firing.
+type AfterEndOfWindowTrigger struct {
+       earlyFiring Trigger
+       lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+       return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+       return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-       defaultEarly := Default()
-       return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: 
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of 
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+       return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-       if tr.Kind != AfterEndOfWindowTrigger {
-               panic(fmt.Errorf("can't apply early firing to %s, want: 
AfterEndOfWindowTrigger", tr.Kind))
-       }
-       tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {

Review comment:
       Similarly, return `*AfterEndOfWindowTrigger`

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset 
time.Time) Trigger {
                // TODO: Change to call UnixMilli() once we move to only 
supporting a go version > 1.17.
                offsetMillis = offset.Unix()*1e3 + 
int64(offset.Nanosecond())/1e6
        }
-       tr.TimestampTransforms = append(tr.TimestampTransforms, 
AlignToTransform{
+       tr.timestampTransforms = append(tr.timestampTransforms, 
AlignToTransform{
                Period: int64(period / time.Millisecond),
                Offset: offsetMillis,
        })
        return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+       subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+       return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-       return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+       return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late 
firing.
+type AfterEndOfWindowTrigger struct {
+       earlyFiring Trigger
+       lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+       return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+       return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-       defaultEarly := Default()
-       return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: 
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of 
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+       return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}

Review comment:
       Any reason why the default was removed here?
   
   Alternatively (as a separate PR), we should probably have a validation 
function somewhere for triggers when they're set to make sure they aren't going 
to lose data or similar:
   
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py
   
   See the `may_lose_data` methods.  

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset 
time.Time) Trigger {
                // TODO: Change to call UnixMilli() once we move to only 
supporting a go version > 1.17.
                offsetMillis = offset.Unix()*1e3 + 
int64(offset.Nanosecond())/1e6
        }
-       tr.TimestampTransforms = append(tr.TimestampTransforms, 
AlignToTransform{
+       tr.timestampTransforms = append(tr.timestampTransforms, 
AlignToTransform{
                Period: int64(period / time.Millisecond),
                Offset: offsetMillis,
        })
        return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+       subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+       return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {
-       return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+       return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late 
firing.
+type AfterEndOfWindowTrigger struct {
+       earlyFiring Trigger
+       lateFiring  Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+       return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+       return t.lateFiring
 }
 
 // AfterEndOfWindow constructs a trigger that is configurable for early firing
 // (before the end of window) and late firing (after the end of window).
 //
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
-       defaultEarly := Default()
-       return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: 
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of 
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+       return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
 }
 
 // EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
-       if tr.Kind != AfterEndOfWindowTrigger {
-               panic(fmt.Errorf("can't apply early firing to %s, want: 
AfterEndOfWindowTrigger", tr.Kind))
-       }
-       tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
+       tr.earlyFiring = early
        return tr
 }
 
 // LateFiring configures an AfterEndOfWindow trigger with an implicitly
 // repeated trigger that applies after the end of the window.
 //
 // Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
-       if tr.Kind != AfterEndOfWindowTrigger {
-               panic(fmt.Errorf("can't apply late firing to %s, want: 
AfterEndOfWindowTrigger", tr.Kind))
-       }
-       tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) Trigger {
+       tr.lateFiring = late
        return tr
 }
+
+// TODO(BEAM-3304) Add support for composite triggers.
+// Below defined triggers do not work as of now.
+// Intended for framework use only.

Review comment:
       It's irritating, but since these types below are exported, you'll need 
to add "TODO(BEAM-3304)" or "NYI(BEAM-3304)" to each of their doc strings.
   
    Only people who actually read the source will discover this comment block 
as written, which is not how users will find these types (autocomplete may 
suggest them, etc.). 

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset 
time.Time) Trigger {
                // TODO: Change to call UnixMilli() once we move to only 
supporting a go version > 1.17.
                offsetMillis = offset.Unix()*1e3 + 
int64(offset.Nanosecond())/1e6
        }
-       tr.TimestampTransforms = append(tr.TimestampTransforms, 
AlignToTransform{
+       tr.timestampTransforms = append(tr.timestampTransforms, 
AlignToTransform{
                Period: int64(period / time.Millisecond),
                Offset: offsetMillis,
        })
        return tr
 }
 
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+       subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+       return t.subTrigger
+}
+
 // Repeat constructs a trigger that fires a trigger repeatedly
 // once the condition has been met.
 //
 // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
 func Repeat(tr Trigger) Trigger {

Review comment:
       Similarly, return `*RepeatTrigger`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to