[ https://issues.apache.org/jira/browse/BEAM-14484?focusedWorklogId=772926&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772926 ]

ASF GitHub Bot logged work on BEAM-14484:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/May/22 17:14
            Start Date: 20/May/22 17:14
    Worklog Time Spent: 10m 
      Work Description: jrmccluskey commented on code in PR #17724:
URL: https://github.com/apache/beam/pull/17724#discussion_r878370897


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -647,6 +653,46 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []
        return p, r, nil
 }
 
+// Checkpoint splits the remaining work in a restriction into residuals to be resumed
+// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming
+// ProcessContinuation. If the split occurs and the primary restriction is not marked as done
+// by the RTracker, the Checkpoint fails as this is a potential data-loss case.
+func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}
+       rWeState = n.wesInv.Invoke(n.PDo.we)
+       pWeState = rWeState
+       // If we've processed elements, the initial watermark estimator state will be set.
+       // In that case we should hold the output watermark at that initial state so that we don't
+       // advance past where the current elements are holding the watermark.
+       if n.initWeS != nil {
+               pWeState = n.initWeS
+       }
+       addContext := func(err error) error {
+               return errors.WithContext(err, "Attempting checkpoint in ProcessSizedElementsAndRestrictions")
+       }
+
+       // Error checking.
+       if n.rt == nil {
+               return nil, addContext(errors.New("Restriction tracker missing."))
+       }
+       if err := n.rt.GetError(); err != nil {
+               return nil, addContext(err)
+       }
+
+       _, r, err := n.singleWindowSplit(0.0, pWeState, rWeState)
+       if err != nil {
+               return nil, addContext(err)
+       }
+
+       if !n.rt.IsDone() {
+               return nil, addContext(errors.New("Primary restriction is not done, data may be lost as a result"))

Review Comment:
   The addContext function should address point 1, but I agree with point 2. 
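To see why Checkpoint treats a primary that is not done as a potential data-loss case, here is a minimal sketch of a split at fraction 0.0. It assumes a hypothetical, simplified offset-range tracker: offsetRange, offsetTracker, noShrinkTracker, splitter and checkpoint are illustrative names, not Beam Go SDK APIs. A well-behaved tracker shrinks its primary to the positions already claimed, so IsDone returns true and all remaining work travels in the residual; a tracker that forgets to shrink its primary trips the check.

```go
package main

import (
	"errors"
	"fmt"
)

// offsetRange is a hypothetical half-open restriction [Start, End).
type offsetRange struct {
	Start, End int64
}

// offsetTracker is a hypothetical, simplified restriction tracker used only
// for illustration; it is not the Beam Go SDK's own tracker.
type offsetTracker struct {
	rest    offsetRange
	claimed int64 // last claimed position; Start-1 while nothing is claimed
}

func newTracker(r offsetRange) *offsetTracker {
	return &offsetTracker{rest: r, claimed: r.Start - 1}
}

// TryClaim claims pos if it comes after the last claim and lies inside the
// current restriction.
func (t *offsetTracker) TryClaim(pos int64) bool {
	if pos <= t.claimed || pos >= t.rest.End {
		return false
	}
	t.claimed = pos
	return true
}

// TrySplit keeps the given fraction of the unclaimed work in the primary and
// moves the rest to the residual. A fraction of 0.0 is a self-checkpoint: the
// primary shrinks to the claimed positions and the residual gets the rest.
func (t *offsetTracker) TrySplit(fraction float64) (primary, residual offsetRange, err error) {
	cur := t.claimed + 1
	splitPt := cur + int64(fraction*float64(t.rest.End-cur))
	if splitPt >= t.rest.End {
		return t.rest, offsetRange{}, errors.New("no unclaimed work to split off")
	}
	residual = offsetRange{Start: splitPt, End: t.rest.End}
	t.rest.End = splitPt
	return t.rest, residual, nil
}

// IsDone reports whether every position in the primary has been claimed.
func (t *offsetTracker) IsDone() bool {
	return t.claimed >= t.rest.End-1
}

// noShrinkTracker is a hypothetical faulty tracker: its TrySplit reports a
// residual but never shrinks its own primary restriction.
type noShrinkTracker struct{ offsetTracker }

func (t *noShrinkTracker) TrySplit(fraction float64) (offsetRange, offsetRange, error) {
	cur := t.claimed + 1
	return t.rest, offsetRange{Start: cur, End: t.rest.End}, nil
}

// splitter is the subset of tracker behaviour that checkpoint needs.
type splitter interface {
	TrySplit(fraction float64) (primary, residual offsetRange, err error)
	IsDone() bool
}

// checkpoint mirrors the shape of the reviewed code: split at 0.0, then
// refuse to continue if the primary still has unclaimed work.
func checkpoint(s splitter) (offsetRange, error) {
	_, residual, err := s.TrySplit(0.0)
	if err != nil {
		return offsetRange{}, fmt.Errorf("checkpoint: %w", err)
	}
	if !s.IsDone() {
		return offsetRange{}, errors.New("checkpoint: primary restriction is not done, data may be lost")
	}
	return residual, nil
}

func main() {
	good := newTracker(offsetRange{Start: 0, End: 10})
	for p := int64(0); p < 4; p++ {
		good.TryClaim(p)
	}
	fmt.Println(checkpoint(good)) // {4 10} <nil>

	bad := &noShrinkTracker{*newTracker(offsetRange{Start: 0, End: 10})}
	for p := int64(0); p < 4; p++ {
		bad.TryClaim(p)
	}
	fmt.Println(checkpoint(bad)) // {0 0} checkpoint: primary restriction is not done, data may be lost
}
```

The faulty tracker still hands back a plausible residual, which is why the IsDone check after the split, rather than the split result alone, is what catches the problem.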





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772926)
    Time Spent: 6h 40m  (was: 6.5h)

> Improve error message surrounding primary returns in the self-checkpointing code
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-14484
>                 URL: https://issues.apache.org/jira/browse/BEAM-14484
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P1
>          Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> The error message in the Go SDK harness around returned primaries in the
> self-checkpointing code
> ([https://github.com/apache/beam/blob/ea1f292e9cf31fc8c4803b10d811f0d3ee184ae7/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L375])
> is unclear and should be made more explicit. It should also guide the user
> towards making sure that the restriction behaves properly in the
> self-checkpointing case.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
