[ 
https://issues.apache.org/jira/browse/BEAM-14484?focusedWorklogId=772489&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772489
 ]

ASF GitHub Bot logged work on BEAM-14484:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/May/22 15:03
            Start Date: 19/May/22 15:03
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17716:
URL: https://github.com/apache/beam/pull/17716#discussion_r877146972


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {

Review Comment:
   Maybe prefer `getBoundableRTrackerFromRoot`? It was not immediately obvious 
that this wouldn't return an unbounded tracker from the call below (though your 
comment helped)



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+       tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker)
+       if !ok {
+               return nil, -1.0, false
+       }
+       size, ok := root.Elm2.(float64)
+       if !ok {

Review Comment:
   What is this check doing? It doesn't _seem_ like it belongs in the function 
to get the rtracker, but I might just be missing something since I don't really 
understand why it's there. Regardless, it might be helpful to outline the 
expected structure of the FullValue (like we do in 
[exec/sdf.go](https://github.com/apache/beam/blob/9eb86446eb4c609138e29ead4617331918e120f4/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L161))
and/or leave a comment.
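
For illustration, here is a minimal sketch of what a documented layout plus a step-by-step extraction could look like. The layout in the comment is only what the type assertions in the diff imply, not a confirmed description of the harness; `FullValue`, `boundableTracker`, `fakeTracker`, and `trackerAndSize` are local stand-ins invented for this example. Note that in the chained form from the diff only the last assertion uses the comma-ok form, so a mismatch at an intermediate level would panic rather than return `false`.

```go
package main

import "fmt"

// FullValue is a stand-in for the exec package's FullValue; only the two
// fields touched by the diff are modeled here.
type FullValue struct {
	Elm  interface{}
	Elm2 interface{}
}

// boundableTracker is a hypothetical stand-in for sdf.BoundableRTracker.
type boundableTracker interface {
	IsBounded() bool
}

// fakeTracker is a trivial tracker used only to exercise the sketch.
type fakeTracker struct{ bounded bool }

func (t fakeTracker) IsBounded() bool { return t.bounded }

// trackerAndSize unpacks a root under the layout assumed from the diff:
//
//	root.Elm           *FullValue
//	root.Elm.Elm2      *FullValue
//	root.Elm.Elm2.Elm  tracker
//	root.Elm2          float64 (restriction size)
//
// Every level uses the comma-ok form, so a malformed root yields ok == false
// instead of a panic.
func trackerAndSize(root *FullValue) (boundableTracker, float64, bool) {
	if root == nil {
		return nil, -1.0, false
	}
	inner, ok := root.Elm.(*FullValue)
	if !ok || inner == nil {
		return nil, -1.0, false
	}
	pair, ok := inner.Elm2.(*FullValue)
	if !ok || pair == nil {
		return nil, -1.0, false
	}
	tracker, ok := pair.Elm.(boundableTracker)
	if !ok {
		return nil, -1.0, false
	}
	size, ok := root.Elm2.(float64)
	if !ok {
		return nil, -1.0, false
	}
	return tracker, size, true
}

func main() {
	good := &FullValue{
		Elm:  &FullValue{Elm2: &FullValue{Elm: fakeTracker{bounded: true}}},
		Elm2: 0.0,
	}
	bad := &FullValue{Elm: "not a FullValue", Elm2: 1.0}

	_, size, ok := trackerAndSize(good)
	fmt.Println(size, ok)
	_, _, ok = trackerAndSize(bad)
	fmt.Println(ok)
}
```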



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+       tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker)
+       if !ok {
+               return nil, -1.0, false
+       }
+       size, ok := root.Elm2.(float64)
+       if !ok {

Review Comment:
   Ah - your test helped - this is the restriction size, right? Should we also 
check to see if it's equal to 0, or will that never happen?



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +378,25 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               for _, root := range ps {
+                       tracker, size, ok := getRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, we move on.
+                       if !ok {
+                               continue

Review Comment:
   Maybe we can warn here? I agree that we shouldn't stop a pipeline for this, 
but it is still the wrong behavior and it could be causing a sneaky bug if 
they're expecting some work to still be done in the primary.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +378,25 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               for _, root := range ps {
+                       tracker, size, ok := getRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, we move on.
+                       if !ok {
+                               continue
+                       }
+                       if !tracker.IsBounded() || size > 0.00001 {
+                               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps)

Review Comment:
   Nice - I like this error message



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+       tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker)
+       if !ok {
+               return nil, -1.0, false
+       }
+       size, ok := root.Elm2.(float64)
+       if !ok {

Review Comment:
   Actually, I'm still a little confused: it looks like the fact that it's a 
float is significant? This is basically just the "help Danny understand how 
this all works" thread at this point 🙃



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +378,25 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               for _, root := range ps {
+                       tracker, size, ok := getRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, we move on.
+                       if !ok {
+                               continue
+                       }
+                       if !tracker.IsBounded() || size > 0.00001 {

Review Comment:
   Isn't the tracker guaranteed to be bounded if we've made it this far?
   
   I actually don't mind offloading this check here and returning all 
RTrackers, not sure which is cleaner without seeing them
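
One way to look at the question, as a sketch rather than a statement about the SDK: if `sdf.BoundableRTracker` is an interface exposing `IsBounded() bool` (which the call in the diff suggests), then a successful type assertion only shows that the method exists, not that it returns `true`. The names below (`boundableTracker`, `growingTracker`, `needsError`) are hypothetical stand-ins for this example.

```go
package main

import "fmt"

// boundableTracker is a hypothetical stand-in for sdf.BoundableRTracker.
type boundableTracker interface {
	IsBounded() bool
}

// growingTracker satisfies the interface but reports itself as unbounded.
type growingTracker struct{}

func (growingTracker) IsBounded() bool { return false }

// needsError mirrors the condition in the diff: a root whose tracker does not
// satisfy the interface is skipped, and otherwise an error is warranted when
// the tracker is unbounded or the remaining size is above a small threshold.
func needsError(v interface{}, size float64) bool {
	t, ok := v.(boundableTracker)
	if !ok {
		return false // skipped, as in the diff
	}
	return !t.IsBounded() || size > 0.00001
}

func main() {
	// The assertion succeeds, yet the tracker is not bounded.
	fmt.Println(needsError(growingTracker{}, 0))
	// A value of another type is skipped regardless of size.
	fmt.Println(needsError("not a tracker", 5))
}
```

Under that assumption the `IsBounded()` check is not redundant after the assertion.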





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772489)
    Time Spent: 1h 20m  (was: 1h 10m)

> Improve error message surrounding primary returns in the self-checkpointing 
> code
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-14484
>                 URL: https://issues.apache.org/jira/browse/BEAM-14484
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Jack McCluskey
>            Assignee: Jack McCluskey
>            Priority: P1
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The error message in the Go SDK harness around returned primaries in the 
> self-checkpointing code 
> ([https://github.com/apache/beam/blob/ea1f292e9cf31fc8c4803b10d811f0d3ee184ae7/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L375])
>  is unclear and should be made more explicit. It should also guide the user 
> towards making sure that the restriction behaves properly in the 
> self-checkpointing case. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
