damccorm commented on code in PR #17716:
URL: https://github.com/apache/beam/pull/17716#discussion_r877146972


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {

Review Comment:
   Maybe prefer `getBoundableRTrackerFromRoot`? From the call below, it was not
   immediately obvious that this never returns an unbounded tracker (though your
   comment helped).



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+       tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker)
+       if !ok {
+               return nil, -1.0, false
+       }
+       size, ok := root.Elm2.(float64)
+       if !ok {

Review Comment:
   What is this check doing? It doesn't _seem_ like it belongs in the function
   that gets the rtracker, but I might be missing something, since I don't really
   understand why it's there. Regardless, it would help to outline the expected
   structure of the FullValue (like we do in
   [exec/sdf.go](https://github.com/apache/beam/blob/9eb86446eb4c609138e29ead4617331918e120f4/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L161))
   and/or leave a comment.
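
   A minimal sketch of what such a structure comment could look like. The
   nesting is inferred only from the type assertions in the diff above;
   `FullValue`, `BoundableRTracker`, and `fakeTracker` are simplified stand-ins
   rather than the real Beam types, and the comma-ok assertion at every level
   (plus the return values when the size assertion fails) are assumptions, not
   what the PR does.

```go
package main

import "fmt"

// FullValue is a simplified stand-in for the exec package's FullValue.
type FullValue struct {
	Elm  interface{}
	Elm2 interface{}
}

// BoundableRTracker is a stand-in for sdf.BoundableRTracker.
type BoundableRTracker interface {
	IsBounded() bool
}

// fakeTracker is a hypothetical tracker used only for this example.
type fakeTracker struct{ bounded bool }

func (t fakeTracker) IsBounded() bool { return t.bounded }

// getBoundableRTrackerFromRoot expects a root shaped like this
// (inferred from the assertions in the diff, not from Beam docs):
//
//	*FullValue {
//	  Elm: *FullValue {
//	    Elm:  (not inspected here)
//	    Elm2: *FullValue {
//	      Elm: BoundableRTracker
//	    }
//	  }
//	  Elm2: float64 (restriction size)
//	}
//
// It returns ok=false instead of panicking when any level has an
// unexpected type.
func getBoundableRTrackerFromRoot(root *FullValue) (BoundableRTracker, float64, bool) {
	inner, ok := root.Elm.(*FullValue)
	if !ok {
		return nil, -1.0, false
	}
	pair, ok := inner.Elm2.(*FullValue)
	if !ok {
		return nil, -1.0, false
	}
	tracker, ok := pair.Elm.(BoundableRTracker)
	if !ok {
		return nil, -1.0, false
	}
	size, ok := root.Elm2.(float64)
	if !ok {
		return nil, -1.0, false
	}
	return tracker, size, true
}

func main() {
	root := &FullValue{
		Elm: &FullValue{
			Elm:  "input",
			Elm2: &FullValue{Elm: fakeTracker{bounded: true}},
		},
		Elm2: 0.0,
	}
	tracker, size, ok := getBoundableRTrackerFromRoot(root)
	fmt.Println(tracker.IsBounded(), size, ok)
}
```

   Note that the chained form in the diff, `root.Elm.(*FullValue).Elm2.(*FullValue)`,
   uses single-value assertions for the intermediate levels, so a root with a
   different shape would panic there rather than reach the `!ok` branch.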



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+       tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker)
+       if !ok {
+               return nil, -1.0, false
+       }
+       size, ok := root.Elm2.(float64)
+       if !ok {

Review Comment:
   Ah - your test helped - this is the restriction size, right? Should we also
   check whether it's equal to 0, or will that never happen?



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +378,25 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               for _, root := range ps {
+                       tracker, size, ok := getRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, we move on.
+                       if !ok {
+                               continue

Review Comment:
   Maybe we can warn here? I agree that we shouldn't stop a pipeline for this, 
but it is still the wrong behavior and it could be causing a sneaky bug if 
they're expecting some work to still be done in the primary.
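
   A rough sketch of the warn-and-continue idea, under stated assumptions:
   `primary` and `validatePrimaries` are hypothetical names that flatten the
   result of `getRTrackerFromRoot` into a struct, and the standard library
   `log` package stands in for whatever logger the exec package would really
   use. The `0.00001` threshold is the one from the diff.

```go
package main

import (
	"fmt"
	"log"
)

// sizeEpsilon is the threshold used in the diff for "effectively empty".
const sizeEpsilon = 0.00001

// primary is a hypothetical, flattened view of one primary root.
type primary struct {
	bounded bool    // what tracker.IsBounded() reported
	size    float64 // restriction size
	ok      bool    // whether the tracker could be extracted at all
}

// validatePrimaries warns about roots it cannot inspect and fails only
// when a root provably still holds work (unbounded or non-zero size).
func validatePrimaries(ps []primary, warnf func(string, ...interface{})) error {
	for i, p := range ps {
		if !p.ok {
			warnf("checkpoint: could not inspect primary root %d; any work left in it will not be rescheduled", i)
			continue
		}
		if !p.bounded || p.size > sizeEpsilon {
			return fmt.Errorf("failed to checkpoint: got %d primary roots, want none", len(ps))
		}
	}
	return nil
}

func main() {
	ps := []primary{
		{ok: false},                        // uninspectable: warn, keep going
		{bounded: true, size: 0, ok: true}, // bounded and empty: ignored
	}
	if err := validatePrimaries(ps, log.Printf); err != nil {
		log.Fatal(err)
	}
}
```

   Passing the warning function in as a parameter is only a convenience for
   the sketch; it keeps the logger swappable and makes the warning path easy
   to exercise in a test.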



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +378,25 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               for _, root := range ps {
+                       tracker, size, ok := getRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, we move on.
+                       if !ok {
+                               continue
+                       }
+                       if !tracker.IsBounded() || size > 0.00001 {
+                               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that the restriction tracker returns nil in TrySplit() when the split fraction is 0.0", ps)

Review Comment:
   Nice - I like this error message



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -348,6 +348,18 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) {
        return encodeElms
 }
 
+func getRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, float64, bool) {
+       tracker, ok := root.Elm.(*FullValue).Elm2.(*FullValue).Elm.(sdf.BoundableRTracker)
+       if !ok {
+               return nil, -1.0, false
+       }
+       size, ok := root.Elm2.(float64)
+       if !ok {

Review Comment:
   Actually, I'm still a little confused: it looks like the fact that it's a
   float is significant? This is basically just the "help Danny understand how
   this all works" thread at this point 🙃



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +378,25 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               for _, root := range ps {
+                       tracker, size, ok := getRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, we move on.
+                       if !ok {
+                               continue
+                       }
+                       if !tracker.IsBounded() || size > 0.00001 {

Review Comment:
   Isn't the tracker guaranteed to be bounded if we've made it this far?
   
   I actually don't mind offloading this check here and returning all
   RTrackers; I'm not sure which is cleaner without seeing both.
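
   On the "guaranteed to be bounded" question, a small illustration of the Go
   semantics involved: a type assertion to an interface only checks that the
   value has the interface's methods, not what those methods return. The
   `BoundableRTracker` interface and `unboundedTracker` type below are
   stand-ins written for this example (the diff shows only that
   `IsBounded()` is callable on the asserted value); whether real trackers
   ever report `false` here is not something this thread establishes.

```go
package main

import "fmt"

// BoundableRTracker is a stand-in interface with the one method the
// diff calls on the asserted value.
type BoundableRTracker interface {
	IsBounded() bool
}

// unboundedTracker is a hypothetical tracker that satisfies the
// interface but reports that it is not bounded.
type unboundedTracker struct{}

func (unboundedTracker) IsBounded() bool { return false }

// inspect reports whether v satisfies BoundableRTracker and, if so,
// what IsBounded() returns. The two answers are independent.
func inspect(v interface{}) (asserted bool, bounded bool) {
	t, ok := v.(BoundableRTracker)
	if !ok {
		return false, false
	}
	return true, t.IsBounded()
}

func main() {
	asserted, bounded := inspect(unboundedTracker{})
	fmt.Println("assertion succeeded:", asserted, "IsBounded:", bounded)
}
```

   Under that reading, the `!tracker.IsBounded()` check is not redundant with
   the assertion, although the function name could still make the distinction
   clearer.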



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
