lostluck commented on code in PR #17716:
URL: https://github.com/apache/beam/pull/17716#discussion_r877529091


##########
sdks/go/test/integration/primitives/checkpointing.go:
##########
@@ -68,19 +68,24 @@ func (fn *selfCheckpointingDoFn) SplitRestriction(_ []byte, rest offsetrange.Res
 func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
        position := rt.GetRestriction().(offsetrange.Restriction).Start
 
+       counter := 0
        for {
                if rt.TryClaim(position) {
                        // Successful claim, emit the value and move on.
                        emit(position)
                        position++
-                       return sdf.ResumeProcessingIn(1 * time.Second)
+                       counter++
                } else if rt.GetError() != nil || rt.IsDone() {
                        // Stop processing on error or completion

Review Comment:
   Please log the error here if `rt.GetError()` returns one, just in case.



##########
sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go:
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package offsetrange defines a restriction and restriction tracker for offset
+// ranges. An offset range is just a range, with a start and end, that can
+// begin at an offset, and is commonly used to represent byte ranges for files
+// or indices for iterable containers.
+
+package wrappedbounded
+
+import "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+
+// Tracker wraps an implementation of an RTracker and adds an IsBounded() function
+// that returns true in order to allow RTrackers to be handled as bounded BoundableRTrackers
+// if necessary (like in self-checkpointing evaluation.)
+type Tracker struct {
+       baseTracker sdf.RTracker
+}
+
+// TryClaim attempts to claim a block of work from the underlying RTracker's restriction.
+func (t *Tracker) TryClaim(pos interface{}) (ok bool) {
+       return t.baseTracker.TryClaim(pos)
+}
+
+// GetError returns an error from the underlying RTracker if it has stopped executing. Returns nil
+// if none has occurred.
+func (t *Tracker) GetError() error {
+       return t.baseTracker.GetError()
+}
+
+// TrySplit splits the underlying RTracker's restriction into a primary (work that is currently executing)
+// and a residual (work that will be split off and resumed later.)
+func (t *Tracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
+       return t.baseTracker.TrySplit(fraction)
+}
+
+// GetProgress returns two abstract scalars representing the amount of work done and the remaining work
+// left in the underlying RTracker. These are unitless values, only used to estimate work in relation to
+// each other.
+func (t *Tracker) GetProgress() (done float64, remaining float64) {
+       return t.baseTracker.GetProgress()
+}
+
+// IsDone() returns a boolean indicating if the work represented by the underlying RTracker has
+// been completed.
+func (t *Tracker) IsDone() bool {
+       return t.baseTracker.IsDone()
+}

Review Comment:
   Today you get to learn about Type Embedding:
   https://eli.thegreenplace.net/2020/embedding-in-go-part-1-structs-in-structs/
   
   In short, if you write
   ```
   type BoundedTrackerWrapper struct {
     RTracker
   }
   ```
   you only need to define the `IsBounded` method; the other methods of the interface are forwarded automatically. The embedded field is named `RTracker`, since the field name matches the type.
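
   A self-contained sketch of the embedding pattern. The two-method `RTracker` interface and the `countTracker` type are hypothetical, trimmed stand-ins so the snippet compiles without the Beam SDK; the real `sdf.RTracker` has more methods, which would be promoted the same way.

```go
package main

import "fmt"

// RTracker is a trimmed, hypothetical stand-in for sdf.RTracker.
type RTracker interface {
	TryClaim(pos interface{}) bool
	IsDone() bool
}

// BoundedTrackerWrapper embeds RTracker, so every RTracker method is
// promoted onto the wrapper without hand-written forwarding.
type BoundedTrackerWrapper struct {
	RTracker
}

// IsBounded is the only method the wrapper defines itself.
func (BoundedTrackerWrapper) IsBounded() bool { return true }

// countTracker is a toy RTracker that allows a fixed number of claims.
type countTracker struct{ left int }

func (c *countTracker) TryClaim(interface{}) bool {
	if c.left == 0 {
		return false
	}
	c.left--
	return true
}

func (c *countTracker) IsDone() bool { return c.left == 0 }

func main() {
	// The embedded field is set by its type name.
	w := BoundedTrackerWrapper{RTracker: &countTracker{left: 1}}
	fmt.Println(w.TryClaim(0), w.IsDone(), w.IsBounded())
}
```

   The wrapper still satisfies `RTracker` itself, so it can be passed anywhere the underlying tracker was accepted.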



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -366,13 +389,30 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) {
 
        ow := su.GetOutputWatermark()
 
-       // Always split at fraction 0.0, should have no primaries left.
+       // Always split at fraction 0.0. All remaining work should be returned as a residual, as anything left in the primaries
+       // will not be rescheduled and could represent data loss. We expect nil primaries but will also ignore any restrictions
+       // that are bounded and of size 0 as they represent no remaining work.
        ps, rs, err := su.Split(0.0)
        if err != nil {
                return SplitResult{}, -1 * time.Minute, false, err
        }
+       if len(rs) == 0 {
+               return SplitResult{}, -1 * time.Minute, false, nil
+       }
        if len(ps) != 0 {
-               return SplitResult{}, -1 * time.Minute, false, fmt.Errorf("failed to checkpoint: got %v primary roots, want none", ps)
+               // Expected structure of the root FullValue is KV<KV<Elm, KV<BoundedRTracker, watermarkEstimatorState>>, Size, (Timestamp?, Windows?)>

Review Comment:
   As written, it looks like the KV itself holds the windows and the timestamps, which is confusing. I'd recommend removing those from the comment: they aren't relevant at this stage, and they aren't stored in `Elm`s, so they don't need to be described.
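
   To illustrate the distinction, here is a rough sketch using a hypothetical, simplified `fullValue` struct (an assumption standing in for the exec package's `FullValue`, with placeholder field types). The KV nesting lives in `Elm`/`Elm2`, while the timestamp and windows sit beside them as separate fields.

```go
package main

import "fmt"

// fullValue is a simplified, hypothetical stand-in: Elm and Elm2 carry the
// key and value of a KV, while Timestamp and Windows are separate fields on
// the value and are not part of the KV.
type fullValue struct {
	Elm, Elm2 interface{}
	Timestamp int64
	Windows   []string
}

// trackerOf walks KV<KV<Elm, KV<tracker, state>>, Size> and returns the
// tracker slot. It never needs to look at Timestamp or Windows.
func trackerOf(root *fullValue) (interface{}, bool) {
	inner, ok := root.Elm.(*fullValue) // KV<Elm, KV<tracker, state>>
	if !ok {
		return nil, false
	}
	pair, ok := inner.Elm2.(*fullValue) // KV<tracker, state>
	if !ok {
		return nil, false
	}
	return pair.Elm, true
}

func main() {
	root := &fullValue{
		Elm: &fullValue{
			Elm:  "element",
			Elm2: &fullValue{Elm: "tracker", Elm2: "estimator state"},
		},
		Elm2:      8.0, // size
		Timestamp: 42,
		Windows:   []string{"global"},
	}
	rt, ok := trackerOf(root)
	fmt.Println(rt, ok)
}
```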



##########
sdks/go/pkg/beam/io/rtrackers/wrappedbounded/wrappedbounded.go:
##########
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package offsetrange defines a restriction and restriction tracker for offset
+// ranges. An offset range is just a range, with a start and end, that can
+// begin at an offset, and is commonly used to represent byte ranges for files
+// or indices for iterable containers.
+
+package wrappedbounded

Review Comment:
   TBH this seems generally useful, so we can just incorporate it into the SDF package rather than spawn a new package for a single type. Packages aren't Java classes, and this isn't a standalone RTracker/Restriction implementation like OffsetRange.
   
   Also, the package doc *must* be attached to the `package` declaration with no blank line in between. As written, this package has no doc header.
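
   One way to check this is with the standard `go/parser` package, which only records a doc comment when it ends on the line directly above the `package` clause. In the sketch below the doc sentence and the `packageDoc` helper are made up for illustration.

```go
package main

import (
	"fmt"
	"go/parser"
	"go/token"
)

// attached has the comment directly above the package clause.
const attached = `// Package wrappedbounded wraps an RTracker so it reports as bounded.
package wrappedbounded
`

// detached has a blank line between the comment and the package clause.
const detached = `// Package wrappedbounded wraps an RTracker so it reports as bounded.

package wrappedbounded
`

// packageDoc returns the doc text the Go parser associates with the package
// clause in src, or "" when no comment is attached to it.
func packageDoc(src string) string {
	fset := token.NewFileSet()
	f, err := parser.ParseFile(fset, "example.go", src, parser.ParseComments)
	if err != nil || f.Doc == nil {
		return ""
	}
	return f.Doc.Text()
}

func main() {
	fmt.Printf("attached: %q\n", packageDoc(attached))
	fmt.Printf("detached: %q\n", packageDoc(detached))
}
```

   Only the attached variant yields doc text; in the detached one the comment is an ordinary floating comment.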



##########
sdks/go/pkg/beam/core/runtime/graphx/translate.go:
##########
@@ -43,6 +43,7 @@ const (
        URNReshuffle     = "beam:transform:reshuffle:v1"
        URNCombinePerKey = "beam:transform:combine_per_key:v1"
        URNWindow        = "beam:transform:window_into:v1"
+       URNTruncate      = "beam:transform:sdf_truncate_sized_restrictions_v1"

Review Comment:
   Please move this adjacent to `URNRequiresSplittableDoFn`, since it's associated with that feature. It should probably be added to the capabilities requirement of the pipeline whenever there's a ProcessContinuation DoFn (see how Danny did BundleFinalization, and how the SDF one is handled).
   
   That can happen in this PR or in Ritesh's; either's fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
