shunping commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2072763240
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
}
slog.Debug("progress report", "bundle", rb, "index",
index, "prevIndex", previousIndex)
+ var fraction float64
+
// Check if there has been any measurable progress by
the input, or all output pcollections since last report.
slow := previousIndex == index["index"] &&
previousTotalCount == index["totalCount"]
+ checkpointReady := checkpointTickCount >=
checkpointTickCutoff
if slow && unsplit {
- slog.Debug("splitting report", "bundle", rb,
"index", index)
- sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
- if err != nil {
- slog.Warn("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
- break progress
- }
- if sr.GetChannelSplits() == nil {
- slog.Debug("SDK returned no splits",
"bundle", rb)
- unsplit = false
- continue progress
- }
+ fraction = 0.5
+ } else if checkpointReady && unsplit {
Review Comment:
After we have bundle retrying implemented, we can adjust the checkpoint
ticks to be longer or shorter based on how often we see errors in the bundle,
how long it takes to checkpoint, etc.
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
}
slog.Debug("progress report", "bundle", rb, "index",
index, "prevIndex", previousIndex)
+ var fraction float64
+
// Check if there has been any measurable progress by
the input, or all output pcollections since last report.
slow := previousIndex == index["index"] &&
previousTotalCount == index["totalCount"]
+ checkpointReady := checkpointTickCount >=
checkpointTickCutoff
if slow && unsplit {
- slog.Debug("splitting report", "bundle", rb,
"index", index)
- sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
- if err != nil {
- slog.Warn("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
- break progress
- }
- if sr.GetChannelSplits() == nil {
- slog.Debug("SDK returned no splits",
"bundle", rb)
- unsplit = false
- continue progress
- }
+ fraction = 0.5
+ } else if checkpointReady && unsplit {
Review Comment:
After we have bundle retrying implemented, we can raise or lower the
checkpoint tick threshold based on how often we see errors in the bundle,
how long it takes to checkpoint, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]