shunping commented on issue #33815: URL: https://github.com/apache/beam/issues/33815#issuecomment-2844032012
OK. I have a simplified pipeline that has only one PeriodicImpulse and no Flatten. Note that I need a Reshuffle to start a new stage so that the pending watermark issue can be seen.

```go
package main

import (
	"context"
	"log"
	"math"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	beamLog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
	register.Function3x0(count)
	register.Function3x0(initCount)
	register.Emitter1[int]()
}

// initCount turns each impulse into the initial count of 1.
func initCount(ctx context.Context, _ []byte, emit func(int)) {
	// beamLog.Info(ctx, "Starting count")
	emit(1)
}

// count logs the incoming count and emits it incremented by one.
func count(ctx context.Context, currCount int, emit func(int)) {
	beamLog.Infof(ctx, "Current Count = %d", currCount)
	emit(currCount + 1)
}

func main() {
	beam.Init()

	ctx := context.Background()

	pipeline, scope := beam.NewPipelineWithRoot()

	// The start time is very early (about 25 days after the Unix epoch).
	duration := 5 * time.Second
	unboundedSource := periodic.Impulse(scope, time.UnixMilli(math.MaxInt32), time.UnixMilli(math.MaxInt64), duration, false)
	trigger := unboundedSource

	s1 := scope.Scope("source")
	c1 := beam.ParDo(s1, initCount, trigger)

	// The Reshuffle starts a new stage, which makes the pending watermark visible.
	sr := scope.Scope("reshuffle")
	r := beam.Reshuffle(sr, c1)

	s2 := scope.Scope("count")
	beam.ParDo(s2, count, r)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
```

Basically, the problem is no longer about using Flatten on unbounded sources (as the title suggests). It is about the stage that follows PeriodicImpulse's SDF: that stage never makes progress because the SDF doesn't stop (it doesn't perform any checkpoint).

The current implementation of PeriodicImpulse's internal DoFn only stops once it has caught up with the current time. With a very early start time, it takes a long time before the first checkpoint happens, which leaves the pipeline stuck.