lostluck commented on a change in pull request #15952:
URL: https://github.com/apache/beam/pull/15952#discussion_r747922760
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -65,59 +119,13 @@ type AlignToTransform struct {
func (AlignToTransform) timestampTransform() {}
-const (
- DefaultTrigger string = "Trigger_Default_"
- AlwaysTrigger string = "Trigger_Always_"
- AfterAnyTrigger string = "Trigger_AfterAny_"
- AfterAllTrigger string = "Trigger_AfterAll_"
- AfterProcessingTimeTrigger string =
"Trigger_AfterProcessing_Time_"
- ElementCountTrigger string = "Trigger_ElementCount_"
- AfterEndOfWindowTrigger string =
"Trigger_AfterEndOfWindow_"
- RepeatTrigger string = "Trigger_Repeat_"
- OrFinallyTrigger string = "Trigger_OrFinally_"
- NeverTrigger string = "Trigger_Never_"
- AfterSynchronizedProcessingTimeTrigger string =
"Trigger_AfterSynchronizedProcessingTime_"
-)
-
-// Default constructs a default trigger that fires once after the end of
window.
-// Late Data is discarded.
-func Default() Trigger {
- return Trigger{Kind: DefaultTrigger}
-}
-
-// Always constructs a trigger that fires immediately
-// whenever an element is received.
-//
-// Equivalent to trigger.Repeat(trigger.AfterCount(1))
-func Always() Trigger {
- return Trigger{Kind: AlwaysTrigger}
-}
-
-// AfterCount constructs a trigger that fires after
-// at least `count` number of elements are processed.
-func AfterCount(count int32) Trigger {
- return Trigger{Kind: ElementCountTrigger, ElementCount: count}
-}
-
-// AfterProcessingTime constructs a trigger that fires relative to
-// when input first arrives.
-//
-// Must be configured with calls to PlusDelay, or AlignedTo. May be
-// configured with additional delay.
-func AfterProcessingTime() Trigger {
- return Trigger{Kind: AfterProcessingTimeTrigger}
-}
-
// PlusDelay configures an AfterProcessingTime trigger to fire after a
specified delay,
// no smaller than a millisecond.
-func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
- if tr.Kind != AfterProcessingTimeTrigger {
- panic(fmt.Errorf("can't apply processing delay to %s, want:
AfterProcessingTimeTrigger", tr.Kind))
- }
+func (tr *AfterProcessingTimeTrigger) PlusDelay(delay time.Duration) Trigger {
Review comment:
A Go idiom that applies here is "Accept interfaces, return concrete
types". In this case, don't return the Trigger interface; return the
`*AfterProcessingTimeTrigger` type. If it needs to be used as the Trigger
interface, it will be, but returning the interface here prevents chaining.
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
// TODO: Change to call UnixMilli() once we move to only
supporting a go version > 1.17.
offsetMillis = offset.Unix()*1e3 +
int64(offset.Nanosecond())/1e6
}
- tr.TimestampTransforms = append(tr.TimestampTransforms,
AlignToTransform{
+ tr.timestampTransforms = append(tr.timestampTransforms,
AlignToTransform{
Period: int64(period / time.Millisecond),
Offset: offsetMillis,
})
return tr
}
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+ subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+ return t.subTrigger
+}
+
// Repeat constructs a trigger that fires a trigger repeatedly
// once the condition has been met.
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(tr Trigger) Trigger {
- return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+ return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late
firing.
+type AfterEndOfWindowTrigger struct {
+ earlyFiring Trigger
+ lateFiring Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+ return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+ return t.lateFiring
}
// AfterEndOfWindow constructs a trigger that is configurable for early firing
// (before the end of window) and late firing (after the end of window).
//
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
- defaultEarly := Default()
- return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger:
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+ return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
}
// EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
// repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
- if tr.Kind != AfterEndOfWindowTrigger {
- panic(fmt.Errorf("can't apply early firing to %s, want:
AfterEndOfWindowTrigger", tr.Kind))
- }
- tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
+ tr.earlyFiring = early
return tr
}
// LateFiring configures an AfterEndOfWindow trigger with an implicitly
// repeated trigger that applies after the end of the window.
//
// Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
- if tr.Kind != AfterEndOfWindowTrigger {
- panic(fmt.Errorf("can't apply late firing to %s, want:
AfterEndOfWindowTrigger", tr.Kind))
- }
- tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) Trigger {
Review comment:
Similarly, return `*AfterEndOfWindowTrigger`
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
// TODO: Change to call UnixMilli() once we move to only
supporting a go version > 1.17.
offsetMillis = offset.Unix()*1e3 +
int64(offset.Nanosecond())/1e6
}
- tr.TimestampTransforms = append(tr.TimestampTransforms,
AlignToTransform{
+ tr.timestampTransforms = append(tr.timestampTransforms,
AlignToTransform{
Period: int64(period / time.Millisecond),
Offset: offsetMillis,
})
return tr
}
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+ subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+ return t.subTrigger
+}
+
// Repeat constructs a trigger that fires a trigger repeatedly
// once the condition has been met.
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(tr Trigger) Trigger {
- return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+ return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late
firing.
+type AfterEndOfWindowTrigger struct {
+ earlyFiring Trigger
+ lateFiring Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
Review comment:
Note that for `EarlyTrigger` and `LateTrigger`, we *must* return the
`Trigger` interface: we don't know anything more precise, because the user
could have set anything. It's OK in this case.
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -126,10 +134,7 @@ func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
//
// * Period may not be smaller than a millisecond.
// * Offset may be a zero time (time.Time{}).
-func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
- if tr.Kind != AfterProcessingTimeTrigger {
- panic(fmt.Errorf("can't apply processing delay to %s, want:
AfterProcessingTimeTrigger", tr.Kind))
- }
+func (tr *AfterProcessingTimeTrigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
Review comment:
Similarly, return `*AfterProcessingTimeTrigger`
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
// TODO: Change to call UnixMilli() once we move to only
supporting a go version > 1.17.
offsetMillis = offset.Unix()*1e3 +
int64(offset.Nanosecond())/1e6
}
- tr.TimestampTransforms = append(tr.TimestampTransforms,
AlignToTransform{
+ tr.timestampTransforms = append(tr.timestampTransforms,
AlignToTransform{
Period: int64(period / time.Millisecond),
Offset: offsetMillis,
})
return tr
}
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+ subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+ return t.subTrigger
+}
+
// Repeat constructs a trigger that fires a trigger repeatedly
// once the condition has been met.
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(tr Trigger) Trigger {
- return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+ return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late
firing.
+type AfterEndOfWindowTrigger struct {
+ earlyFiring Trigger
+ lateFiring Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+ return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+ return t.lateFiring
}
// AfterEndOfWindow constructs a trigger that is configurable for early firing
// (before the end of window) and late firing (after the end of window).
//
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
- defaultEarly := Default()
- return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger:
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+ return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
}
// EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
// repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
- if tr.Kind != AfterEndOfWindowTrigger {
- panic(fmt.Errorf("can't apply early firing to %s, want:
AfterEndOfWindowTrigger", tr.Kind))
- }
- tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
Review comment:
Similarly, return `*AfterEndOfWindowTrigger`
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
// TODO: Change to call UnixMilli() once we move to only
supporting a go version > 1.17.
offsetMillis = offset.Unix()*1e3 +
int64(offset.Nanosecond())/1e6
}
- tr.TimestampTransforms = append(tr.TimestampTransforms,
AlignToTransform{
+ tr.timestampTransforms = append(tr.timestampTransforms,
AlignToTransform{
Period: int64(period / time.Millisecond),
Offset: offsetMillis,
})
return tr
}
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+ subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+ return t.subTrigger
+}
+
// Repeat constructs a trigger that fires a trigger repeatedly
// once the condition has been met.
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(tr Trigger) Trigger {
- return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+ return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late
firing.
+type AfterEndOfWindowTrigger struct {
+ earlyFiring Trigger
+ lateFiring Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+ return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+ return t.lateFiring
}
// AfterEndOfWindow constructs a trigger that is configurable for early firing
// (before the end of window) and late firing (after the end of window).
//
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
- defaultEarly := Default()
- return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger:
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+ return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
Review comment:
Any reason why the default was removed here?
Alternatively (as a separate PR), we should probably have a validation
function somewhere for triggers when they're set to make sure they aren't going
to lose data or similar:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py
See the `may_lose_data` methods.
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
// TODO: Change to call UnixMilli() once we move to only
supporting a go version > 1.17.
offsetMillis = offset.Unix()*1e3 +
int64(offset.Nanosecond())/1e6
}
- tr.TimestampTransforms = append(tr.TimestampTransforms,
AlignToTransform{
+ tr.timestampTransforms = append(tr.timestampTransforms,
AlignToTransform{
Period: int64(period / time.Millisecond),
Offset: offsetMillis,
})
return tr
}
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+ subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+ return t.subTrigger
+}
+
// Repeat constructs a trigger that fires a trigger repeatedly
// once the condition has been met.
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(tr Trigger) Trigger {
- return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
+ return &RepeatTrigger{subTrigger: tr}
+}
+
+// AfterEndOfWindowTrigger provides option to set triggers for early and late
firing.
+type AfterEndOfWindowTrigger struct {
+ earlyFiring Trigger
+ lateFiring Trigger
+}
+
+func (t AfterEndOfWindowTrigger) trigger() {}
+
+// EarlyTrigger returns the Early Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) EarlyTrigger() Trigger {
+ return t.earlyFiring
+}
+
+// LateTrigger returns the Late Firing trigger for AfterEndOfWindowTrigger.
+func (t *AfterEndOfWindowTrigger) LateTrigger() Trigger {
+ return t.lateFiring
}
// AfterEndOfWindow constructs a trigger that is configurable for early firing
// (before the end of window) and late firing (after the end of window).
//
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
-// Override it with EarlyFiring and LateFiring methods on this trigger.
-func AfterEndOfWindow() Trigger {
- defaultEarly := Default()
- return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger:
&defaultEarly, LateTrigger: nil}
+// Must call EarlyFiring or LateFiring method on this trigger at the time of
setting.
+func AfterEndOfWindow() *AfterEndOfWindowTrigger {
+ return &AfterEndOfWindowTrigger{earlyFiring: nil, lateFiring: nil}
}
// EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
// repeated trigger that applies before the end of the window.
-func (tr Trigger) EarlyFiring(early Trigger) Trigger {
- if tr.Kind != AfterEndOfWindowTrigger {
- panic(fmt.Errorf("can't apply early firing to %s, want:
AfterEndOfWindowTrigger", tr.Kind))
- }
- tr.EarlyTrigger = &early
+func (tr *AfterEndOfWindowTrigger) EarlyFiring(early Trigger) Trigger {
+ tr.earlyFiring = early
return tr
}
// LateFiring configures an AfterEndOfWindow trigger with an implicitly
// repeated trigger that applies after the end of the window.
//
// Not setting a late firing trigger means elements are discarded.
-func (tr Trigger) LateFiring(late Trigger) Trigger {
- if tr.Kind != AfterEndOfWindowTrigger {
- panic(fmt.Errorf("can't apply late firing to %s, want:
AfterEndOfWindowTrigger", tr.Kind))
- }
- tr.LateTrigger = &late
+func (tr *AfterEndOfWindowTrigger) LateFiring(late Trigger) Trigger {
+ tr.lateFiring = late
return tr
}
+
+// TODO(BEAM-3304) Add support for composite triggers.
+// Below defined triggers do not work as of now.
+// Intended for framework use only.
Review comment:
It's irritating, but since the types below are exported, you'll need
to add "TODO(BEAM-3304)" or "NYI(BEAM-3304)" to each of their doc strings.
Only people who actually read the source will discover this comment block
as written, which is not how users will find these types (autocomplete may
suggest them, etc.).
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
##########
@@ -138,49 +143,114 @@ func (tr Trigger) AlignedTo(period time.Duration, offset
time.Time) Trigger {
// TODO: Change to call UnixMilli() once we move to only
supporting a go version > 1.17.
offsetMillis = offset.Unix()*1e3 +
int64(offset.Nanosecond())/1e6
}
- tr.TimestampTransforms = append(tr.TimestampTransforms,
AlignToTransform{
+ tr.timestampTransforms = append(tr.timestampTransforms,
AlignToTransform{
Period: int64(period / time.Millisecond),
Offset: offsetMillis,
})
return tr
}
+// RepeatTrigger fires a sub-trigger repeatedly.
+type RepeatTrigger struct {
+ subTrigger Trigger
+}
+
+func (t RepeatTrigger) trigger() {}
+
+// SubTrigger returns the trigger to be repeated.
+func (t *RepeatTrigger) SubTrigger() Trigger {
+ return t.subTrigger
+}
+
// Repeat constructs a trigger that fires a trigger repeatedly
// once the condition has been met.
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(tr Trigger) Trigger {
Review comment:
Similarly, return `*RepeatTrigger`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]