lostluck commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2072731387


##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
                        }
                        slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
 
+                       var fraction float64
+
                        // Check if there has been any measurable progress by the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
+                       checkpointReady := checkpointTickCount >= checkpointTickCutoff
                        if slow && unsplit {
-                               slog.Debug("splitting report", "bundle", rb, "index", index)
-                               sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
-                               if err != nil {
-                                       slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
-                                       break progress
-                               }
-                               if sr.GetChannelSplits() == nil {
-                                       slog.Debug("SDK returned no splits", "bundle", rb)
-                                       unsplit = false
-                                       continue progress
-                               }
+                               fraction = 0.5
+                       } else if checkpointReady && unsplit {

Review Comment:
   My intuition tells me that we should not add it for the bounded case. But the essence of Beam is to unify batch and streaming, so it's probably fine.
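For reference, the branch added in the hunk above reduces to a small pure decision. The following is a minimal Go sketch, assuming the checkpoint tick count is incremented once per progress report elsewhere in the loop; `decide` and `splitDecision` are hypothetical names, and only the 0.5 and 0.0 fractions, the `checkpointTickCount >= checkpointTickCutoff` check, and the reset to 0 come from the diff. Nothing in it distinguishes bounded from unbounded inputs, which is the point raised in the comment.

```go
package main

import "fmt"

// splitDecision is a hypothetical model of what one progress tick decides.
type splitDecision struct {
	fraction float64 // fraction of remainder passed to the split request
	split    bool    // whether a split or checkpoint request is sent at all
}

// decide mirrors the if / else if / else chain in the diff and returns the
// decision plus the (possibly reset) checkpoint tick count.
func decide(slow, unsplit bool, tickCount, tickCutoff int) (splitDecision, int) {
	checkpointReady := tickCount >= tickCutoff
	switch {
	case slow && unsplit:
		// No measurable progress since the last report: split the remainder in half.
		return splitDecision{fraction: 0.5, split: true}, tickCount
	case checkpointReady && unsplit:
		// Fraction 0.0 asks the SDK to stop as soon as possible, i.e. a checkpoint.
		// The tick count restarts after scheduling it.
		return splitDecision{fraction: 0.0, split: true}, 0
	default:
		// Neither condition holds: keep polling progress without splitting.
		return splitDecision{}, tickCount
	}
}

func main() {
	d, ticks := decide(true, true, 3, 10)
	fmt.Println(d.fraction, d.split, ticks) // 0.5 true 3
	d, ticks = decide(false, true, 10, 10)
	fmt.Println(d.fraction, d.split, ticks) // 0 true 0
	d, ticks = decide(false, true, 4, 10)
	fmt.Println(d.fraction, d.split, ticks) // 0 false 4
}
```

Note that a stalled bundle takes the 0.5 split even when a checkpoint is also due, because that case is checked first.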
   



##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
                        }
                        slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
 
+                       var fraction float64
+
                        // Check if there has been any measurable progress by the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
+                       checkpointReady := checkpointTickCount >= checkpointTickCutoff
                        if slow && unsplit {
-                               slog.Debug("splitting report", "bundle", rb, "index", index)
-                               sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
-                               if err != nil {
-                                       slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
-                                       break progress
-                               }
-                               if sr.GetChannelSplits() == nil {
-                                       slog.Debug("SDK returned no splits", "bundle", rb)
-                                       unsplit = false
-                                       continue progress
-                               }
+                               fraction = 0.5
+                       } else if checkpointReady && unsplit {
+                               // splitting on 0.0 fraction to make a checkpoint
+                               fraction = 0.0
+                               // reset tickCount after scheduling a checkpoint
+                               checkpointTickCount = 0
+                       } else {
+                               previousIndex = index["index"]
+                               previousTotalCount = index["totalCount"]
+                               continue progress
+                       }
 
-                               // TODO sort out rescheduling primary Roots on bundle failure.
-                               var residuals []engine.Residual
-                               for _, rr := range sr.GetResidualRoots() {
-                                       ba := rr.GetApplication()
-                                       residuals = append(residuals, engine.Residual{Element: ba.GetElement()})
-                                       if len(ba.GetElement()) == 0 {
-                                               slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
-                                               panic("sdk returned empty residual application")
-                                       }
-                                       // TODO what happens to output watermarks on splits?
-                               }
-                               if len(sr.GetChannelSplits()) != 1 {
-                                       slog.Warn("received non-single channel split", "bundle", rb)
-                               }
-                               cs := sr.GetChannelSplits()[0]
-                               fr := cs.GetFirstResidualElement()
-                               // The first residual can be after the end of data, so filter out those cases.
-                               if b.EstimatedInputElements >= int(fr) {
-                                       b.EstimatedInputElements = int(fr) // Update the estimate for the next split.
-                                       // Split Residuals are returned right away for rescheduling.
-                                       em.ReturnResiduals(rb, int(fr), s.inputInfo, engine.Residuals{
-                                               Data: residuals,
-                                       })
+                       // Do the split (fraction > 0) or checkpoint (fraction == 0)
+                       slog.Debug("splitting report", "bundle", rb, "index", index)
+                       sr, err := b.Split(ctx, wk, fraction /* fraction of remainder */, nil /* allowed splits */)
+                       if err != nil {
+                               slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
+                               break progress
+                       }
+                       if sr.GetChannelSplits() == nil {
+                               slog.Debug("SDK returned no splits", "bundle", rb)
+                               unsplit = false
+                               continue progress
+                       }
+                       // Save residual roots for checkpoint. After checkpointing is successful,
+                       // the bundle will be marked as finished and no residual roots will be

Review Comment:
   It is expected and by design.
   
   Were you seeing errors due to returning the residuals early in the split response for the checkpointing case?
   
   There are a few different cases to think about, but they're aligned with the two FnAPI calls in question.
   
   1. Normal ProcessBundleResponse: Returns when the primary is completed; there are no residuals to worry about.
   
   2. Split Response + ProcessBundleResponse:
    The Split Response contains the confirmation of the primary (what the bundle will finish processing) and the residual that needs to be processed later. The ProcessBundleResponse will not contain any residuals at this time, since they were already persisted by the split response (per the above).
    
   3. Self-checkpointed ProcessBundleResponse: This is when the DoFn itself returns a process continuation for a specific element (e.g. Resume in 10s or similar). The primary is by definition completed, but there may be residuals to process later. That's what's returned and scheduled.
   
   You're seeing case 2 here. We shouldn't need to do any additional residual handling and processing after the bundle is finished. I'd be a bit concerned that there is a data duplication risk when doing it this way (the same residuals getting "returned" twice).
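To make the three cases concrete, here is a small hypothetical model in Go. None of these types are prism's engine API; `responseKind`, `scheduler`, and `handle` are illustrative names. It schedules residuals exactly once, from whichever response delivered them, and shows how returning the split residuals again after the bundle finishes would queue the same element twice.

```go
package main

import "fmt"

// responseKind enumerates the three cases from the comment (hypothetical model).
type responseKind int

const (
	normal          responseKind = iota // ProcessBundleResponse only: primary done, no residuals
	splitThenFinish                     // split response carries residuals; ProcessBundleResponse carries none
	selfCheckpoint                      // ProcessBundleResponse carries residuals from a process continuation
)

// scheduler counts how many times each residual element has been queued.
type scheduler struct{ queued map[string]int }

func (s *scheduler) returnResiduals(rs []string) {
	for _, r := range rs {
		s.queued[r]++
	}
}

// handle schedules residuals once, from the response that delivered them.
func handle(s *scheduler, kind responseKind, residuals []string) {
	switch kind {
	case normal:
		// Nothing to reschedule.
	case splitThenFinish:
		// Residuals arrive with the split response and are returned right away.
		// The later ProcessBundleResponse has none, so finishing adds nothing.
		s.returnResiduals(residuals)
	case selfCheckpoint:
		// Residuals arrive only with the ProcessBundleResponse.
		s.returnResiduals(residuals)
	}
}

func main() {
	s := &scheduler{queued: map[string]int{}}
	handle(s, splitThenFinish, []string{"elem-a"})
	fmt.Println(s.queued["elem-a"]) // 1

	// Re-returning the saved split residuals after the bundle finishes
	// queues the same element a second time: the duplication risk.
	s.returnResiduals([]string{"elem-a"})
	fmt.Println(s.queued["elem-a"]) // 2
}
```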



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
