lostluck commented on code in PR #34828: URL: https://github.com/apache/beam/pull/34828#discussion_r2074219640
########## sdks/go/pkg/beam/runners/prism/internal/stage.go: ########## @@ -218,58 +223,75 @@ progress: } slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex) + var fraction float64 + // Check if there has been any measurable progress by the input, or all output pcollections since last report. slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"] + checkpointReady := checkpointTickCount >= checkpointTickCutoff if slow && unsplit { - slog.Debug("splitting report", "bundle", rb, "index", index) - sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) - if err != nil { - slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) - break progress - } - if sr.GetChannelSplits() == nil { - slog.Debug("SDK returned no splits", "bundle", rb) - unsplit = false - continue progress - } + fraction = 0.5 + } else if checkpointReady && unsplit { + // splitting on 0.0 fraction to make a checkpoint + fraction = 0.0 + // reset tickCount after scheduling a checkpoint + checkpointTickCount = 0 + } else { + previousIndex = index["index"] + previousTotalCount = index["totalCount"] + continue progress + } - // TODO sort out rescheduling primary Roots on bundle failure. - var residuals []engine.Residual - for _, rr := range sr.GetResidualRoots() { - ba := rr.GetApplication() - residuals = append(residuals, engine.Residual{Element: ba.GetElement()}) - if len(ba.GetElement()) == 0 { - slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb)) - panic("sdk returned empty residual application") - } - // TODO what happens to output watermarks on splits? - } - if len(sr.GetChannelSplits()) != 1 { - slog.Warn("received non-single channel split", "bundle", rb) - } - cs := sr.GetChannelSplits()[0] - fr := cs.GetFirstResidualElement() - // The first residual can be after the end of data, so filter out those cases. 
- if b.EstimatedInputElements >= int(fr) { - b.EstimatedInputElements = int(fr) // Update the estimate for the next split. - // Split Residuals are returned right away for rescheduling. - em.ReturnResiduals(rb, int(fr), s.inputInfo, engine.Residuals{ - Data: residuals, - }) + // Do the split (fraction > 0) or checkpoint (fraction == 0) + slog.Debug("splitting report", "bundle", rb, "index", index) + sr, err := b.Split(ctx, wk, fraction /* fraction of remainder */, nil /* allowed splits */) + if err != nil { + slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) + break progress + } + if sr.GetChannelSplits() == nil { + slog.Debug("SDK returned no splits", "bundle", rb) + unsplit = false + continue progress + } + // Save residual roots for checkpoint. After checkpointing is successful, + // the bundle will be marked as finished and no residual roots will be + // returned in ProcessBundleResponse. + if fraction == 0 { + residualRoots = sr.GetResidualRoots() + } + // TODO sort out rescheduling primary Roots on bundle failure. + var residuals []engine.Residual + for _, rr := range sr.GetResidualRoots() { + ba := rr.GetApplication() + residuals = append(residuals, engine.Residual{Element: ba.GetElement()}) + if len(ba.GetElement()) == 0 { + slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb)) + panic("sdk returned empty residual application") } + // TODO what happens to output watermarks on splits? Review Comment: The `EstimatedInputElements` bit is tricky since it's about how to estimate where to split for unbounded SDFs and how big a bundle "is". I can't recall exactly why it ended up with the "filter out residuals that are after the end of data" cases... but apparently it had to do with timers? https://github.com/apache/beam/blame/master/sdks/go/pkg/beam/runners/prism/internal/stage.go#L253 -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org