lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r680249151
##########
File path: sdks/go/pkg/beam/pcollection.go
##########
@@ -49,7 +50,11 @@ func (p PCollection) IsValid() bool {
}
// TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.
+
// TODO(herohde) 5/30/2017: add windowing strategy and documentation.
Review comment:
Since this becomes part of the public API for PCollection, it should
have a real doc comment, for example:
`// WindowingStrategy returns the windowing strategy for the PCollection.`
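A minimal sketch of what the documented accessor could look like. `PCollection` and `WindowingStrategy` here are simplified, hypothetical stand-ins (the real types live in `sdks/go/pkg/beam` and its `window` package), and the trigger is modeled as a plain string; the point is only the shape of the doc comment, which starts with the method name and reads as a full sentence.

```go
package main

import "fmt"

// WindowingStrategy is a hypothetical, simplified stand-in for
// window.WindowingStrategy, reduced to a single trigger name.
type WindowingStrategy struct {
	Trigger string
}

// PCollection is a hypothetical stand-in for beam.PCollection.
type PCollection struct {
	ws *WindowingStrategy
}

// WindowingStrategy returns the windowing strategy for the PCollection.
func (p PCollection) WindowingStrategy() *WindowingStrategy {
	return p.ws
}

func main() {
	p := PCollection{ws: &WindowingStrategy{Trigger: "Default"}}
	fmt.Println(p.WindowingStrategy().Trigger)
}
```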
##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,35 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+// TriggerWindowSums, much like WindowSums described above has an addition of configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {
+ timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+ windowSize := 3 * time.Second
+
+ validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) {
+ // Window the data.
+ windowed := beam.WindowInto(s, wfn, in)
+
+ // change below statement to: windowed.WindowingStrategy().SetAlways()
+ // to get the decoding error.
+ windowed.WindowingStrategy().SetDefault()
Review comment:
OK. It pains me to suggest this, but this user experience leaves a
little to be desired, and we have a short window before the Go SDK exits
experimental status in which we can make it better.
I propose we make a `beam.WindowIntoOption` interface type, and change
`beam.WindowInto` to take a variadic parameter of those to configure the
trigger and other windowing strategy properties. Then we can type assert and
apply the options to the windowing strategy without much concern.
This would be similar to how `beam.ParDo` has an option type
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/option.go#L25
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/pardo.go#L356
My recommendation would be to make that a separate change by itself,
without defining any of the concrete `beam.WindowIntoOption` types or their
handling, and just changing the signature to take `opts ...beam.WindowIntoOption`
as the final parameter (a variadic parameter must be the last one in a function
signature).
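A self-contained sketch of the proposed pattern, assuming a hypothetical `WindowIntoOption` interface and a hypothetical `Trigger` option type. `Fn`, `PCollection`, `WindowingStrategy` and `TriggerType` are simplified stand-ins rather than the Beam types, and the `beam.Scope` parameter is omitted for brevity. It shows the variadic parameter in the final position and a type switch that applies each option to the windowing strategy.

```go
package main

import "fmt"

// TriggerType is a hypothetical stand-in for window.TriggerType.
type TriggerType int

const (
	Default TriggerType = iota
	Always
)

// Fn is a hypothetical stand-in for window.Fn.
type Fn struct{ Kind string }

// WindowingStrategy is a simplified, hypothetical windowing strategy.
type WindowingStrategy struct {
	Fn      *Fn
	Trigger TriggerType
}

// PCollection is a hypothetical stand-in for beam.PCollection.
type PCollection struct{ ws *WindowingStrategy }

// WindowIntoOption is the proposed option interface. The unexported method
// keeps other packages from defining their own option types directly.
type WindowIntoOption interface {
	windowIntoOption()
}

// Trigger is a hypothetical concrete option that sets the trigger.
type Trigger struct{ Type TriggerType }

func (Trigger) windowIntoOption() {}

// WindowInto sketches the proposed signature: the variadic options come last,
// so existing calls that pass no options still compile.
func WindowInto(fn *Fn, col PCollection, opts ...WindowIntoOption) PCollection {
	ws := &WindowingStrategy{Fn: fn, Trigger: Default}
	for _, opt := range opts {
		// Type switch over the known option types; this sketch panics on
		// anything it does not recognize.
		switch o := opt.(type) {
		case Trigger:
			ws.Trigger = o.Type
		default:
			panic(fmt.Sprintf("unknown WindowIntoOption %T", opt))
		}
	}
	return PCollection{ws: ws}
}

func main() {
	fn := &Fn{Kind: "fixed"}
	plain := WindowInto(fn, PCollection{})
	triggered := WindowInto(fn, PCollection{}, Trigger{Type: Always})
	fmt.Println(plain.ws.Trigger == Default, triggered.ws.Trigger == Always)
}
```

Keeping the interface free of exported methods means adding a new option later is just a new type plus a new `case`, without touching the `WindowInto` signature again.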
While this change would not be 100% backwards compatible, it would be source
compatible for all reasonable uses of `beam.WindowInto`. The main place it
would break is if a user were doing something like:
`var myWindowInto func(s beam.Scope, ws *window.Fn, col beam.PCollection) beam.PCollection = beam.WindowInto`
which is unlikely and weird to begin with (it wouldn't serve any purpose
beyond ensuring the signature never changes).
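To illustrate the compatibility argument with hypothetical stand-in types (not the real Beam API): ordinary three-argument calls compile unchanged against a variadic signature, while the rare pattern of assigning `WindowInto` to a variable of the old function type no longer compiles and would need a small wrapper closure instead.

```go
package main

import "fmt"

// Scope, Fn, PCollection and WindowIntoOption are hypothetical stand-ins for
// beam.Scope, window.Fn, beam.PCollection and the proposed option type.
type Scope struct{}
type Fn struct{}
type PCollection struct{ name string }
type WindowIntoOption interface{}

// WindowInto has the proposed variadic signature; this stub ignores the
// options and returns its input unchanged.
func WindowInto(s Scope, fn *Fn, col PCollection, opts ...WindowIntoOption) PCollection {
	return col
}

func main() {
	s, fn, col := Scope{}, &Fn{}, PCollection{name: "in"}

	// Ordinary three-argument calls compile unchanged.
	out := WindowInto(s, fn, col)

	// This would no longer compile, because a variadic func is a different type:
	//   var myWindowInto func(s Scope, fn *Fn, col PCollection) PCollection = WindowInto
	// Wrapping the call in a closure restores the old three-argument shape.
	var myWindowInto func(s Scope, fn *Fn, col PCollection) PCollection = func(s Scope, fn *Fn, col PCollection) PCollection {
		return WindowInto(s, fn, col)
	}

	fmt.Println(out.name, myWindowInto(s, fn, col).name)
}
```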
Then we can rebase this change on top of that one, come back here, and
finish the implementation.
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+ trigger, err := makeTrigger(w.Trigger)
+ if err != nil {
+ return nil, err
+ }
+
ws := &pipepb.WindowingStrategy{
WindowFn: windowFn,
MergeStatus: mergeStat,
AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: windowCoderId,
- Trigger: &pipepb.Trigger{
+ Trigger: trigger,
+ OutputTime: pipepb.OutputTime_END_OF_WINDOW,
+ ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ AllowedLateness: 0,
+ OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
+ }
+ return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
+ switch t {
+ case window.Default:
+ return &pipepb.Trigger{
Trigger: &pipepb.Trigger_Default_{
Default: &pipepb.Trigger_Default{},
},
- },
- OutputTime: pipepb.OutputTime_END_OF_WINDOW,
- ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
- AllowedLateness: 0,
- OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
+ }, nil
+ case window.Always:
+ return &pipepb.Trigger{
+ Trigger: &pipepb.Trigger_Always_{
+ Always: &pipepb.Trigger_Always{},
+ },
+ }, nil
+ default:
+ return &pipepb.Trigger{
+ Trigger: &pipepb.Trigger_Default_{
+ Default: &pipepb.Trigger_Default{},
+ },
+ }, nil
}
- return ws, nil
}
-
Review comment:
Missing blank line: this change removes the blank line that separated this
function from the following declaration.
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+ trigger, err := makeTrigger(w.Trigger)
+ if err != nil {
+ return nil, err
+ }
+
ws := &pipepb.WindowingStrategy{
WindowFn: windowFn,
MergeStatus: mergeStat,
AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: windowCoderId,
- Trigger: &pipepb.Trigger{
+ Trigger: trigger,
+ OutputTime: pipepb.OutputTime_END_OF_WINDOW,
+ ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ AllowedLateness: 0,
+ OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
+ }
+ return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
Review comment:
```suggestion
}

func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
```
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+ trigger, err := makeTrigger(w.Trigger)
+ if err != nil {
+ return nil, err
+ }
+
ws := &pipepb.WindowingStrategy{
WindowFn: windowFn,
MergeStatus: mergeStat,
AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: windowCoderId,
- Trigger: &pipepb.Trigger{
+ Trigger: trigger,
+ OutputTime: pipepb.OutputTime_END_OF_WINDOW,
+ ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ AllowedLateness: 0,
+ OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
+ }
+ return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
Review comment:
You probably need to run `go fmt` over your changes; for some reason, the
blank lines between function declarations seem to be going missing.
--
This is an automated message from the Apache Git Service.