shunping commented on issue #35771:
URL: https://github.com/apache/beam/issues/35771#issuecomment-3146915070

   > Ideally, we run the same test pipeline implemented in Java and Go, and see what they do. If they have the same kind of bug, we should probably just change Prism's behavior then.
   
   I wrote a similar pipeline using the Go SDK:
   ```go
   package main
   
   import (
   	"context"
   	"fmt"
   	"log"
   	"time"
   
   	"github.com/apache/beam/sdks/v2/go/pkg/beam"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
   	beamLog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
   	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   )
   
   func init() {
   	register.DoFn7x0[context.Context, beam.Window, beam.EventTime, timers.Provider, string, int, func(string)](&eventTimerDoFn{})
   	register.Function3x0(add_key)
   	// Register the emitter types so the SDK does not fall back to reflection.
   	register.Emitter1[string]()
   	register.Emitter2[string, int]()
   }
   
   // add_key turns each integer e into the KV pair (fmt.Sprint(e), e+1).
   func add_key(ctx context.Context, e int, emit func(string, int)) {
   	key := fmt.Sprint(e)
   	// beamLog.Infof(ctx, "%d -> (%s, %d)", e, key, e+1)
   	emit(key, e+1)
   }
   
   type eventTimerDoFn struct {
   	Timer timers.EventTime
   }
   
   func (fn *eventTimerDoFn) ProcessElement(ctx context.Context, w beam.Window, ts beam.EventTime, tp timers.Provider, key string, value int, emitWords func(string)) {
   	beamLog.Infof(ctx, "ProcessElement called: received %s,%d", key, value)
   
   	// Set an event-time timer at the end of the window.
   	fn.Timer.Set(tp, w.MaxTimestamp().ToTime())
   }
   
   func (fn *eventTimerDoFn) OnTimer(ctx context.Context, w beam.Window, ts beam.EventTime, tp timers.Provider, key string, timer timers.Context, emitWords func(string)) {
   	switch timer.Family {
   	case fn.Timer.Family:
   		beamLog.Infof(ctx, "OnTimer called: for key %s", key)
   
   		// Sleep to trigger a bundle split.
   		time.Sleep(1 * time.Second)
   	}
   }
   
   func AddEventTimeDoFn(s beam.Scope, in beam.PCollection) beam.PCollection {
   	return beam.ParDo(s, &eventTimerDoFn{
   		Timer: timers.InEventTime("processWatermark"),
   	}, in)
   }
   
   func main() {
   	beam.Init()
   
   	ctx := context.Background()
   	p := beam.NewPipeline()
   	s := p.Root()
   
   	a := beam.CreateList(s, []int{1, 2, 3})
   	b := beam.ParDo(s, add_key, a)
   	AddEventTimeDoFn(s, b)
   
   	if err := beamx.Run(ctx, p); err != nil {
   		log.Fatalf("Failed to execute job: %v", err)
   	}
   }
   ```
   
   
   I ran it without the Prism change from my PR #35770 and saw an error similar to the one described in https://github.com/apache/beam/issues/35771#issue-3285534638: two timers are left in `pending` in the last stage (stage-002).
   ```
   stage-000 watermark in +inf out +inf upstream +inf from IMPULSE   pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
           sideInputs: [] outputCols: [n1] outputConsumers: [stage-001] sideConsumers: []
   stage-001 watermark in +inf out +inf upstream +inf from stage-000 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
           sideInputs: [] outputCols: [n3] outputConsumers: [stage-002] sideConsumers: []
   stage-002 watermark in glo out glo upstream +inf from stage-001 pending [{Timer - Window [*], EventTime glo, Hold glo, "e4" "processWatermark" "" "\x011"} {Timer - Window [*], EventTime glo, Hold glo, "e4" "processWatermark" "" "\x013"}] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
           sideInputs: [] outputCols: [] outputConsumers: [] sideConsumers: []
   ```
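   To check a dump like this mechanically, for example in a regression check that no stage finishes with leftover timers, one could count the entries in each stage's `pending` list. The sketch below is a hypothetical helper (`pendingTimersByStage` is not part of Prism); it only parses the text and assumes the one-line-per-stage format shown above, where the pending list is immediately followed by `] byKey`.

   ```go
   package main
   
   import (
   	"fmt"
   	"strings"
   )
   
   // pendingTimersByStage is a hypothetical helper that counts the timers listed
   // in each stage's "pending [...]" field of a Prism stage dump. It assumes each
   // stage is printed on a single line and the list is terminated by "] byKey"
   // (timer entries themselves contain "[*]", so a bare "]" is not a safe end marker).
   func pendingTimersByStage(dump string) map[string]int {
   	const marker = " pending ["
   	counts := make(map[string]int)
   	for _, line := range strings.Split(dump, "\n") {
   		line = strings.TrimSpace(line)
   		if !strings.HasPrefix(line, "stage-") {
   			continue // skip the indented sideInputs/outputCols lines
   		}
   		stage := strings.Fields(line)[0]
   		start := strings.Index(line, marker)
   		end := strings.Index(line, "] byKey")
   		if start < 0 || end < start+len(marker) {
   			continue // line does not match the assumed format
   		}
   		counts[stage] = strings.Count(line[start+len(marker):end], "{Timer ")
   	}
   	return counts
   }
   
   func main() {
   	dump := `stage-001 watermark in +inf out +inf upstream +inf from stage-000 pending [] byKey map[]
   stage-002 watermark in glo out glo upstream +inf from stage-001 pending [{Timer - Window [*], EventTime glo, Hold glo, "e4" "processWatermark" "" "\x011"} {Timer - Window [*], EventTime glo, Hold glo, "e4" "processWatermark" "" "\x013"}] byKey map[]`
   	for stage, n := range pendingTimersByStage(dump) {
   		fmt.Printf("%s: %d pending timer(s)\n", stage, n)
   	}
   }
   ```

   Run on the dump above, this would flag stage-002 with two pending timers, while the upstream stages report none.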
   
   When I ran the same pipeline against #35770, which includes the Prism change, the timers fired more often than I expected (similar to the Python pipeline before the Python SDK change). In the excerpt below, the timers for keys 1 and 2 each fire twice:
   ```
   v-go-prism/sdks/go/examples/periodic_bug.go:41 sdk.time=2025-08-03T02:00:04.654Z sdk.msg="OnTimer called: for key 3"
   2025/08/02 22:00:04 INFO log from SDK worker worker.ID=job-001[go-job-1-1754186404643657000]_go worker.endpoint=localhost:62176 sdk.transformID=e4 sdk.location=/Users/shunping/Projects/beam-dev-go-prism/sdks/go/examples/periodic_bug.go:41 sdk.time=2025-08-03T02:00:04.855Z sdk.msg="OnTimer called: for key 2"
   2025/08/02 22:00:05 INFO log from SDK worker worker.ID=job-001[go-job-1-1754186404643657000]_go worker.endpoint=localhost:62176 sdk.transformID=e4 sdk.location=/Users/shunping/Projects/beam-dev-go-prism/sdks/go/examples/periodic_bug.go:41 sdk.time=2025-08-03T02:00:05.258Z sdk.msg="OnTimer called: for key 1"
   2025/08/02 22:00:05 INFO log from SDK worker worker.ID=job-001[go-job-1-1754186404643657000]_go worker.endpoint=localhost:62176 sdk.transformID=e4 sdk.location=/Users/shunping/Projects/beam-dev-go-prism/sdks/go/examples/periodic_bug.go:41 sdk.time=2025-08-03T02:00:05.655Z sdk.msg="OnTimer called: for key 1"
   2025/08/02 22:00:06 INFO log from SDK worker worker.ID=job-001[go-job-1-1754186404643657000]_go worker.endpoint=localhost:62176 sdk.transformID=e4 sdk.location=/Users/shunping/Projects/beam-dev-go-prism/sdks/go/examples/periodic_bug.go:41 sdk.time=2025-08-03T02:00:06.655Z sdk.msg="OnTimer called: for key 2"
   ```
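   Since `add_key` produces three distinct keys and each element sets one end-of-window timer, one would expect a single `OnTimer` call per key. A minimal sketch for spotting duplicate firings in worker logs like the ones above follows; `timerFiringsByKey` and `duplicateKeys` are hypothetical helpers, and the regular expression assumes the exact `sdk.msg="OnTimer called: for key …"` text logged by `eventTimerDoFn.OnTimer`.

   ```go
   package main
   
   import (
   	"fmt"
   	"regexp"
   	"sort"
   )
   
   // onTimerMsg matches the message logged by eventTimerDoFn.OnTimer
   // (assumed format: sdk.msg="OnTimer called: for key <key>").
   var onTimerMsg = regexp.MustCompile(`OnTimer called: for key ([^"]+)"`)
   
   // timerFiringsByKey tallies how many times OnTimer ran for each key.
   func timerFiringsByKey(logText string) map[string]int {
   	counts := make(map[string]int)
   	for _, m := range onTimerMsg.FindAllStringSubmatch(logText, -1) {
   		counts[m[1]]++
   	}
   	return counts
   }
   
   // duplicateKeys returns, in sorted order, the keys whose timer fired more than once.
   func duplicateKeys(counts map[string]int) []string {
   	var dups []string
   	for k, n := range counts {
   		if n > 1 {
   			dups = append(dups, k)
   		}
   	}
   	sort.Strings(dups)
   	return dups
   }
   
   func main() {
   	// The sdk.msg fields from the log excerpt above, in order.
   	logText := `sdk.msg="OnTimer called: for key 3"
   sdk.msg="OnTimer called: for key 2"
   sdk.msg="OnTimer called: for key 1"
   sdk.msg="OnTimer called: for key 1"
   sdk.msg="OnTimer called: for key 2"`
   	counts := timerFiringsByKey(logText)
   	fmt.Println(counts)                // map[1:2 2:2 3:1]
   	fmt.Println(duplicateKeys(counts)) // [1 2]
   }
   ```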
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.