lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r680249151



##########
File path: sdks/go/pkg/beam/pcollection.go
##########
@@ -49,7 +50,11 @@ func (p PCollection) IsValid() bool {
 }
 
 // TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.
+
 // TODO(herohde) 5/30/2017: add windowing strategy and documentation.

Review comment:
       Since this becomes part of the public API for PCollection, it should 
have a real doc comment:
   
   `// WindowingStrategy returns the windowing strategy for the PCollection.`
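
For illustration, Go doc comments conventionally start with the name they document and read as a full sentence. A minimal sketch of how the accessor could look, assuming stand-in types (`strategy` and this `PCollection` are placeholders for this example, not the real Beam definitions):

```go
package main

import "fmt"

// strategy is a hypothetical stand-in for window.WindowingStrategy.
type strategy struct {
	Name string
}

// PCollection is a hypothetical stand-in for beam.PCollection.
type PCollection struct {
	ws *strategy
}

// WindowingStrategy returns the windowing strategy for the PCollection.
func (p PCollection) WindowingStrategy() *strategy {
	return p.ws
}

func main() {
	p := PCollection{ws: &strategy{Name: "fixed"}}
	fmt.Println(p.WindowingStrategy().Name)
}
```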
   
   

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,35 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) beam.PCollection) {
+       timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+       windowSize := 3 * time.Second
+
+       validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, expected ...interface{}) {
+               // Window the data.
+               windowed := beam.WindowInto(s, wfn, in)
+
+               // change below statement to: windowed.WindowingStrategy().SetAlways()
+               // to get the decoding error.
+               windowed.WindowingStrategy().SetDefault()

Review comment:
       OK. It pains me to suggest this, but this user experience leaves a 
little to be desired, and we have a short window before the Go SDK exits 
experimental in which we can make it better. 
   
   I propose we make a `beam.WindowIntoOption` interface type, and change 
`beam.WindowInto` to take a variadic parameter of those to configure the 
trigger and other windowing strategy properties. Then we can type assert and 
apply the options to the windowing strategy without much concern.
   
   This would be similar to how `beam.ParDo` already has an option type: 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/option.go#L25
   https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/pardo.go#L356
   
   My recommendation would be to make that a separate change by itself, 
not defining any of the concrete `beam.WindowIntoOption` types or their 
handling, and just changing the signature to take 
`opts ...beam.WindowIntoOption` as the final parameter (a variadic parameter 
must be the final one in a function signature).
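
A minimal sketch of the pattern proposed above, written against stand-in types rather than the real `beam` package. Everything here is an assumption for illustration: `Scope`, `WindowFn`, `PCollection`, `windowingStrategy`, and especially the concrete `withTrigger` option, which is included only so the type switch has something to match (the recommendation is to leave concrete options out of the first change).

```go
package main

import "fmt"

// Stand-in types; the real ones live in the beam and window packages.
type Scope struct{}
type WindowFn struct{ Kind string }
type PCollection struct{ ws *windowingStrategy }

// windowingStrategy is a hypothetical stand-in for window.WindowingStrategy.
type windowingStrategy struct {
	Fn      *WindowFn
	Trigger string
}

// WindowIntoOption is the proposed option interface. The unexported
// marker method is an assumption of this sketch; it keeps the set of
// implementations inside the package.
type WindowIntoOption interface {
	windowIntoOption()
}

// withTrigger is a hypothetical concrete option.
type withTrigger struct{ name string }

func (withTrigger) windowIntoOption() {}

// WindowInto takes options as a trailing variadic parameter, so
// existing three-argument calls keep compiling.
func WindowInto(s Scope, wfn *WindowFn, col PCollection, opts ...WindowIntoOption) PCollection {
	ws := &windowingStrategy{Fn: wfn, Trigger: "default"}
	for _, opt := range opts {
		// Type-switch on each option and apply it to the strategy.
		switch o := opt.(type) {
		case withTrigger:
			ws.Trigger = o.name
		default:
			panic(fmt.Sprintf("unknown WindowInto option type: %T", opt))
		}
	}
	return PCollection{ws: ws}
}

func main() {
	s, wfn, in := Scope{}, &WindowFn{Kind: "fixed"}, PCollection{}

	plain := WindowInto(s, wfn, in) // old call shape
	always := WindowInto(s, wfn, in, withTrigger{name: "always"})

	fmt.Println(plain.ws.Trigger, always.ws.Trigger) // prints "default always"
}
```

With this shape the trigger is configured at the `WindowInto` call site, so there is no need to mutate the strategy on the returned PCollection afterwards.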
   
   While this change would not be 100% backwards compatible, it would be source 
compatible for all reasonable uses of `beam.WindowInto`. The main place it 
would break is if a user were doing something like:
   
   `var myWindowInto func(s beam.Scope, ws *window.Fn, col beam.PCollection) beam.PCollection = beam.WindowInto`
   
   which is unlikely and weird to begin with (it wouldn't serve anything beyond 
ensuring the signature never changes).
   
   Then we can rebase, come back to this PR, and finish the implementation.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger, err := makeTrigger(w.Trigger)
+       if err != nil {
+               return nil, err
+       }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
                AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+       }
+       return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
+       switch t {
+       case window.Default:
+               return &pipepb.Trigger{
                        Trigger: &pipepb.Trigger_Default_{
                                Default: &pipepb.Trigger_Default{},
                        },
-               },
-               OutputTime:      pipepb.OutputTime_END_OF_WINDOW,
-               ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
-               AllowedLateness: 0,
-               OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
+               }, nil
+       case window.Always:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Always_{
+                               Always: &pipepb.Trigger_Always{},
+                       },
+               }, nil
+       default:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Default_{
+                               Default: &pipepb.Trigger_Default{},
+                       },
+               }, nil
        }
-       return ws, nil
 }
-

Review comment:
       Missing blank line.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger, err := makeTrigger(w.Trigger)
+       if err != nil {
+               return nil, err
+       }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
                AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+       }
+       return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {

Review comment:
       ```suggestion
   }
   
   func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w *window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger, err := makeTrigger(w.Trigger)
+       if err != nil {
+               return nil, err
+       }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
                AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+       }
+       return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {

Review comment:
       Probably need to run `go fmt` over your changes, as for some reason 
newlines between function declarations seem to be going missing.



