lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r690004841



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -144,8 +144,8 @@ func (n *DataSource) Process(ctx context.Context) error {
                if n.incrementIndexAndCheckSplit() {
                        return nil
                }
-               // TODO(lostluck) 2020/02/22: Should we include window headers 
or just count the element sizes?
-               ws, t, err := DecodeWindowedValueHeader(wc, r)
+    // TODO(lostluck) 2020/02/22: Should we include window headers or just 
count the element sizes?

Review comment:
       Probably need to run gofmt again.

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,48 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+       windowIntoOption()
+}
+
+type WindowTrigger struct {
+       Name window.Trigger
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
+type AccumulationMode struct {
+       Mode window.AccumulationMode
+}
+
+func (m AccumulationMode) windowIntoOption() {}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-       return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts 
...WindowIntoOption) PCollection {
+       return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, 
error) {
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts 
...WindowIntoOption) (PCollection, error) {
        if !s.IsValid() {
                return PCollection{}, errors.New("invalid scope")
        }
        if !col.IsValid() {
                return PCollection{}, errors.New("invalid input pcollection")
        }
+       ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Trigger{}}
+       for _, opt := range opts {
+               switch opt := opt.(type) {
+               case WindowTrigger:
+                       ws.Trigger = opt.Name
+               case AccumulationMode:
+                       ws.AccumulationMode = opt.Mode
+               default:
+                       panic("Invalid option for Windowing Strategy")

Review comment:
       Since WindowIntoOptions are implementers of interfaces, we can improve 
this by actually communicating what's going wrong. That is something like 
`panic(fmt.Sprintf("Unknown WindowingInto option type: %T: %v", opt, opt))` 
which communicates the type, and if it's printable somehow so it's easier to 
find what needs working on.
   
   Since an error like this is likely an SDK dev error, a panic is fine, as 
there's nothing a user can do about it programmatically.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,135 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
-               AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          makeTrigger(w.Trigger),
+               AccumulationMode: makeAccumulationMode(w.AccumulationMode),
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY,
+       }
+       return ws, nil
+}
+
+func makeAccumulationMode(m window.AccumulationMode) 
pipepb.AccumulationMode_Enum {
+       switch m {
+       case window.Accumulating:
+               return pipepb.AccumulationMode_ACCUMULATING
+       case window.Discarding:
+               return pipepb.AccumulationMode_DISCARDING
+       case window.Unspecified:
+               return pipepb.AccumulationMode_UNSPECIFIED
+       case window.Retracting:
+               return pipepb.AccumulationMode_RETRACTING
+       default:
+               return pipepb.AccumulationMode_DISCARDING
+       }
+}
+
+func makeTrigger(t window.Trigger) *pipepb.Trigger {
+       switch t.Kind {
+       case window.DefaultTrigger:
+               return &pipepb.Trigger{
                        Trigger: &pipepb.Trigger_Default_{
                                Default: &pipepb.Trigger_Default{},
                        },
-               },
-               OutputTime:      pipepb.OutputTime_END_OF_WINDOW,
-               ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
-               AllowedLateness: 0,
-               OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
+               }
+       case window.AlwaysTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Always_{
+                               Always: &pipepb.Trigger_Always{},
+                       },
+               }
+       case window.AfterAnyTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_AfterAny_{
+                               AfterAny: &pipepb.Trigger_AfterAny{
+                                       Subtriggers: 
extractSubtriggers(t.SubTriggers),
+                               },
+                       },
+               }
+       case window.AfterAllTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_AfterAll_{
+                               AfterAll: &pipepb.Trigger_AfterAll{
+                                       Subtriggers: 
extractSubtriggers(t.SubTriggers),
+                               },
+                       },
+               }
+       case window.AfterProcessingTimeTrigger:
+               // TODO: Right now would work only for single delay value.

Review comment:
       Prefer not to leave unreferenced TODOs. Add the JIRA (BEAM-3304 ) for 
these or file a new one for these specific TODOs and list them as subtasks to 
BEAM-3304




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to