This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f1980dc917c [BEAM-14484] Improve behavior surrounding primary roots in 
self-checkpointing (#17716)
f1980dc917c is described below

commit f1980dc917c939a9c8178de62daeda1405032701
Author: Jack McCluskey <[email protected]>
AuthorDate: Thu May 19 18:37:10 2022 -0400

    [BEAM-14484] Improve behavior surrounding primary roots in 
self-checkpointing (#17716)
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   | 43 +++++++++++++-
 .../pkg/beam/core/runtime/exec/datasource_test.go  | 69 ++++++++++++++++++++++
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  1 +
 sdks/go/pkg/beam/core/sdf/sdf.go                   |  4 ++
 sdks/go/pkg/beam/core/sdf/wrappedbounded.go        | 34 +++++++++++
 .../test/integration/primitives/checkpointing.go   | 16 ++++-
 6 files changed, 162 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index e47fee00c96..8c39aeaed41 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 )
 
 // DataSource is a Root execution unit.
@@ -348,6 +349,27 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) 
([][]byte, error) {
        return encodeElms
 }
 
+func getBoundedRTrackerFromRoot(root *FullValue) (sdf.BoundableRTracker, 
float64, bool) {
+       tElm := root.Elm.(*FullValue).Elm2.(*FullValue).Elm
+       tracker, ok := tElm.(sdf.RTracker)
+       if !ok {
+               log.Warnf(context.Background(), "expected type sdf.RTracker, 
got type %T", tElm)
+               return nil, -1.0, false
+       }
+       boundTracker, ok := tracker.(sdf.BoundableRTracker)
+       if !ok {
+               log.Warn(context.Background(), "expected type 
sdf.BoundableRTracker; ensure that the RTracker implements IsBounded()")
+               // Assume an RTracker that does not implement IsBounded() will 
always be bounded, wrap so it can be used.
+               boundTracker = sdf.NewWrappedTracker(tracker)
+       }
+       size, ok := root.Elm2.(float64)
+       if !ok {
+               log.Warnf(context.Background(), "expected size to be type 
float64, got type %T", root.Elm2)
+               return nil, -1.0, false
+       }
+       return boundTracker, size, true
+}
+
 // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. 
returned a
 // ProcessContinuation) and needs to be resumed later. If the underlying DoFn 
is not
 // splittable or has not returned a resuming continuation, the function 
returns an empty
@@ -366,13 +388,30 @@ func (n *DataSource) Checkpoint() (SplitResult, 
time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned 
as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil 
primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
+       if len(rs) == 0 {
+               return SplitResult{}, -1 * time.Minute, false, nil
+       }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, 
fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               // Expected structure of the root FullValue is KV<KV<Elm, 
KV<BoundableRTracker, watermarkEstimatorState>>, Size>
+               for _, root := range ps {
+                       tracker, size, ok := getBoundedRTrackerFromRoot(root)
+                       // If type assertion didn't return a BoundableRTracker, 
we move on.
+                       if !ok {
+                               log.Warnf(context.Background(), "got unexpected 
primary root contents %v, please check the output of the restriction tracker's 
TrySplit() function", root)
+                               continue
+                       }
+                       if !tracker.IsBounded() || size > 0.00001 {
+                               return SplitResult{}, -1 * time.Minute, false, 
fmt.Errorf("failed to checkpoint: got %#v primary roots, want none. Ensure that 
the restriction tracker returns nil in TrySplit() when the split fraction is 
0.0", ps)
+                       }
+               }
+
        }
 
        encodeElms := n.makeEncodeElms()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index bf367ae1cb4..365cf52062f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
        "google.golang.org/protobuf/types/known/timestamppb"
 )
 
@@ -914,3 +915,71 @@ func validateSource(t *testing.T, out *CaptureNode, source 
*DataSource, expected
                t.Errorf("DataSource => %#v, want %#v", 
extractValues(out.Elements...), extractValues(expected...))
        }
 }
+
+func constructRootFullValue(rt, size interface{}) *FullValue {
+       return &FullValue{
+               Elm: &FullValue{
+                       Elm2: &FullValue{
+                               Elm: rt,
+                       },
+               },
+               Elm2: size,
+       }
+}
+
+func TestGetRTrackerFromRoot(t *testing.T) {
+       var tests = []struct {
+               name    string
+               inRt    interface{}
+               inSize  interface{}
+               valid   bool
+               expSize float64
+       }{
+               {
+                       "valid",
+                       offsetrange.NewTracker(offsetrange.Restriction{Start: 
int64(0), End: int64(1)}),
+                       1.0,
+                       true,
+                       1.0,
+               },
+               {
+                       "not a bounded rtracker",
+                       int64(42),
+                       1.0,
+                       false,
+                       -1.0,
+               },
+               {
+                       "non-float size",
+                       offsetrange.NewTracker(offsetrange.Restriction{Start: 
int64(0), End: int64(1)}),
+                       int64(1),
+                       false,
+                       -1.0,
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       root := constructRootFullValue(test.inRt, test.inSize)
+                       tracker, size, ok := getBoundedRTrackerFromRoot(root)
+
+                       if test.valid {
+                               if !ok {
+                                       t.Fatalf("failed to get tracker and 
size from root")
+                               }
+                               if tracker == nil {
+                                       t.Errorf("got nil tracker, expected 
%#v", test.inRt)
+                               }
+                       } else {
+                               if ok {
+                                       t.Errorf("invalid root returned ok")
+                               }
+                               if tracker != nil {
+                                       t.Errorf("got tracker %#v, want nil", 
tracker)
+                               }
+                       }
+                       if !floatEquals(test.expSize, size, 0.001) {
+                               t.Errorf("got size %f, want %f", size, 
test.inSize)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index d5c7a31b3e4..7b777771a8c 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -68,6 +68,7 @@ const (
 
        URNRequiresSplittableDoFn     = 
"beam:requirement:pardo:splittable_dofn:v1"
        URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
+       URNTruncate                   = 
"beam:transform:sdf_truncate_sized_restrictions_v1"
 
        // Deprecated: Determine worker binary based on GoWorkerBinary Role 
instead.
        URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
diff --git a/sdks/go/pkg/beam/core/sdf/sdf.go b/sdks/go/pkg/beam/core/sdf/sdf.go
index 2876d5985a2..9812d300e5b 100644
--- a/sdks/go/pkg/beam/core/sdf/sdf.go
+++ b/sdks/go/pkg/beam/core/sdf/sdf.go
@@ -72,6 +72,10 @@ type RTracker interface {
        // the only split point is the end of the restriction, or the split 
failed for some recoverable
        // reason), then this function returns nil as the residual.
        //
+       // If the split fraction is 0 (e.g. a self-checkpointing split) 
TrySplit() should return either
+       // a nil primary or an RTracker that is both bounded and has size 0. 
This ensures that there is
+       // no data that is lost by not being rescheduled for execution later.
+       //
        // If an error is returned, some catastrophic failure occurred and the 
entire bundle will fail.
        TrySplit(fraction float64) (primary, residual interface{}, err error)
 
diff --git a/sdks/go/pkg/beam/core/sdf/wrappedbounded.go 
b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go
new file mode 100644
index 00000000000..36f44817a83
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/wrappedbounded.go
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+// WrappedTracker wraps an implementation of an RTracker and adds an 
IsBounded() function
+// that returns true in order to allow RTrackers to be handled as bounded 
BoundableRTrackers
+// if necessary (such as in self-checkpointing evaluation).
+type WrappedTracker struct {
+       RTracker
+}
+
+// IsBounded returns true, indicating that the underlying RTracker represents 
a bounded
+// amount of work.
+func (t *WrappedTracker) IsBounded() bool {
+       return true
+}
+
+// NewWrappedTracker is a constructor for an RTracker that wraps another 
RTracker into a BoundableRTracker.
+func NewWrappedTracker(underlying RTracker) *WrappedTracker {
+       return &WrappedTracker{RTracker: underlying}
+}
diff --git a/sdks/go/test/integration/primitives/checkpointing.go 
b/sdks/go/test/integration/primitives/checkpointing.go
index f26b9f392ac..5b1079ad4ef 100644
--- a/sdks/go/test/integration/primitives/checkpointing.go
+++ b/sdks/go/test/integration/primitives/checkpointing.go
@@ -16,12 +16,14 @@
 package primitives
 
 import (
+       "context"
        "reflect"
        "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
 )
 
@@ -53,7 +55,7 @@ func (fn *selfCheckpointingDoFn) RestrictionSize(_ []byte, 
rest offsetrange.Rest
 // SplitRestriction modifies the offsetrange.Restriction's sized restriction 
function to produce a size-zero restriction
 // at the end of execution.
 func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest 
offsetrange.Restriction) []offsetrange.Restriction {
-       size := int64(1)
+       size := int64(10)
        s := rest.Start
        var splits []offsetrange.Restriction
        for e := s + size; e <= rest.End; s, e = e, e+size {
@@ -68,19 +70,27 @@ func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, 
rest offsetrange.Res
 func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ 
[]byte, emit func(int64)) sdf.ProcessContinuation {
        position := rt.GetRestriction().(offsetrange.Restriction).Start
 
+       counter := 0
        for {
                if rt.TryClaim(position) {
                        // Successful claim, emit the value and move on.
                        emit(position)
                        position++
-                       return sdf.ResumeProcessingIn(1 * time.Second)
+                       counter++
                } else if rt.GetError() != nil || rt.IsDone() {
                        // Stop processing on error or completion
+                       if err := rt.GetError(); err != nil {
+                               log.Errorf(context.Background(), "error in 
restriction tracker, got %v", err)
+                       }
                        return sdf.StopProcessing()
                } else {
-                       // Failed to claim but no error, resume later.
+                       // Resume later.
                        return sdf.ResumeProcessingIn(5 * time.Second)
                }
+
+               if counter >= 10 {
+                       return sdf.ResumeProcessingIn(1 * time.Second)
+               }
        }
 }
 

Reply via email to