camphillips22 commented on issue #23106:
URL: https://github.com/apache/beam/issues/23106#issuecomment-1318712021
Thanks for the feedback!
> decoding the appropriate payload proto, and using that information to configure an exec node that exists on the exec side of the pipeline, in this case, taking a timestamp, and producing the appropriate window for it, based on the provided windowing strategy (I assume).
>
> Specific payload (likely a specific proto message) should be listed in the proto next to the URN enum in beam_fn_api.proto.
I actually got this far myself, but I'm having trouble understanding how to write the node's `ProcessElement`. It's not clear to me what shape the data has here, and there isn't a similar node to model it on, so it's taking some time to work out that part. From my experimentation, the `FullValue` is empty, so I'm assuming that I need to create the coders and evaluate the `ReStream`, but I haven't gotten to that yet.
As for integration tests, I also took the approach you suggested, with a pipeline that looks like this:
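```golang
import (
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

// MapWindows compares each element, via filterAbove, against the mean of its
// fixed window (a windowed side input), then re-windows into the global window
// so passert can check the result. makeTimestampedData and filterAbove are
// test helpers defined elsewhere (not shown).
func MapWindows(s beam.Scope) {
	col := beam.ParDo(s, &makeTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
	windowed := beam.WindowInto(s, window.NewFixedWindows(3*time.Second), col)
	mean := stats.Mean(s, windowed)
	filtered := beam.ParDo(s, filterAbove, windowed, beam.SideInput{Input: mean})
	globalFiltered := beam.WindowInto(s, window.NewGlobalWindows(), filtered)
	passert.Sum(s, globalFiltered, "a", 4, 30)
}
```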
The problem was that this pipeline never hit the map_windows URN. I'm not sure whether there's a difference between streaming and batch pipelines here. I also wasn't sure how to test a streaming pipeline in a test like this (presumably with the `teststream` package, but that isn't supported by Dataflow), so I ended up slightly modifying the streaming wordcap example to test it on Dataflow. Not spending money would be great, though, so I'll try to get local Flink set up.