shunping commented on issue #33815:
URL: https://github.com/apache/beam/issues/33815#issuecomment-2844032012

   OK. I have a simplified pipeline that has only one PeriodicImpulse and no 
Flatten. Note that a Reshuffle is needed to start a new stage; without it, the 
pending watermark issue does not show up. 
   
   ```go
   package main
   
   import (
        "context"
        "log"
        "math"
        "time"
   
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        beamLog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   )
   
   func init() {
        register.Function3x0(count)
        register.Function3x0(initCount)
        register.Emitter1[int]()
   }
   
   func initCount(ctx context.Context, _ []byte, emit func(int)) {
        // beamLog.Info(ctx, "Starting count")
        emit(1)
   }
   
   func count(ctx context.Context, currCount int, emit func(int)) {
        beamLog.Infof(ctx, "Current Count = %d", currCount)
        emit(currCount + 1)
   }
   
   func main() {
   
        beam.Init()
   
        ctx := context.Background()
        pipeline, scope := beam.NewPipelineWithRoot()
   
        duration := 5 * time.Second
        unboundedSource := periodic.Impulse(scope, time.UnixMilli(math.MaxInt32), time.UnixMilli(math.MaxInt64), duration, false)
        trigger := unboundedSource
   
        s1 := scope.Scope("source")
        c1 := beam.ParDo(s1, initCount, trigger)
   
        sr := scope.Scope("reshuffle")
        r := beam.Reshuffle(sr, c1)
   
        s2 := scope.Scope("count")
        beam.ParDo(s2, count, r)
   
        if err := beamx.Run(ctx, pipeline); err != nil {
                log.Fatalf("Failed to execute job: %v", err)
        }
   }
   ```
   
   Basically, the problem is no longer about using Flatten on unbounded sources 
(as the title suggests). It is related to the stage that follows 
PeriodicImpulse's SDF when that SDF does not stop (that is, does not perform 
any checkpoint). In the current implementation, PeriodicImpulse's internal DoFn 
only stops once it catches up with the current time. With a very early start 
time, it takes a long time before the first checkpoint happens, which leads to 
a stuck pipeline.
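
   To give a rough sense of scale, here is a minimal sketch that estimates how 
many intervals lie between the start time used above and the current time. It 
assumes one impulse per interval; `impulsesBehind` is a hypothetical helper 
written for this comment, not part of the Beam SDK.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// impulsesBehind is a hypothetical helper (not a Beam API). It returns how
// many whole intervals lie between startMillis and nowMillis, both given in
// milliseconds since the Unix epoch. It returns 0 when the start is not in
// the past or the interval is not positive.
func impulsesBehind(startMillis, nowMillis, intervalMillis int64) int64 {
	if intervalMillis <= 0 || nowMillis <= startMillis {
		return 0
	}
	return (nowMillis - startMillis) / intervalMillis
}

func main() {
	// Same start time and interval as in the pipeline above.
	start := time.UnixMilli(math.MaxInt32)
	interval := 5 * time.Second

	n := impulsesBehind(start.UnixMilli(), time.Now().UnixMilli(), interval.Milliseconds())
	fmt.Printf("start=%s, intervals until caught up=%d\n",
		start.UTC().Format(time.RFC3339), n)
}
```

   Since `math.MaxInt32` milliseconds is only about 25 days after the Unix 
epoch, the start time falls in January 1970, so with a 5 second interval the 
estimate is in the hundreds of millions at the time of writing.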

