jrmccluskey commented on code in PR #17724:
URL: https://github.com/apache/beam/pull/17724#discussion_r878370897
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -647,6 +653,46 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f
float64) ([]*FullValue, []
return p, r, nil
}
+// Checkpoint splits the remaining work in a restriction into residuals to be
resumed
+// later by the runner. This is done iff the underlying Splittable DoFn
returns a resuming
+// ProcessContinuation. If the split occurs and the primary restriction is
not marked as done
+// by the RTracker, the Checkpoint fails as this is a potential data-loss case.
+func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue,
error) {
+ // Get the watermark state immediately so that we don't overestimate
our current watermark.
+ var pWeState interface{}
+ var rWeState interface{}
+ rWeState = n.wesInv.Invoke(n.PDo.we)
+ pWeState = rWeState
+ // If we've processed elements, the initial watermark estimator state
will be set.
+ // In that case we should hold the output watermark at that initial
state so that we don't
+ // Advance past where the current elements are holding the watermark
+ if n.initWeS != nil {
+ pWeState = n.initWeS
+ }
+ addContext := func(err error) error {
+ return errors.WithContext(err, "Attempting checkpoint in
ProcessSizedElementsAndRestrictions")
+ }
+
+ // Errors checking.
+ if n.rt == nil {
+ return nil, addContext(errors.New("Restriction tracker
missing."))
+ }
+ if err := n.rt.GetError(); err != nil {
+ return nil, addContext(err)
+ }
+
+ _, r, err := n.singleWindowSplit(0.0, pWeState, rWeState)
+ if err != nil {
+ return nil, addContext(err)
+ }
+
+ if !n.rt.IsDone() {
+ return nil, addContext(errors.New("Primary restriction is not
done, data may be lost as a result"))
Review Comment:
The addContext function should address point 1, but I agree with point 2.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]