riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r683717984
##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,32 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+// TriggerWindowSums, much like WindowSums described above has an addition of
configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways
throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope,
beam.PCollection) beam.PCollection) {
+ timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4,
9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+ windowSize := 3 * time.Second
+
+ validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection,
expected ...interface{}) {
+ // Window the data.
+ windowed := beam.WindowInto(s, wfn, in,
beam.WindowTrigger{Name: window.Always})
+ // To get the pane decoding error, change above statement to
+ // windowed := beam.WindowInto(s, wfn, in,
beam.WindowTrigger{Name: window.Always})
+ // Perform the appropriate sum operation.
+ sums := sumPerKey(s, windowed)
+ // Drop back to Global windows, and drop the key otherwise
passert.Equals doesn't work.
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ sums = beam.DropKey(s, sums)
+ passert.Equals(s, sums, expected...)
+ }
+
+ // Use fixed windows to divide the data into 3 chunks.
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize),
timestampedData, 15, 15, 15)
Review comment:
Yes. I think there is something missing in the trigger implementation.
Will be working on it next.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]