lostluck commented on code in PR #24307:
URL: https://github.com/apache/beam/pull/24307#discussion_r1029897281
##########
sdks/go/pkg/beam/core/graph/coder/coder.go:
##########
@@ -177,6 +177,7 @@ const (
Iterable Kind = "I"
KV Kind = "KV"
LP Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.
+ IWCValue Kind = "IWCvalue"
Review Comment:
Note how nothing else in this `coder` package calls out its coder-ness in
its short name. I'd just call this `IntervalWindow`, with `IW` as the short
version.
The 'Value' suffixes on WindowedValue and ParamWindowedValue indicate that
they wrap a value, rather than being windows themselves like the one we're
adding here.
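A minimal sketch of the suggested naming, assuming `Kind` is a string-typed constant as the hunk above implies. The surrounding constants are copied from the diff; only the `IntervalWindow`/`IW` pair is the proposal, and the standalone `package main` wrapper is just there to make the snippet runnable.

```go
package main

import "fmt"

// Kind is a stand-in for the string-typed coder kind shown in the diff.
type Kind string

const (
	Iterable Kind = "I"
	KV       Kind = "KV"
	LP       Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.

	// Proposed name: no "Coder" or "Value" in the identifier, since this
	// kind is a window itself rather than a wrapper around a value.
	IntervalWindow Kind = "IW"
)

func main() {
	// Print the short name that would appear in debug output.
	fmt.Println(IntervalWindow)
}
```

This keeps the new constant consistent with its neighbours, whose short names describe the encoded shape and nothing else.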
##########
sdks/go/pkg/beam/core/graph/coder/coder.go:
##########
@@ -177,6 +177,7 @@ const (
Iterable Kind = "I"
KV Kind = "KV"
LP Kind = "LP" // Explicitly length prefixed, likely at the runner's direction.
+ IWCValue Kind = "IWCvalue"
Window Kind = "window" // A debug wrapper around a window coder.
Review Comment:
It bothers me that we can't re-use this, but I agree we shouldn't mix these
up just now. It's an internal implementation detail at least, so it can be
cleaned up at some future point.
##########
sdks/go/pkg/beam/core/runtime/graphx/coder.go:
##########
@@ -245,6 +245,10 @@ func (b *CoderUnmarshaller) makeCoder(id string, c *pipepb.Coder) (*coder.Coder,
t := typex.New(root, append([]typex.FullType{key.T}, coder.Types(values)...)...)
return &coder.Coder{Kind: kind, T: t, Components: append([]*coder.Coder{key}, values...)}, nil
}
+ case urnIntervalWindow:
+ // If interval window in a KV, this may be a mapping function.
+ // Special case since windows are not normally used directly as FullValues.
+ return coder.NewIntervalWindowCoder(), nil
Review Comment:
Per the proto "map_windows" spec, the input/output is "KV<[]byte,Window>",
but as written here, this just returns "window". It'd be better to follow
the normal structure and generalize the handling of interval windows instead
of special-casing it here.
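To illustrate what "following the normal structure" could look like, here is a rough, self-contained sketch. `protoCoder`, `Coder`, and this `makeCoder` are simplified, hypothetical stand-ins and not the real `graphx` types; the point is only that the interval window is treated as an ordinary leaf coder, so a KV coder resolves to `KV<bytes,IW>` through the usual component recursion.

```go
package main

import (
	"fmt"
	"strings"
)

// Kind and Coder are simplified stand-ins for the coder package types.
type Kind string

const (
	Bytes          Kind = "bytes"
	KV             Kind = "KV"
	IntervalWindow Kind = "IW"
)

const (
	urnBytes          = "beam:coder:bytes:v1"
	urnKV             = "beam:coder:kv:v1"
	urnIntervalWindow = "beam:coder:interval_window:v1"
)

type Coder struct {
	Kind       Kind
	Components []*Coder
}

// String renders the coder tree, e.g. KV<bytes,IW>.
func (c *Coder) String() string {
	if len(c.Components) == 0 {
		return string(c.Kind)
	}
	parts := make([]string, len(c.Components))
	for i, sub := range c.Components {
		parts[i] = sub.String()
	}
	return fmt.Sprintf("%s<%s>", c.Kind, strings.Join(parts, ","))
}

// protoCoder is a hypothetical, flattened stand-in for a pipeline proto coder.
type protoCoder struct {
	URN        string
	Components []*protoCoder
}

// makeCoder resolves leaves and composites uniformly: the interval window
// is just another leaf, so the KV case needs no window-specific branch.
func makeCoder(p *protoCoder) (*Coder, error) {
	switch p.URN {
	case urnBytes:
		return &Coder{Kind: Bytes}, nil
	case urnIntervalWindow:
		return &Coder{Kind: IntervalWindow}, nil
	case urnKV:
		if len(p.Components) != 2 {
			return nil, fmt.Errorf("KV needs 2 components, got %d", len(p.Components))
		}
		key, err := makeCoder(p.Components[0])
		if err != nil {
			return nil, err
		}
		value, err := makeCoder(p.Components[1])
		if err != nil {
			return nil, err
		}
		return &Coder{Kind: KV, Components: []*Coder{key, value}}, nil
	default:
		return nil, fmt.Errorf("unknown coder URN %q", p.URN)
	}
}

func main() {
	in := &protoCoder{URN: urnKV, Components: []*protoCoder{
		{URN: urnBytes}, {URN: urnIntervalWindow},
	}}
	c, err := makeCoder(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(c)
}
```

With this shape the caller sees the full KV structure the spec describes, and the decision about how to decode the window value stays with the KV coder's components.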
##########
sdks/go/pkg/beam/core/runtime/exec/window.go:
##########
@@ -97,6 +98,55 @@ func (w *WindowInto) String() string {
return fmt.Sprintf("WindowInto[%v]. Out:%v", w.Fn, w.Out.ID())
}
+type MapWindows struct {
+ UID UnitID
+ Fn WindowMapper
+ Out Node
+}
+
+func (m *MapWindows) ID() UnitID {
+ return m.UID
+}
+
+func (m *MapWindows) Up(_ context.Context) error {
+ return nil
+}
+
+func (m *MapWindows) StartBundle(ctx context.Context, id string, data DataContext) error {
+ return m.Out.StartBundle(ctx, id, data)
+}
+
+func (m *MapWindows) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+ w, ok := elm.Elm.(window.IntervalWindow)
+ if !ok {
+ return errors.Errorf("not an IntervalWindow, got %T", elm.Elm)
+ }
+ newW, err := m.Fn.MapWindow(w)
+ if err != nil {
+ return err
+ }
+ out := &FullValue{
+ Elm: elm.Elm,
Review Comment:
As written here, it looks like we assume the output nonce is the same as the
input window, which may not be the case in all runners. Given how the coder is
written and selected for a KV, it looks like the mapped element never gets
coded.
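One possible shape for this, as a hedged sketch only: treat the element as a KV whose key is the opaque nonce and whose value is the window, pass the nonce through untouched, and emit the mapped window as the value. `FullValue`, `IntervalWindow`, `WindowMapper`, `shiftMapper`, and `mapKV` below are simplified, hypothetical stand-ins, not the exec package types.

```go
package main

import "fmt"

// IntervalWindow is a stand-in for a [Start, End) window in milliseconds.
type IntervalWindow struct{ Start, End int64 }

// FullValue is a stand-in element: Elm holds the KV key, Elm2 the KV value.
type FullValue struct {
	Elm  interface{}
	Elm2 interface{}
}

// WindowMapper mirrors the single-method interface used in the diff.
type WindowMapper interface {
	MapWindow(w IntervalWindow) (IntervalWindow, error)
}

// shiftMapper is a toy mapper that moves a window forward by a fixed amount.
type shiftMapper struct{ by int64 }

func (s shiftMapper) MapWindow(w IntervalWindow) (IntervalWindow, error) {
	return IntervalWindow{Start: w.Start + s.by, End: w.End + s.by}, nil
}

// mapKV keeps the nonce (key) as-is and replaces the value with the mapped
// window, so the output is again a KV<nonce, window>.
func mapKV(fn WindowMapper, elm *FullValue) (*FullValue, error) {
	w, ok := elm.Elm2.(IntervalWindow)
	if !ok {
		return nil, fmt.Errorf("not an IntervalWindow, got %T", elm.Elm2)
	}
	newW, err := fn.MapWindow(w)
	if err != nil {
		return nil, err
	}
	return &FullValue{Elm: elm.Elm, Elm2: newW}, nil
}

func main() {
	in := &FullValue{Elm: []byte("nonce"), Elm2: IntervalWindow{Start: 0, End: 10}}
	out, err := mapKV(shiftMapper{by: 5}, in)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s -> %v\n", out.Elm, out.Elm2)
}
```

Keeping the nonce and the window in separate fields means the runner can correlate the response by nonce, and the mapped window is what actually gets encoded as the value.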