[https://issues.apache.org/jira/browse/BEAM-14484?focusedWorklogId=772496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772496]

ASF GitHub Bot logged work on BEAM-14484:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/22 15:21
            Start Date: 19/May/22 15:21
    Worklog Time Spent: 10m 
      Work Description: jrmccluskey commented on code in PR #17716:
URL: https://github.com/apache/beam/pull/17716#discussion_r877204263


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +378,25 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               for _, root := range ps {
+                       tracker, size, ok := getRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, we move on.
+                       if !ok {
+                               continue
+                       }
+                       if !tracker.IsBounded() || size > 0.00001 {

Review Comment:
   I believe it _should_ be if we're here, but if the split code is implemented
incorrectly for a given tracker it may not be. Better to check and make sure we
aren't losing anything in the primary than to assume that everything is exactly
as we expect.
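The check under review can be sketched in isolation to show which primaries it accepts and which it reports. This is a minimal, self-contained sketch, not the Beam implementation: `primaryRoot`, `fakeTracker`, and `firstPrimaryWithWork` are hypothetical, and the local `BoundableRTracker` interface models only the `IsBounded` method the diff relies on. The `0.00001` tolerance is taken from the diff.

```go
package main

import "fmt"

// BoundableRTracker is a local stand-in for the SDK interface of the same
// name; only the IsBounded method used by the check is modeled here.
type BoundableRTracker interface {
	IsBounded() bool
}

// fakeTracker is a hypothetical tracker used only for this illustration.
type fakeTracker struct{ bounded bool }

func (t fakeTracker) IsBounded() bool { return t.bounded }

// primaryRoot is a hypothetical simplification of a split primary root:
// the tracker it carries (which may not be boundable) and its size.
type primaryRoot struct {
	tracker any
	size    float64
}

// sizeEpsilon mirrors the 0.00001 tolerance in the diff, so a size that is
// zero up to floating-point noise counts as no remaining work.
const sizeEpsilon = 0.00001

// firstPrimaryWithWork returns the index of the first primary that is
// unbounded or larger than sizeEpsilon. Primaries whose tracker is not a
// BoundableRTracker are skipped, as in the diff.
func firstPrimaryWithWork(ps []primaryRoot) (int, bool) {
	for i, root := range ps {
		tracker, ok := root.tracker.(BoundableRTracker)
		if !ok {
			continue
		}
		if !tracker.IsBounded() || root.size > sizeEpsilon {
			return i, true
		}
	}
	return -1, false
}

func main() {
	ps := []primaryRoot{
		{tracker: fakeTracker{bounded: true}, size: 0}, // bounded and empty: ignored
		{tracker: "not a tracker", size: 10},           // not boundable: skipped
	}
	fmt.Println(firstPrimaryWithWork(ps)) // prints "-1 false"

	// An unbounded primary would be lost on checkpoint, so it is reported.
	ps = append(ps, primaryRoot{tracker: fakeTracker{bounded: false}, size: 0})
	fmt.Println(firstPrimaryWithWork(ps)) // prints "2 true"
}
```

As in the diff, the size tolerance only applies to bounded trackers: an unbounded primary is always reported, whatever its size.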





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772496)
    Time Spent: 2h 20m  (was: 2h 10m)

> Improve error message surrounding primary returns in the self-checkpointing code
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-14484
>                 URL: https://issues.apache.org/jira/browse/BEAM-14484
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P1
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The error message in the Go SDK harness around returned primaries in the 
> self-checkpointing code 
> ([https://github.com/apache/beam/blob/ea1f292e9cf31fc8c4803b10d811f0d3ee184ae7/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L375])
>  is unclear and should be made more explicit. It should also guide the user 
> towards making sure that the restriction behaves properly in the 
> self-checkpointing case. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
