riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687900874
##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+ windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr},
beam.AccumulationMode{Mode: m})
+ sums := stats.Sum(s, windowed)
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end
of the window
+func TriggerDefault(s beam.Scope) {
+ // create a teststream pipeline and get the pcollection
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(11000)
+ con.AddElements(12000, 4.0, 5.0)
+ con.AdvanceWatermark(13000)
+
+ col := teststream.Create(s, con)
+ windowSize := 10 * time.Second
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every
input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in
the case of this trigger
Review comment:
Got it fixed. The extra empty pane was emitted because OnTimeBehavior was set
to fire always. I have now changed it to fire only if non-empty. Will push
those changes soon.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]