lostluck commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2074123583


##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
                        }
                        slog.Debug("progress report", "bundle", rb, "index", 
index, "prevIndex", previousIndex)
 
+                       var fraction float64
+
                        // Check if there has been any measurable progress by 
the input, or all output pcollections since last report.
                        slow := previousIndex == index["index"] && 
previousTotalCount == index["totalCount"]
+                       checkpointReady := checkpointTickCount >= 
checkpointTickCutoff
                        if slow && unsplit {
-                               slog.Debug("splitting report", "bundle", rb, 
"index", index)
-                               sr, err := b.Split(ctx, wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
-                               if err != nil {
-                                       slog.Warn("SDK Error from split, 
aborting splits", "bundle", rb, "error", err.Error())
-                                       break progress
-                               }
-                               if sr.GetChannelSplits() == nil {
-                                       slog.Debug("SDK returned no splits", 
"bundle", rb)
-                                       unsplit = false
-                                       continue progress
-                               }
+                               fraction = 0.5
+                       } else if checkpointReady && unsplit {
+                               // splitting on 0.0 fraction to make a 
checkpoint
+                               fraction = 0.0
+                               // reset tickCount after scheduling a checkpoint
+                               checkpointTickCount = 0
+                       } else {
+                               previousIndex = index["index"]
+                               previousTotalCount = index["totalCount"]
+                               continue progress
+                       }
 
-                               // TODO sort out rescheduling primary Roots on 
bundle failure.
-                               var residuals []engine.Residual
-                               for _, rr := range sr.GetResidualRoots() {
-                                       ba := rr.GetApplication()
-                                       residuals = append(residuals, 
engine.Residual{Element: ba.GetElement()})
-                                       if len(ba.GetElement()) == 0 {
-                                               slog.LogAttrs(context.TODO(), 
slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
-                                               panic("sdk returned empty 
residual application")
-                                       }
-                                       // TODO what happens to output 
watermarks on splits?
-                               }
-                               if len(sr.GetChannelSplits()) != 1 {
-                                       slog.Warn("received non-single channel 
split", "bundle", rb)
-                               }
-                               cs := sr.GetChannelSplits()[0]
-                               fr := cs.GetFirstResidualElement()
-                               // The first residual can be after the end of 
data, so filter out those cases.
-                               if b.EstimatedInputElements >= int(fr) {
-                                       b.EstimatedInputElements = int(fr) // 
Update the estimate for the next split.
-                                       // Split Residuals are returned right 
away for rescheduling.
-                                       em.ReturnResiduals(rb, int(fr), 
s.inputInfo, engine.Residuals{
-                                               Data: residuals,
-                                       })
+                       // Do the split (fraction > 0) or checkpoint (fraction 
== 0)
+                       slog.Debug("splitting report", "bundle", rb, "index", 
index)
+                       sr, err := b.Split(ctx, wk, fraction /* fraction of 
remainder */, nil /* allowed splits */)
+                       if err != nil {
+                               slog.Warn("SDK Error from split, aborting 
splits", "bundle", rb, "error", err.Error())
+                               break progress
+                       }
+                       if sr.GetChannelSplits() == nil {
+                               slog.Debug("SDK returned no splits", "bundle", 
rb)
+                               unsplit = false
+                               continue progress
+                       }
+                       // Save residual roots for checkpoint. After 
checkpointing is successful,
+                       // the bundle will be marked as finished and no 
residual roots will be

Review Comment:
   So residualRoots is set on line 260, but GetResidualRoots() is called again 
on line 282, and those roots are processed and sent back to the EM there.
   
   That variable is never cleared after its residuals have been processed for the bundle.
   
   Then, after the bundle finishes, the residual roots are overridden if 
and only if the final bundle response already has residual roots. Otherwise, the cached 
roots from the split response may be processed a second time: they are sent to 
the EM as part of PersistBundle, which then reschedules them as well. So this may 
duplicate the residual data.
   
   IIUC, the better fix is to handle the output watermark estimate in the 
em.ReturnResiduals call. Right now, that only happens in PersistBundle.
   
   PersistBundle call: 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L905
 
   
   ReturnResiduals call: 
   
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1023
   
   --------
   Alternatively, the MinOutputWatermarks for residuals are independent of the 
specific data, so it should also be valid to collect/update them with splits, 
but not *persist* them until the PersistBundle call. That closely matches what 
you have here (and what works), but without the duplicated data.
   
   MinOutputWatermarks are collected here: 
https://github.com/apache/beam/pull/34828/files#diff-c799dce79559a70660d7abb42fcbff8455ba41452bd9483fc5c58dfcf156ee8cR343
   
   Map is created just above currently: 
https://github.com/apache/beam/pull/34828/files#diff-c799dce79559a70660d7abb42fcbff8455ba41452bd9483fc5c58dfcf156ee8cR322
   
   So we'd just need to ensure that "stale" watermarks aren't persisted 
for this bundle and don't hold the watermark back by accident.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to