lostluck commented on code in PR #34828: URL: https://github.com/apache/beam/pull/34828#discussion_r2072731387
########## sdks/go/pkg/beam/runners/prism/internal/stage.go: ########## @@ -218,58 +223,75 @@ progress: } slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex) + var fraction float64 + // Check if there has been any measurable progress by the input, or all output pcollections since last report. slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"] + checkpointReady := checkpointTickCount >= checkpointTickCutoff if slow && unsplit { - slog.Debug("splitting report", "bundle", rb, "index", index) - sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) - if err != nil { - slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) - break progress - } - if sr.GetChannelSplits() == nil { - slog.Debug("SDK returned no splits", "bundle", rb) - unsplit = false - continue progress - } + fraction = 0.5 + } else if checkpointReady && unsplit { Review Comment: My intuition is telling me that we should not add it for the bounded case. But the essence of Beam is to unify batch and streaming, so it's probably fine. ########## sdks/go/pkg/beam/runners/prism/internal/stage.go: ########## @@ -218,58 +223,75 @@ progress: } slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex) + var fraction float64 + // Check if there has been any measurable progress by the input, or all output pcollections since last report. 
slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"] + checkpointReady := checkpointTickCount >= checkpointTickCutoff if slow && unsplit { - slog.Debug("splitting report", "bundle", rb, "index", index) - sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) - if err != nil { - slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) - break progress - } - if sr.GetChannelSplits() == nil { - slog.Debug("SDK returned no splits", "bundle", rb) - unsplit = false - continue progress - } + fraction = 0.5 + } else if checkpointReady && unsplit { + // splitting on 0.0 fraction to make a checkpoint + fraction = 0.0 + // reset tickCount after scheduling a checkpoint + checkpointTickCount = 0 + } else { + previousIndex = index["index"] + previousTotalCount = index["totalCount"] + continue progress + } - // TODO sort out rescheduling primary Roots on bundle failure. - var residuals []engine.Residual - for _, rr := range sr.GetResidualRoots() { - ba := rr.GetApplication() - residuals = append(residuals, engine.Residual{Element: ba.GetElement()}) - if len(ba.GetElement()) == 0 { - slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb)) - panic("sdk returned empty residual application") - } - // TODO what happens to output watermarks on splits? - } - if len(sr.GetChannelSplits()) != 1 { - slog.Warn("received non-single channel split", "bundle", rb) - } - cs := sr.GetChannelSplits()[0] - fr := cs.GetFirstResidualElement() - // The first residual can be after the end of data, so filter out those cases. - if b.EstimatedInputElements >= int(fr) { - b.EstimatedInputElements = int(fr) // Update the estimate for the next split. - // Split Residuals are returned right away for rescheduling. 
- em.ReturnResiduals(rb, int(fr), s.inputInfo, engine.Residuals{ - Data: residuals, - }) + // Do the split (fraction > 0) or checkpoint (fraction == 0) + slog.Debug("splitting report", "bundle", rb, "index", index) + sr, err := b.Split(ctx, wk, fraction /* fraction of remainder */, nil /* allowed splits */) + if err != nil { + slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) + break progress + } + if sr.GetChannelSplits() == nil { + slog.Debug("SDK returned no splits", "bundle", rb) + unsplit = false + continue progress + } + // Save residual roots for checkpoint. After checkpointing is successful, + // the bundle will be marked as finished and no residual roots will be Review Comment: It is expected and by design. Were you seeing errors due to returning the residuals early in the split response for the checkpointing case? There are a few different cases to think about, but they're aligned with the two FnAPI calls in question. 1. Normal ProcessBundleResponse: Returns when the primary is completed; there are no residuals to worry about. 2. Split Response + ProcessBundleResponse: The Split Response contains the confirmation of the primary (what the bundle will finish processing), and the residual that needs to be processed later. ProcessBundleResponse will not contain any residuals at this time, since they were already persisted by the split response (per the above). 3. Self-Checkpointed ProcessBundleResponse: This is when the DoFn itself returns a process continuation for a specific element (e.g. resume in 10s or similar). The primary is by definition completed, but there may be residuals to process later. That's what's returned and scheduled. You're seeing case 2 here. We shouldn't need to do any additional residual handling and processing after the bundle is finished. I'd be a bit concerned that there is a data duplication risk when doing it this way (the same residuals getting "returned" twice). 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org