lostluck commented on code in PR #24307:
URL: https://github.com/apache/beam/pull/24307#discussion_r1029897281


##########
sdks/go/pkg/beam/core/graph/coder/coder.go:
##########
@@ -177,6 +177,7 @@ const (
        Iterable           Kind = "I"
        KV                 Kind = "KV"
        LP                 Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.
+       IWCValue           Kind = "IWCvalue"

Review Comment:
   Note how nothing else in this `coder` package calls out its coderness in 
its short name. I'd just call this `IntervalWindow`, with `IW` as the short 
version.
   
   The 'value' suffixes for WindowedValue and ParamWindowedValue indicate 
that those coders wrap a value, rather than being windows themselves, as the 
one we're adding here is.
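
A minimal sketch of the suggested naming, assuming a simplified stand-in for the package's `Kind` type. Only `KV` and `LP` are taken from the hunk above; everything else in the real const block is omitted.

```go
package main

import "fmt"

// Kind is a simplified stand-in for the coder package's Kind type.
type Kind string

const (
	KV Kind = "KV"
	LP Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.

	// IntervalWindow follows the existing convention: the name says what is
	// encoded, and the short form does not mention "coder" or "value".
	IntervalWindow Kind = "IW"
)

func main() {
	fmt.Println(IntervalWindow)
}
```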



##########
sdks/go/pkg/beam/core/graph/coder/coder.go:
##########
@@ -177,6 +177,7 @@ const (
        Iterable           Kind = "I"
        KV                 Kind = "KV"
        LP                 Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.
+       IWCValue           Kind = "IWCvalue"
 
        Window Kind = "window" // A debug wrapper around a window coder.

Review Comment:
   It bothers me that we can't re-use this, but I agree we shouldn't mix these 
up just now. It's an internal implementation detail at least, so it can be 
cleaned up at some future point.



##########
sdks/go/pkg/beam/core/runtime/graphx/coder.go:
##########
@@ -245,6 +245,10 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
                                t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...)
                                return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
                        }
+               case urnIntervalWindow:
+                       // If interval window in a KV, this may be a mapping function.
+                       // Special case since windows are not normally used directly as FullValues.
+                       return coder.NewIntervalWindowCoder(), nil

Review Comment:
   Per the proto "map_windows" spec, the input/output is "KV<[]byte,Window>", 
but as written here, this just returns "window". It'd be better to follow the 
normal structure and generalize it to handle interval windows, instead of 
special casing here.
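
To illustrate what "following the normal structure" could look like, here is a rough, self-contained sketch. `Kind`, `Coder`, `protoCoder`, and this `makeCoder` are hypothetical, simplified stand-ins and not the SDK's real types or signatures. The point is only that the interval window becomes an ordinary leaf coder, so the KV case composes it as the value component without a KV-specific shortcut.

```go
package main

import "fmt"

// Kind and Coder are simplified stand-ins for the SDK's coder types.
type Kind string

const (
	Bytes          Kind = "bytes"
	KV             Kind = "KV"
	IntervalWindow Kind = "IW"
)

type Coder struct {
	Kind       Kind
	Components []*Coder
}

// protoCoder is a hypothetical, minimal stand-in for a pipeline proto coder.
type protoCoder struct {
	URN          string
	ComponentIDs []string
}

const (
	urnBytes          = "beam:coder:bytes:v1"
	urnKV             = "beam:coder:kv:v1"
	urnIntervalWindow = "beam:coder:interval_window:v1"
)

// makeCoder resolves a coder ID recursively. The interval window is handled
// as a plain leaf, so KV<[]byte, Window> falls out of the generic KV case.
func makeCoder(id string, coders map[string]protoCoder) (*Coder, error) {
	c, ok := coders[id]
	if !ok {
		return nil, fmt.Errorf("unknown coder id %q", id)
	}
	switch c.URN {
	case urnBytes:
		return &Coder{Kind: Bytes}, nil
	case urnIntervalWindow:
		return &Coder{Kind: IntervalWindow}, nil
	case urnKV:
		if len(c.ComponentIDs) != 2 {
			return nil, fmt.Errorf("KV coder %q needs 2 components, got %d", id, len(c.ComponentIDs))
		}
		key, err := makeCoder(c.ComponentIDs[0], coders)
		if err != nil {
			return nil, err
		}
		value, err := makeCoder(c.ComponentIDs[1], coders)
		if err != nil {
			return nil, err
		}
		return &Coder{Kind: KV, Components: []*Coder{key, value}}, nil
	}
	return nil, fmt.Errorf("unsupported coder urn %q", c.URN)
}

func main() {
	coders := map[string]protoCoder{
		"nonce": {URN: urnBytes},
		"win":   {URN: urnIntervalWindow},
		"kv":    {URN: urnKV, ComponentIDs: []string{"nonce", "win"}},
	}
	c, err := makeCoder("kv", coders)
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Kind, c.Components[0].Kind, c.Components[1].Kind)
}
```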



##########
sdks/go/pkg/beam/core/runtime/exec/window.go:
##########
@@ -97,6 +98,55 @@ func (w *WindowInto) String() string {
        return fmt.Sprintf("WindowInto[%v]. Out:%v", w.Fn, w.Out.ID())
 }
 
+type MapWindows struct {
+       UID UnitID
+       Fn  WindowMapper
+       Out Node
+}
+
+func (m *MapWindows) ID() UnitID {
+       return m.UID
+}
+
+func (m *MapWindows) Up(_ context.Context) error {
+       return nil
+}
+
+func (m *MapWindows) StartBundle(ctx context.Context, id string, data DataContext) error {
+       return m.Out.StartBundle(ctx, id, data)
+}
+
+func (m *MapWindows) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+       w, ok := elm.Elm.(window.IntervalWindow)
+       if !ok {
+               return errors.Errorf("not an IntervalWindow, got %T", elm.Elm)
+       }
+       newW, err := m.Fn.MapWindow(w)
+       if err != nil {
+               return err
+       }
+       out := &FullValue{
+               Elm:       elm.Elm,

Review Comment:
   As written here, it looks like we assume the output nonce is the same as the 
input window, which may not be the case in all runners. As the coder is 
written, and picked for a KV, it looks like the mapped element never gets coded.
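
A rough sketch of the shape I have in mind, assuming the element arrives as KV<[]byte,Window> per the spec mentioned earlier in this review. `IntervalWindow`, `FullValue`, `WindowMapper`, `shiftMapper`, and `mapElement` are simplified, hypothetical stand-ins here, not the exec package's real definitions. The nonce is passed through untouched as the key and only the value is mapped, so nothing assumes that the nonce is itself a window.

```go
package main

import "fmt"

// IntervalWindow is a simplified stand-in for window.IntervalWindow.
type IntervalWindow struct {
	Start, End int64
}

// FullValue is a simplified stand-in: Elm holds the key, Elm2 the value.
type FullValue struct {
	Elm  any
	Elm2 any
}

// WindowMapper mirrors the interface used in the diff above.
type WindowMapper interface {
	MapWindow(w IntervalWindow) (IntervalWindow, error)
}

// shiftMapper is a hypothetical mapper used only for this example.
type shiftMapper struct{ by int64 }

func (s shiftMapper) MapWindow(w IntervalWindow) (IntervalWindow, error) {
	return IntervalWindow{Start: w.Start + s.by, End: w.End + s.by}, nil
}

// mapElement treats the key as an opaque nonce and maps only the window
// carried in the value, returning KV<nonce, mapped window>.
func mapElement(fn WindowMapper, elm *FullValue) (*FullValue, error) {
	nonce, ok := elm.Elm.([]byte)
	if !ok {
		return nil, fmt.Errorf("nonce is not []byte, got %T", elm.Elm)
	}
	w, ok := elm.Elm2.(IntervalWindow)
	if !ok {
		return nil, fmt.Errorf("not an IntervalWindow, got %T", elm.Elm2)
	}
	newW, err := fn.MapWindow(w)
	if err != nil {
		return nil, err
	}
	return &FullValue{Elm: nonce, Elm2: newW}, nil
}

func main() {
	in := &FullValue{Elm: []byte("nonce-1"), Elm2: IntervalWindow{Start: 0, End: 5}}
	out, err := mapElement(shiftMapper{by: 10}, in)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %+v\n", out.Elm, out.Elm2)
}
```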


