shunping commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2072762099


##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
                        }
                        slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
 
+                       var fraction float64
+
                        // Check if there has been any measurable progress by the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
+                       checkpointReady := checkpointTickCount >= checkpointTickCutoff
                        if slow && unsplit {
-                               slog.Debug("splitting report", "bundle", rb, "index", index)
-                               sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
-                               if err != nil {
-                                       slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
-                                       break progress
-                               }
-                               if sr.GetChannelSplits() == nil {
-                                       slog.Debug("SDK returned no splits", "bundle", rb)
-                                       unsplit = false
-                                       continue progress
-                               }
+                               fraction = 0.5
+                       } else if checkpointReady && unsplit {
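The branch structure in the hunk above can be read as a small decision function. This sketch is illustrative only: `splitDecision`, `decideSplit`, and its parameters are hypothetical names that mirror the diff's variables (`previousIndex`, `previousTotalCount`, `checkpointTickCount`, `checkpointTickCutoff`, `unsplit`). The body of the `checkpointReady` branch is cut off in the hunk, so the sketch only reports which branch would run and makes no assumption about the split fraction it requests.

```go
package main

import "fmt"

// splitDecision is a hypothetical label for which branch of the progress
// loop would run; it is not a type defined in the Prism runner.
type splitDecision int

const (
	decideNone       splitDecision = iota // keep waiting for the next report
	decideSplitHalf                       // "slow" branch: fraction = 0.5
	decideCheckpoint                      // "checkpointReady" branch (body not shown in the hunk)
)

// decideSplit mirrors the two conditions in the hunk. Because the diff uses
// if / else if, a bundle that made no progress takes the 0.5 split even when
// the checkpoint cutoff has also been reached.
func decideSplit(prevIndex, index, prevTotal, total int64, ticks, cutoff int, unsplit bool) splitDecision {
	// slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
	slow := prevIndex == index && prevTotal == total
	// checkpointReady := checkpointTickCount >= checkpointTickCutoff
	checkpointReady := ticks >= cutoff
	switch {
	case slow && unsplit:
		return decideSplitHalf
	case checkpointReady && unsplit:
		return decideCheckpoint
	default:
		return decideNone
	}
}

func main() {
	fmt.Println(decideSplit(5, 5, 10, 10, 3, 10, true) == decideSplitHalf)    // no progress: true
	fmt.Println(decideSplit(5, 5, 10, 10, 12, 10, true) == decideSplitHalf)   // no progress wins over cutoff: true
	fmt.Println(decideSplit(5, 9, 10, 40, 10, 10, true) == decideCheckpoint)  // progressing, cutoff reached: true
	fmt.Println(decideSplit(5, 9, 10, 40, 3, 10, true) == decideNone)         // progressing, below cutoff: true
	fmt.Println(decideSplit(5, 5, 10, 10, 12, 10, false) == decideNone)       // already split once: true
}
```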

Review Comment:
   Checkpointing is always a trade-off. On one hand, we don't want to checkpoint so often that it hurts performance; on the other, we want to checkpoint often enough that the hard work is materialized and saved.
   
   > A fast moving bundle shouldn't be stopped. 
   
   I think we could use the checkpoint ticks AND the amount or rate of output data ("totalCount") together as the criteria for identifying a fast-moving bundle (thousands of events per tick) that runs for a reasonably long time. For example, instead of 1 second, we could raise the cutoff to 10 seconds (or even longer).
   
   Even if a bundle is fast-moving, we may still want to checkpoint it so that we don't have to redo the previous 10 seconds of work if something goes wrong.
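One way to read the suggestion above is as a conjunction: checkpoint only once the bundle has run past a longer cutoff AND its output count has grown quickly. The Go sketch below is hypothetical throughout: `shouldCheckpoint`, `tickInterval`, `checkpointAfter`, and `minEventsPerTick` are not Prism identifiers, the 100 ms tick period is an assumption (the hunk does not show how often progress is polled), and the rate is taken as the growth in `totalCount` averaged over the whole window.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical tuning knobs; none of these exist in Prism.
const (
	tickInterval     = 100 * time.Millisecond // ASSUMED progress-report period
	checkpointAfter  = 10 * time.Second       // the 10 s suggested instead of 1 s
	minEventsPerTick = 1000                   // "thousands of events per tick"
)

// shouldCheckpoint reports whether a bundle has run long enough and is
// producing output fast enough that its work should be checkpointed.
// ticks is the number of progress ticks since the bundle started (or was
// last checkpointed); outputDelta is the growth in "totalCount" over them.
func shouldCheckpoint(ticks int, outputDelta int64) bool {
	cutoffTicks := int(checkpointAfter / tickInterval) // 100 ticks with the values above
	if ticks < cutoffTicks {
		return false // not long-running yet
	}
	// ticks >= cutoffTicks > 0 here, so the division is safe.
	rate := outputDelta / int64(ticks) // average events per tick
	return rate >= minEventsPerTick
}

func main() {
	fmt.Println(shouldCheckpoint(50, 1000000)) // false: only 5 s elapsed
	fmt.Println(shouldCheckpoint(100, 500000)) // true: 5000 events/tick over 10 s
	fmt.Println(shouldCheckpoint(100, 5000))   // false: 50 events/tick
}
```

Averaging over the whole window smooths out per-tick noise; a sliding-window rate would be an equally reasonable reading of the suggestion. Bundles that make no progress at all are still left to the existing `slow` split path.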



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
