[ 
https://issues.apache.org/jira/browse/BEAM-14484?focusedWorklogId=772905&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772905
 ]

ASF GitHub Bot logged work on BEAM-14484:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/May/22 16:47
            Start Date: 20/May/22 16:47
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17724:
URL: https://github.com/apache/beam/pull/17724#discussion_r878339972


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -647,6 +653,46 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []
        return p, r, nil
 }
 
+// Checkpoint splits the remaining work in a restriction into residuals to be resumed
+// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming
+// ProcessContinuation. If the split occurs and the primary restriction is not marked as done
+// by the RTracker, the Checkpoint fails as this is a potential data-loss case.
+func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {

Review Comment:
   Could you please add some test cases? It should be fairly straightforward to model them off of the existing Split tests.
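
   For illustration, a rough sketch of what such a table-driven test could look like; the `newTestCheckpointNode` helper is hypothetical and stands in for whatever setup the existing Split tests in sdf_test.go use:

```go
// Hypothetical sketch only: newTestCheckpointNode is a stand-in for the
// node setup used by the existing Split tests in sdf_test.go.
func TestCheckpoint(t *testing.T) {
	tests := []struct {
		name    string
		done    bool // whether the RTracker reports IsDone after splitting at 0.0
		wantErr bool
	}{
		{name: "tracker done after checkpoint split", done: true, wantErr: false},
		{name: "tracker not done after checkpoint split", done: false, wantErr: true},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			n := newTestCheckpointNode(t, test.done) // hypothetical setup helper
			if _, err := n.Checkpoint(); (err != nil) != test.wantErr {
				t.Errorf("Checkpoint() error = %v, wantErr %v", err, test.wantErr)
			}
		})
	}
}
```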



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -647,6 +653,46 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []
        return p, r, nil
 }
 
+// Checkpoint splits the remaining work in a restriction into residuals to be resumed
+// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming
+// ProcessContinuation. If the split occurs and the primary restriction is not marked as done
+// by the RTracker, the Checkpoint fails as this is a potential data-loss case.
+func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}
+       rWeState = n.wesInv.Invoke(n.PDo.we)
+       pWeState = rWeState
+       // If we've processed elements, the initial watermark estimator state will be set.
+       // In that case, we should hold the output watermark at that initial state so that we
+       // don't advance past where the current elements are holding the watermark.
+       if n.initWeS != nil {
+               pWeState = n.initWeS
+       }
+       addContext := func(err error) error {
+               return errors.WithContext(err, "Attempting checkpoint in ProcessSizedElementsAndRestrictions")
+       }
+
+       // Error checking.
+       if n.rt == nil {
+               return nil, addContext(errors.New("Restriction tracker missing."))
+       }
+       if err := n.rt.GetError(); err != nil {
+               return nil, addContext(err)
+       }
+
+       _, r, err := n.singleWindowSplit(0.0, pWeState, rWeState)
+       if err != nil {
+               return nil, addContext(err)
+       }

Review Comment:
   
   ```suggestion
        _, r, err := n.Split(0.0)
   ```
   
   I like moving this into its own function in sdf.go, but that doesn't mean we can't still leverage Split.
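
   To make the suggestion concrete, here is a minimal sketch (not the final implementation) of Checkpoint delegating to Split, using only the fields and methods shown in the diff above; error strings are illustrative:

```go
// Minimal sketch of the reviewer's suggestion: reuse Split(0.0) rather than
// calling singleWindowSplit directly.
func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {
	addContext := func(err error) error {
		return errors.WithContext(err, "Attempting checkpoint in ProcessSizedElementsAndRestrictions")
	}
	if n.rt == nil {
		return nil, addContext(errors.New("restriction tracker missing"))
	}
	if err := n.rt.GetError(); err != nil {
		return nil, addContext(err)
	}
	// Splitting at fraction 0.0 hands all remaining work back as residuals.
	_, r, err := n.Split(0.0)
	if err != nil {
		return nil, addContext(err)
	}
	if !n.rt.IsDone() {
		return nil, addContext(errors.New("primary restriction is not done after a checkpoint split"))
	}
	return r, nil
}
```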



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -647,6 +653,46 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []
        return p, r, nil
 }
 
+// Checkpoint splits the remaining work in a restriction into residuals to be resumed
+// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming
+// ProcessContinuation. If the split occurs and the primary restriction is not marked as done
+// by the RTracker, the Checkpoint fails as this is a potential data-loss case.
+func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}
+       rWeState = n.wesInv.Invoke(n.PDo.we)
+       pWeState = rWeState
+       // If we've processed elements, the initial watermark estimator state will be set.
+       // In that case, we should hold the output watermark at that initial state so that we
+       // don't advance past where the current elements are holding the watermark.
+       if n.initWeS != nil {
+               pWeState = n.initWeS
+       }
+       addContext := func(err error) error {
+               return errors.WithContext(err, "Attempting checkpoint in ProcessSizedElementsAndRestrictions")
+       }
+
+       // Error checking.
+       if n.rt == nil {
+               return nil, addContext(errors.New("Restriction tracker missing."))
+       }
+       if err := n.rt.GetError(); err != nil {
+               return nil, addContext(err)
+       }
+
+       _, r, err := n.singleWindowSplit(0.0, pWeState, rWeState)
+       if err != nil {
+               return nil, addContext(err)
+       }
+
+       if !n.rt.IsDone() {
+               return nil, addContext(errors.New("Primary restriction is not done, data may be lost as a result"))

Review Comment:
   We should still provide more context here. The important pieces of information are:
   
   1) This happened during a self-checkpoint.
   2) This probably points to a problem with their TrySplit logic; they should never have a restriction that isn't done after splitting at 0.0.
   
   Also, because this is an error, it will kill pipeline execution (so data loss isn't really a concern).
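
   For example, the check could surface both points like this (a hedged sketch; the exact wording, and the use of errors.Errorf and n.rt.GetRestriction(), are illustrative):

```go
// Illustrative only: a more descriptive error covering both points above.
if !n.rt.IsDone() {
	err := errors.Errorf("primary restriction %v is not done after self-checkpoint split at fraction 0.0; "+
		"this usually indicates a bug in the RTracker's TrySplit logic", n.rt.GetRestriction())
	return nil, addContext(err)
}
```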



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -647,6 +653,46 @@ func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []
        return p, r, nil
 }
 
+// Checkpoint splits the remaining work in a restriction into residuals to be resumed
+// later by the runner. This is done iff the underlying Splittable DoFn returns a resuming
+// ProcessContinuation. If the split occurs and the primary restriction is not marked as done
+// by the RTracker, the Checkpoint fails as this is a potential data-loss case.
+func (n *ProcessSizedElementsAndRestrictions) Checkpoint() ([]*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}
+       rWeState = n.wesInv.Invoke(n.PDo.we)
+       pWeState = rWeState
+       // If we've processed elements, the initial watermark estimator state will be set.
+       // In that case, we should hold the output watermark at that initial state so that we
+       // don't advance past where the current elements are holding the watermark.
+       if n.initWeS != nil {
+               pWeState = n.initWeS
+       }

Review Comment:
   If you do keep this and don't rely on Split, you can get rid of all the pWeState logic and just set it to nil, since we're discarding the primary watermark anyway.
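
   In code, the simplification might look like this (a sketch, assuming singleWindowSplit accepts a nil primary watermark state):

```go
// Sketch: the primary is discarded on a fraction-0.0 checkpoint, so no
// watermark hold is needed for it.
rWeState := n.wesInv.Invoke(n.PDo.we)
_, r, err := n.singleWindowSplit(0.0, nil, rWeState)
```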



##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -73,8 +73,8 @@ type RTracker interface {
        // reason), then this function returns nil as the residual.
        //
        // If the split fraction is 0 (e.g. a self-checkpointing split) TrySplit() should return either
-       // a nil primary or an RTracker that is both bounded and has size 0. This ensures that there is
-       // no data that is lost by not being rescheduled for execution later.
+       // a nil primary or a restriction that represents no remaining work. This will ensure that there
+       // is no data loss.

Review Comment:
   Really, we don't care about the primary that they return, right? The thing we really care about is that they've correctly set their RTracker such that IsDone() returns true.
   
   Also, the current consequence isn't data loss, it's a failed pipeline, I think.
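
   As an illustration of that contract, a hypothetical offset-range tracker (not the SDK's implementation) handling a fraction-0 split might look like:

```go
// Hypothetical tracker, for illustration of the fraction-0 contract only.
type offsetRange struct{ Start, End int64 }

type offsetRangeTracker struct {
	rest    offsetRange
	claimed int64 // last offset successfully claimed
}

func (t *offsetRangeTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
	if fraction == 0.0 {
		// Hand back all unclaimed work as the residual and shrink the primary
		// to only the claimed work, so IsDone() reports true afterwards.
		res := offsetRange{Start: t.claimed + 1, End: t.rest.End}
		t.rest.End = t.claimed + 1
		return t.rest, res, nil
	}
	// Proportional splitting for fraction > 0 omitted.
	return t.rest, nil, nil
}

func (t *offsetRangeTracker) IsDone() bool {
	return t.claimed+1 >= t.rest.End
}
```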





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772905)
    Time Spent: 6.5h  (was: 6h 20m)

> Improve error message surrounding primary returns in the self-checkpointing 
> code
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-14484
>                 URL: https://issues.apache.org/jira/browse/BEAM-14484
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P1
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> The error message in the Go SDK harness around returned primaries in the 
> self-checkpointing code 
> ([https://github.com/apache/beam/blob/ea1f292e9cf31fc8c4803b10d811f0d3ee184ae7/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L375])
>  is unclear and should be made more explicit. It should also guide the user 
> towards making sure that the restriction behaves properly in the 
> self-checkpointing case. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
