riteshghorse commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687919886



##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr 
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+       windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, 
beam.AccumulationMode{Mode: m})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end 
of the window
+func TriggerDefault(s beam.Scope) {
+       // create a teststream pipeline and get the pcollection
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       con.AddElements(12000, 4.0, 5.0)
+       con.AdvanceWatermark(13000)
+
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger; it is expected to receive every 
input value as the output.
+// It also returns an extra empty pane. It is unclear why; this occurs only in 
the case of this trigger.
+func TriggerAlways(s beam.Scope) {
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.AlwaysTrigger}, window.Discarding, 1.0, 2.0, 3.0, 
0.0)
+}
+
+// TriggerElementCount tests the ElementCount trigger; it waits for at least N 
elements to be ready
+// to fire an output pane.
+func TriggerElementCount(s beam.Scope) {
+       // create a teststream pipeline and get the pcollection
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(2000)
+       con.AddElements(6000, 4.0, 5.0)
+       con.AdvanceWatermark(10000)
+       con.AddElements(52000, 10.0)
+       con.AdvanceWatermark(53000)
+
+       col := teststream.Create(s, con)
+
+       // waits only for two elements to arrive, fires output after that, 
and never fires again.
+       // For the trigger to fire every 2 elements, combine it with Repeat 
Trigger
+       tr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 2}
+       windowed := beam.WindowInto(s, window.NewGlobalWindows(), col, 
beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Count(s, sums, "total collections", 1)

Review comment:
       yes, you're right




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to