shunping commented on code in PR #34828: URL: https://github.com/apache/beam/pull/34828#discussion_r2072316949
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
 			}
 			slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
+			var fraction float64
+
 			// Check if there has been any measurable progress by the input, or all output pcollections since last report.
 			slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
+			checkpointReady := checkpointTickCount >= checkpointTickCutoff
 			if slow && unsplit {
-				slog.Debug("splitting report", "bundle", rb, "index", index)
-				sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
-				if err != nil {
-					slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
-					break progress
-				}
-				if sr.GetChannelSplits() == nil {
-					slog.Debug("SDK returned no splits", "bundle", rb)
-					unsplit = false
-					continue progress
-				}
+				fraction = 0.5
+			} else if checkpointReady && unsplit {

Review Comment:
I am wondering if we should do this checkpointing for both bounded and unbounded cases.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org