lostluck commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2072742789


##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
                        }
                        slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
 
+                       var fraction float64
+
                        // Check if there has been any measurable progress by the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
+                       checkpointReady := checkpointTickCount >= checkpointTickCutoff
                        if slow && unsplit {
-                               slog.Debug("splitting report", "bundle", rb, "index", index)
-                               sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
-                               if err != nil {
-                                       slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
-                                       break progress
-                               }
-                               if sr.GetChannelSplits() == nil {
-                                       slog.Debug("SDK returned no splits", "bundle", rb)
-                                       unsplit = false
-                                       continue progress
-                               }
+                               fraction = 0.5
+                       } else if checkpointReady && unsplit {

Review Comment:
   I guess I'm worried about the situation where we're just oversplitting and forcing a checkpoint on a perfectly healthy Batch DoFn (one that processes efficiently and is fast per element).
   
   As it stands, this code looks like it will checkpoint a fast-moving bundle after 1 second (10 ticks at 100ms per tick, since a fast-moving bundle won't get a slower progress request rate).
   
   But that's sort of wasteful: a fast-moving bundle shouldn't be stopped. Maybe we only want to do this if the bundle is moving fast but the input index isn't moving? Perhaps this is one reason why Dataflow has explicit Batch and Streaming modes of execution.
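One way to express that suggestion, as a hedged sketch rather than the PR's logic: keep the existing "slow" split at 0.5 (neither the input index nor `totalCount` moved), and only force a checkpoint when output is still growing while the input index is stuck, e.g. a large fan-out inside one element. The types `progressSample` and `splitAction` and the function `decide` are hypothetical; the `checkpoint` action stands in for whatever the elided `else if checkpointReady && unsplit` branch requests (assumed here to be a fraction of 0.0).

```go
package main

import "fmt"

// progressSample is a hypothetical snapshot of the two counters the diff compares
// (index["index"] and index["totalCount"]).
type progressSample struct {
	Index      int64
	TotalCount int64
}

// splitAction is what the runner would ask the SDK for.
type splitAction int

const (
	noSplit    splitAction = iota
	splitHalf              // fraction 0.5: the existing "slow bundle" split
	checkpoint             // assumed fraction 0.0: checkpoint the bundle
)

// decide keeps the slow-bundle split, but only forces a checkpoint when
// output is moving while the input index is stuck.
func decide(prev, cur progressSample, checkpointReady, unsplit bool) splitAction {
	if !unsplit {
		return noSplit
	}
	inputStuck := prev.Index == cur.Index
	outputMoving := prev.TotalCount != cur.TotalCount
	switch {
	case inputStuck && !outputMoving:
		return splitHalf // no measurable progress at all (the diff's `slow` case)
	case checkpointReady && inputStuck && outputMoving:
		return checkpoint // busy inside one element, e.g. a large fan-out
	default:
		return noSplit // fast, healthy bundle: leave it alone
	}
}

func main() {
	healthy := decide(progressSample{Index: 1, TotalCount: 10}, progressSample{Index: 5, TotalCount: 50}, true, true)
	stalled := decide(progressSample{Index: 3, TotalCount: 30}, progressSample{Index: 3, TotalCount: 30}, true, true)
	fanOut := decide(progressSample{Index: 3, TotalCount: 30}, progressSample{Index: 3, TotalCount: 90}, true, true)
	fmt.Println(healthy == noSplit, stalled == splitHalf, fanOut == checkpoint) // prints "true true true"
}
```

Unlike a tick cutoff alone, this never interrupts a bundle whose input index is advancing.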
   
   One almost wants to base this on the number or amount of output data instead, in order to allow the watermark to progress... But that would be much harder, and is overthinking it for now.
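For illustration only, an output-based trigger could look like the hypothetical `outputTrigger` below: it fires once the bundle has emitted a budget of elements or bytes since its last checkpoint. The budgets, field names, and the assumption that the runner can observe output bytes are all illustrative, not part of the PR.

```go
package main

import "fmt"

// outputTrigger is a hypothetical output-based checkpoint trigger: it fires once a
// bundle has emitted at least a budget of elements or bytes since its last checkpoint.
type outputTrigger struct {
	maxElements, maxBytes   int64
	baseElements, baseBytes int64 // output counters recorded at the last checkpoint
}

// shouldCheckpoint reports whether output since the last checkpoint meets either budget.
func (t *outputTrigger) shouldCheckpoint(totalElements, totalBytes int64) bool {
	return totalElements-t.baseElements >= t.maxElements ||
		totalBytes-t.baseBytes >= t.maxBytes
}

// markCheckpointed records the counters at which a checkpoint was taken,
// so the next budget is measured from there.
func (t *outputTrigger) markCheckpointed(totalElements, totalBytes int64) {
	t.baseElements, t.baseBytes = totalElements, totalBytes
}

func main() {
	t := &outputTrigger{maxElements: 1000, maxBytes: 1 << 20}
	fmt.Println(t.shouldCheckpoint(999, 4096))  // false: under both budgets
	fmt.Println(t.shouldCheckpoint(1000, 4096)) // true: element budget reached
	t.markCheckpointed(1000, 4096)
	fmt.Println(t.shouldCheckpoint(1500, 8192)) // false: only 500 elements since checkpoint
}
```

Tying checkpoints to emitted output, rather than wall-clock ticks, bounds how much output is held back from the watermark without penalizing a bundle merely for running fast.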



-- 
This is an automated message from the Apache Git Service.