shunping commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2072763240


##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
                        }
                        slog.Debug("progress report", "bundle", rb, "index", 
index, "prevIndex", previousIndex)
 
+                       var fraction float64
+
                        // Check if there has been any measurable progress by 
the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && 
previousTotalCount == index["totalCount"]
+                       checkpointReady := checkpointTickCount >= 
checkpointTickCutoff
                        if slow && unsplit {
-                               slog.Debug("splitting report", "bundle", rb, 
"index", index)
-                               sr, err := b.Split(ctx, wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
-                               if err != nil {
-                                       slog.Warn("SDK Error from split, 
aborting splits", "bundle", rb, "error", err.Error())
-                                       break progress
-                               }
-                               if sr.GetChannelSplits() == nil {
-                                       slog.Debug("SDK returned no splits", 
"bundle", rb)
-                                       unsplit = false
-                                       continue progress
-                               }
+                               fraction = 0.5
+                       } else if checkpointReady && unsplit {

Review Comment:
   After we have bundle retrying implemented, we can adjust the checkpoint 
ticks to be longer or shorter based on how often we see an error in the bundle, 
how long it takes to checkpoint, etc.



##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
                        }
                        slog.Debug("progress report", "bundle", rb, "index", 
index, "prevIndex", previousIndex)
 
+                       var fraction float64
+
                        // Check if there has been any measurable progress by 
the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && 
previousTotalCount == index["totalCount"]
+                       checkpointReady := checkpointTickCount >= 
checkpointTickCutoff
                        if slow && unsplit {
-                               slog.Debug("splitting report", "bundle", rb, 
"index", index)
-                               sr, err := b.Split(ctx, wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
-                               if err != nil {
-                                       slog.Warn("SDK Error from split, 
aborting splits", "bundle", rb, "error", err.Error())
-                                       break progress
-                               }
-                               if sr.GetChannelSplits() == nil {
-                                       slog.Debug("SDK returned no splits", 
"bundle", rb)
-                                       unsplit = false
-                                       continue progress
-                               }
+                               fraction = 0.5
+                       } else if checkpointReady && unsplit {

Review Comment:
   After we have bundle retrying implemented, we can adjust the threshold of 
checkpoint ticks to be longer or shorter based on how often we see an error in 
the bundle, how long it takes to checkpoint, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to