shunping opened a new issue, #36145: URL: https://github.com/apache/beam/issues/36145
### What happened?

I am trying to enable the TriggerAlways primitive integration test (https://github.com/apache/beam/blob/6077034b7337283fc2f2467c6455ef7b308379c8/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go#L52), but the test does not pass.

I can reproduce the failure with a standalone program:

```
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func init() {}

func main() {
	beam.Init()

	ctx := context.Background()
	pipeline, s := beam.NewPipelineWithRoot()

	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(11000)
	col := teststream.Create(s, con)

	windowSize := 10 * time.Second
	wfn := window.NewFixedWindows(windowSize)
	windowed := beam.WindowInto(s.Scope("Fixed"), wfn, col, beam.Trigger(trigger.Always()))

	sums := stats.Sum(s, windowed)
	debug.Print(s, sums)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
```

Because `trigger.Always()` is used, `stats.Sum` should produce a separate sum for each of the three panes. However, the pipeline outputs 6:

```
2025/09/14 15:28:17 INFO log from SDK worker worker.ID=job-001[go-job-1-1757878097559282000]_go worker.endpoint=localhost:53202 sdk.transformID=e7 sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:54 sdk.time=2025-09-14T19:28:17.572Z sdk.msg="Elm: 6"
```

After replacing `stats.Sum` with GroupByKey, I can see three panes:
```
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func init() {}

func main() {
	beam.Init()

	ctx := context.Background()
	pipeline, s := beam.NewPipelineWithRoot()

	con := teststream.NewConfig()
	con.AddElements(1000, 1.0, 2.0, 3.0)
	con.AdvanceWatermark(11000)
	col := teststream.Create(s, con)

	windowSize := 10 * time.Second
	wfn := window.NewFixedWindows(windowSize)
	windowed := beam.WindowInto(s.Scope("Fixed"), wfn, col, beam.Trigger(trigger.Always()))

	kv := beam.ParDo(s, func(element float64) (string, float64) { return "0", element }, windowed)
	gbk := beam.GroupByKey(s, kv)
	debug.Print(s, gbk)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
```

```
2025/09/14 15:32:38 INFO log from SDK worker worker.ID=job-001[go-job-1-1757878358434257000]_go worker.endpoint=localhost:53506 sdk.transformID=e5 sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:77 sdk.time=2025-09-14T19:32:38.500Z sdk.msg="Elm: (0,[2])"
2025/09/14 15:32:38 INFO log from SDK worker worker.ID=job-001[go-job-1-1757878358434257000]_go worker.endpoint=localhost:53506 sdk.transformID=e5 sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:77 sdk.time=2025-09-14T19:32:38.500Z sdk.msg="Elm: (0,[1])"
2025/09/14 15:32:38 INFO log from SDK worker worker.ID=job-001[go-job-1-1757878358434257000]_go worker.endpoint=localhost:53506 sdk.transformID=e5 sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:77 sdk.time=2025-09-14T19:32:38.500Z sdk.msg="Elm: (0,[3])"
```

### Issue Failure

Failure: Test is flaky

### Issue Priority

Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be sure the product is healthy)

### Issue Components

- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
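
For reference, the per-pane expectation described under "What happened?" can be sketched without Beam. This is only an illustrative model, not SDK code: `paneSums` is a hypothetical helper, and it assumes the trigger fires once per element, as `trigger.Always()` is expected to. Which accumulation mode applies to the repro is also an assumption, so both are shown. In neither mode would a lone result of 6 be the only output.

```go
package main

import "fmt"

// paneSums is a hypothetical helper (not part of the Beam SDK). It models a
// trigger that fires after every element and returns the sum emitted for
// each pane. With accumulate=false each pane holds only the newly arrived
// element (discarding mode); with accumulate=true each pane holds every
// element seen so far (accumulating mode).
func paneSums(elements []float64, accumulate bool) []float64 {
	sums := make([]float64, 0, len(elements))
	running := 0.0
	for _, e := range elements {
		if accumulate {
			running += e
			sums = append(sums, running)
		} else {
			sums = append(sums, e)
		}
	}
	return sums
}

func main() {
	elems := []float64{1.0, 2.0, 3.0}
	fmt.Println(paneSums(elems, false)) // prints [1 2 3]
	fmt.Println(paneSums(elems, true))  // prints [1 3 6]
}
```

The discarding-mode result matches the three single-element panes seen in the GroupByKey output above.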
