[ 
https://issues.apache.org/jira/browse/BEAM-3304?focusedWorklogId=631913&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-631913
 ]

ASF GitHub Bot logged work on BEAM-3304:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Jul/21 23:45
            Start Date: 30/Jul/21 23:45
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r680249151



##########
File path: sdks/go/pkg/beam/pcollection.go
##########
@@ -49,7 +50,11 @@ func (p PCollection) IsValid() bool {
 }
 
 // TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.
+
 // TODO(herohde) 5/30/2017: add windowing strategy and documentation.

Review comment:
       Since this becomes part of the public API for PCollection, it should 
have a real doc comment:
   
   `// WindowingStrategy returns the windowing strategy for the PCollection`
   
   

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,35 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of 
configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways 
throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, 
beam.PCollection) beam.PCollection) {
+       timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 
9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+       windowSize := 3 * time.Second
+
+       validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, 
expected ...interface{}) {
+               // Window the data.
+               windowed := beam.WindowInto(s, wfn, in)
+
+               // change below statement to: 
windowed.WindowingStrategy().SetAlways()
+               // to get the decoding error.
+               windowed.WindowingStrategy().SetDefault()

Review comment:
       OK. It pains me to suggest this, but this user experience leaves a 
little to be desired and we have a short window before the Go SDK exits 
experimental where we can make it better. 
   
   I propose we make a `beam.WindowIntoOption` interface type, and change 
`beam.WindowInto` to take a variadic parameter of those to configure the 
trigger and other windowing strategy properties. Then we can type assert and 
apply the options to the windowing strategy without much concern.
   
   This would be similar to how `beam.ParDo` has an option type 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/option.go#L25
   https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/pardo.go#L356
   
   My recommendation would be to make that into a separate change by itself, 
not defining any of the concrete `beam.WindowIntoOption` types and handling, 
and just changing the signature to take in `options ...beam.WindowIntoOption` 
as the final parameter (a variadic parameter must be the last one in a function 
signature).
   
   While this change would not be 100% backwards compatible, it would be source 
compatible for all reasonable uses of `beam.WindowInto`. The main place it 
would break is if a user were doing something like:
   
   `var myWindowInto func(s beam.Scope, ws *window.Fn, col beam.PCollection) 
beam.PCollection = beam.WindowInto`
   
   which is unlikely and weird to begin with (it wouldn't serve any purpose 
beyond ensuring the signature never changes).
   
   Then we can rebase this PR on top of that change and finish handling the 
implementation here.
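
A minimal sketch of the option pattern proposed above, written against stand-in types so that it compiles without the Beam SDK. `Scope`, `WindowFn`, `PCollection`, `WindowingStrategy`, `TriggerType` and the concrete `WithTrigger` option are all hypothetical placeholders for illustration, not the SDK's actual API; only the idea of a variadic `WindowIntoOption` parameter handled by a type switch comes from the comment.

```go
package main

import "fmt"

// Stand-in types (assumptions): the real ones live in the Beam Go SDK.
type Scope struct{}
type WindowFn struct{ Kind string }
type TriggerType string

const (
	DefaultTrigger TriggerType = "default"
	AlwaysTrigger  TriggerType = "always"
)

type WindowingStrategy struct {
	Fn      *WindowFn
	Trigger TriggerType
}

type PCollection struct {
	strategy WindowingStrategy
}

// WindowIntoOption is a marker interface. The unexported method keeps
// other packages from defining their own option types.
type WindowIntoOption interface {
	windowIntoOption()
}

// WithTrigger is a hypothetical concrete option that selects a trigger.
type WithTrigger struct{ Trigger TriggerType }

func (WithTrigger) windowIntoOption() {}

// WindowInto takes the options as a final variadic parameter, so existing
// three-argument call sites keep compiling unchanged.
func WindowInto(s Scope, wfn *WindowFn, col PCollection, opts ...WindowIntoOption) PCollection {
	ws := WindowingStrategy{Fn: wfn, Trigger: DefaultTrigger}
	for _, opt := range opts {
		// Type switch on the option and apply it to the strategy.
		switch o := opt.(type) {
		case WithTrigger:
			ws.Trigger = o.Trigger
		default:
			panic(fmt.Sprintf("unknown WindowIntoOption type: %T", opt))
		}
	}
	return PCollection{strategy: ws}
}

func main() {
	s := Scope{}
	wfn := &WindowFn{Kind: "fixed"}
	in := PCollection{}

	// Old-style call: still valid after the signature change.
	plain := WindowInto(s, wfn, in)
	// New-style call: configures the trigger through an option.
	always := WindowInto(s, wfn, in, WithTrigger{Trigger: AlwaysTrigger})
	fmt.Println(plain.strategy.Trigger, always.strategy.Trigger)

	// The one break in source compatibility: this assignment no longer
	// compiles, because the function type now has a variadic parameter.
	// var f func(Scope, *WindowFn, PCollection) PCollection = WindowInto
}
```

With this shape the trigger is fixed at the `WindowInto` call rather than being set afterwards on the returned collection's windowing strategy, which is the user-experience concern raised above.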

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger, err := makeTrigger(w.Trigger)
+       if err != nil {
+               return nil, err
+       }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
                AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+       }
+       return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
+       switch t {
+       case window.Default:
+               return &pipepb.Trigger{
                        Trigger: &pipepb.Trigger_Default_{
                                Default: &pipepb.Trigger_Default{},
                        },
-               },
-               OutputTime:      pipepb.OutputTime_END_OF_WINDOW,
-               ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
-               AllowedLateness: 0,
-               OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
+               }, nil
+       case window.Always:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Always_{
+                               Always: &pipepb.Trigger_Always{},
+                       },
+               }, nil
+       default:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Default_{
+                               Default: &pipepb.Trigger_Default{},
+                       },
+               }, nil
        }
-       return ws, nil
 }
-

Review comment:
       Missing blank line.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger, err := makeTrigger(w.Trigger)
+       if err != nil {
+               return nil, err
+       }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
                AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+       }
+       return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {

Review comment:
       ```suggestion
   }
   
   func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,24 +981,46 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger, err := makeTrigger(w.Trigger)
+       if err != nil {
+               return nil, err
+       }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
                AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+       }
+       return ws, nil
+}
+func makeTrigger(t window.TriggerType) (*pipepb.Trigger, error) {

Review comment:
       Probably need to run `go fmt` over your changes, as for some reason 
newlines between function declarations seem to be going missing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 631913)
    Time Spent: 1h  (was: 50m)

> Go triggering support
> ---------------------
>
>                 Key: BEAM-3304
>                 URL: https://issues.apache.org/jira/browse/BEAM-3304
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Ritesh Ghorse
>            Priority: P3
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Add support for triggers.
> [https://beam.apache.org/documentation/programming-guide/#triggers] 
> Triggers are special runner-side behavior indicating how to handle data with 
> respect to the watermark and window, commonly to configure the processing of 
> “late data” and similar cases.
> These are not currently implemented for user use in the Go SDK. Reshuffle 
> configures triggers, but that is not accessible to users. A correct trigger 
> implementation can at least re-implement Reshuffle in a user pipeline, rather 
> than having it handled specially within the framework.
>  * Requires extending the window package to be able to configure the various 
> triggers.
>  * Specifically being able to compose triggers as also permitted by the proto.
>  ** 
> [https://github.com/apache/beam/blob/6e7b1c44bc7275ee047afc059fd610cd3f4e5bee/model/pipeline/src/main/proto/beam_runner_api.proto#L1111]
>  
>  * Requires updating the graphx package translate.go to marshal (and 
> unmarshal?) the triggers to and from Beam PipelineProto Windowing strategies.
>  * Requires supporting triggers with the beam.WindowInto transform for user 
> pipeline use as well as complete documentation on its use from the user side.
>  ** 
> [https://github.com/apache/beam/blob/6e7b1c44bc7275ee047afc059fd610cd3f4e5bee/sdks/go/pkg/beam/windowing.go]
>  
>  * Panes need to be decoded, otherwise triggering will cause runtime errors: 
> [https://lists.apache.org/thread.html/r94c42d2d116f6464cd6b689543e5e578edf8310bf7c6e48a0958a56c%40%3Cdev.beam.apache.org%3E]
>  
>  * Handle pane propagation and observation in the exec package, and in user 
> dofns. 
>  ** Panes indicate whether data was on time or not, and similar facets which 
> may be relevant for processing.
>  ** Might simply extend the existing window interface.
>  
> Similar to windowing, many of the same places as 
> https://issues.apache.org/jira/browse/BEAM-11100 need to be modified.
> At its simplest, though, it's mostly a runner-side construct, with less 
> concern on the exec side, and generally much simpler. 
> Appropriate integration tests against portable runners must be implemented:
> [https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives]
>  
> And optionally add support for the configurable triggers to the Go Direct 
> Runner. However, the results must be compared and validated against a 
> semantically correct runner like the python portable runner first. At 
> minimum, the Go Direct Runner should be made aware of triggers and produce a 
> coherent error whenever there's a trigger it can't deal with.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
