damccorm commented on code in PR #17386:
URL: https://github.com/apache/beam/pull/17386#discussion_r853109594
##########
sdks/go/pkg/beam/core/runtime/harness/harness.go:
##########
@@ -401,11 +402,34 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
c.plans[bdID] = append(c.plans[bdID], plan)
}
}
+
+ // Check if the underlying DoFn self-checkpointed.
+ // TODO(BEAM-11104): How should a returned error here be handled to avoid clobbering
+ // an error that is returned after the mutex is given up?
Review Comment:
Can we just store the error in a local variable `splitErr` and check it
immediately below our current `err` check? In the case where both errors are
non-nil, we almost certainly want to return the original error, since that
might be the cause of this one.
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,72 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name,
Count: c, pcol: pcol}
}
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+ if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {
+ return u.continuation
+ }
+ return nil
+}
+
+// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
+// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
+// splittable or has not returned a resuming continuation, the function returns an empty
+// SplitResult, a negative resumption time, and a false boolean to indicate that no split
+// occurred.
+func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+
+ pc := n.getProcessContinuation()
+ if pc == nil {
+ return SplitResult{}, -1 * time.Minute, false, nil
+ }
+ if !pc.ShouldResume() {
Review Comment:
nit: This can be simplified to `if pc == nil || !pc.ShouldResume()`
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,72 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name,
Count: c, pcol: pcol}
}
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+ if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {
+ return u.continuation
+ }
+ return nil
+}
+
+// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
+// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
+// splittable or has not returned a resuming continuation, the function returns an empty
+// SplitResult, a negative resumption time, and a false boolean to indicate that no split
+// occurred.
+func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+
+ pc := n.getProcessContinuation()
+ if pc == nil {
+ return SplitResult{}, -1 * time.Minute, false, nil
+ }
+ if !pc.ShouldResume() {
+ return SplitResult{}, -1 * time.Minute, false, nil
+ }
+
+ su := SplittableUnit(n.Out.(*ProcessSizedElementsAndRestrictions))
+
+ // Get the output watermark before splitting to avoid accidentally overestimating
+ ow := su.GetOutputWatermark()
Review Comment:
Oh nice - I was planning on following up in a PR to do this 😄 Technically,
though, the code comment doesn't apply anymore: there's no risk of
overestimating when checkpointing, because execution has been fully paused at
this point.
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -320,6 +321,72 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
return ProgressReportSnapshot{ID: n.SID.PtransformID, Name: n.Name,
Count: c, pcol: pcol}
}
+func (n *DataSource) getProcessContinuation() sdf.ProcessContinuation {
+ if u, ok := n.Out.(*ProcessSizedElementsAndRestrictions); ok {
+ return u.continuation
+ }
+ return nil
+}
+
+// Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a
+// ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not
+// splittable or has not returned a resuming continuation, the function returns an empty
+// SplitResult, a negative resumption time, and a false boolean to indicate that no split
+// occurred.
+func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
+ n.mu.Lock()
+ defer n.mu.Unlock()
+
+ pc := n.getProcessContinuation()
+ if pc == nil {
+ return SplitResult{}, -1 * time.Minute, false, nil
+ }
+ if !pc.ShouldResume() {
+ return SplitResult{}, -1 * time.Minute, false, nil
+ }
+
+ su := SplittableUnit(n.Out.(*ProcessSizedElementsAndRestrictions))
+
+ // Get the output watermark before splitting to avoid accidentally overestimating
+ ow := su.GetOutputWatermark()
+
+ // Always split at fraction 0.0, should have no primaries left.
+ ps, rs, err := su.Split(0.0)
+ if err != nil {
+ return SplitResult{}, -1 * time.Minute, false, err
+ }
+ if ps != nil {
+ return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want nil", ps)
+ }
+
+ wc := MakeWindowEncoder(n.Coder.Window)
+ ec := MakeElementEncoder(coder.SkipW(n.Coder))
+ encodeElms := func(fvs []*FullValue) ([][]byte, error) {
+ encElms := make([][]byte, len(fvs))
+ for i, fv := range fvs {
+ enc, err := encodeElm(fv, wc, ec)
+ if err != nil {
+ return nil, err
+ }
+ encElms[i] = enc
+ }
+ return encElms, nil
+ }
+
+ rsEnc, err := encodeElms(rs)
+ if err != nil {
+ return SplitResult{}, -1 * time.Minute, false, err
+ }
Review Comment:
This block is almost identical to the existing split block - consider
refactoring it into a shared function. (You could just pass in an empty list of
`FullValue`s for `ps` in this case and ignore the `ps` result.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]