damccorm commented on code in PR #17386:
URL: https://github.com/apache/beam/pull/17386#discussion_r853109594


##########
sdks/go/pkg/beam/core/runtime/harness/harness.go:
##########
@@ -401,11 +402,34 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                                c.plans[bdID] = append(c.plans[bdID], plan)
                        }
                }
+
+               // Check if the underlying DoFn self-checkpointed.
+               // TODO(BEAM-11104): How should a returned error here be 
handled to avoid clobbering
+               // an error that is returned after the mutex is given up?

Review Comment:
   Can we just store the error in a local variable `splitErr` and check it
immediately after our current `err` check? If both return errors, we almost
certainly want to return the original error, since it might be the cause of
this one.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,72 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, 
Count: c, pcol: pcol}
 }
 
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+       if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {
+               return u.continuation
+       }
+       return nil
+}
+
+// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. 
returned a
+// ProcessContinuation) and needs to be resumed later. If the underlying DoFn 
is not
+// splittable or has not returned a resuming continuation, the function 
returns an empty
+// SplitResult, a negative resumption time, and a false boolean to indicate 
that no split
+// occurred.
+func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+       n.mu.Lock()
+       defer n.mu.Unlock()
+
+       pc := n.getProcessContinuation()
+       if pc == nil {
+               return SplitResult{}, -1 * time.Minute, false, nil
+       }
+       if !pc.ShouldResume() {

Review Comment:
   nit: This can be simplified to `if pc == nil || !pc.ShouldResume()`



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,72 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, 
Count: c, pcol: pcol}
 }
 
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+       if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {
+               return u.continuation
+       }
+       return nil
+}
+
+// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. 
returned a
+// ProcessContinuation) and needs to be resumed later. If the underlying DoFn 
is not
+// splittable or has not returned a resuming continuation, the function 
returns an empty
+// SplitResult, a negative resumption time, and a false boolean to indicate 
that no split
+// occurred.
+func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+       n.mu.Lock()
+       defer n.mu.Unlock()
+
+       pc := n.getProcessContinuation()
+       if pc == nil {
+               return SplitResult{}, -1 * time.Minute, false, nil
+       }
+       if !pc.ShouldResume() {
+               return SplitResult{}, -1 * time.Minute, false, nil
+       }
+
+       su := SplittableUnit(n.Out.(*ProcessSizedElementsAndRestrictions))
+
+       // Get the output watermark before splitting to avoid accidentally 
overestimating
+       ow := su.GetOutputWatermark()

Review Comment:
   Oh nice - I was planning on following up in a PR to do this 😄 Technically,
the comment no longer applies, though: there's no risk of overestimating when
checkpointing, because execution has been fully paused at this point.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,72 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name, 
Count: c, pcol: pcol}
 }
 
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+       if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {
+               return u.continuation
+       }
+       return nil
+}
+
+// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. 
returned a
+// ProcessContinuation) and needs to be resumed later. If the underlying DoFn 
is not
+// splittable or has not returned a resuming continuation, the function 
returns an empty
+// SplitResult, a negative resumption time, and a false boolean to indicate 
that no split
+// occurred.
+func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+       n.mu.Lock()
+       defer n.mu.Unlock()
+
+       pc := n.getProcessContinuation()
+       if pc == nil {
+               return SplitResult{}, -1 * time.Minute, false, nil
+       }
+       if !pc.ShouldResume() {
+               return SplitResult{}, -1 * time.Minute, false, nil
+       }
+
+       su := SplittableUnit(n.Out.(*ProcessSizedElementsAndRestrictions))
+
+       // Get the output watermark before splitting to avoid accidentally 
overestimating
+       ow := su.GetOutputWatermark()
+
+       // Always split at fraction 0.0, should have no primaries left.
+       ps, rs, err := su.Split(0.0)
+       if err != nil {
+               return SplitResult{}, -1 * time.Minute, false, err
+       }
+       if ps != nil {
+               return SplitResult{}, -1 * time.Minute, false, 
fmt.Errorf("failed to checkpoint: got %v primary roots, want nil", ps)
+       }
+
+       wc := MakeWindowEncoder(n.Coder.Window)
+       ec := MakeElementEncoder(coder.SkipW(n.Coder))
+       encodeElms := func(fvs []*FullValue) ([][]byte, error) {
+               encElms := make([][]byte, len(fvs))
+               for i, fv := range fvs {
+                       enc, err := encodeElm(fv, wc, ec)
+                       if err != nil {
+                               return nil, err
+                       }
+                       encElms[i] = enc
+               }
+               return encElms, nil
+       }
+
+       rsEnc, err := encodeElms(rs)
+       if err != nil {
+               return SplitResult{}, -1 * time.Minute, false, err
+       }

Review Comment:
   This block is almost identical to the existing split block - consider
refactoring it into a shared function (you could just pass in an empty list of
`FullValue`s for `ps` in this case and ignore the `ps` result).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to