lostluck commented on a change in pull request #11517:
URL: https://github.com/apache/beam/pull/11517#discussion_r416930600



##########
File path: sdks/go/examples/stringsplit/offsetrange/offsetrange.go
##########
@@ -89,21 +89,23 @@ func (tracker *Tracker) GetError() error {
 }
 
 // TrySplit splits at the nearest integer greater than the given fraction of the remainder.
-func (tracker *Tracker) TrySplit(fraction float64) (interface{}, error) {
+func (tracker *Tracker) TrySplit(fraction float64) (interface{}, interface{}, error) {
        if tracker.Stopped || tracker.IsDone() {
-               return nil, nil
+               return tracker.Rest, nil, nil
        }
-       if fraction < 0 || fraction > 1 {
-               return nil, errors.New("fraction must be in range [0, 1]")
+       if fraction < 0 {
+               fraction = 0

Review comment:
       Might be worth documenting this clamping behavior in the doc comment.
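
For illustration, the doc comment could spell the behavior out. This is a minimal sketch, not the PR's code: `clampFraction` is a hypothetical helper, and treating values above 1 as 1 is an assumption, since the quoted hunk only shows the lower bound.

```go
package main

import "fmt"

// clampFraction returns fraction limited to the range [0, 1].
//
// Out-of-range input is not an error: a fraction below 0 is treated as 0.
// Treating a fraction above 1 as 1 is an assumption made for this sketch;
// the quoted hunk only shows the lower bound.
func clampFraction(fraction float64) float64 {
	if fraction < 0 {
		return 0
	}
	if fraction > 1 {
		return 1
	}
	return fraction
}

func main() {
	for _, f := range []float64{-0.5, 0.25, 3} {
		fmt.Printf("fraction %v is treated as %v\n", f, clampFraction(f))
	}
}
```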

##########
File path: sdks/go/examples/stringsplit/offsetrange/offsetrange.go
##########
@@ -89,21 +89,23 @@ func (tracker *Tracker) GetError() error {
 }
 
 // TrySplit splits at the nearest integer greater than the given fraction of the remainder.
-func (tracker *Tracker) TrySplit(fraction float64) (interface{}, error) {
+func (tracker *Tracker) TrySplit(fraction float64) (interface{}, interface{}, error) {

Review comment:
       Given this is the example, using named return values (primary, residual
...) is appropriate here for documentation purposes (but not so that one can
use a bare return).
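
A rough sketch of that style, assuming a simplified tracker: the `Restriction` and `Tracker` types and the `Next` field below are hypothetical stand-ins, not the example's actual code. The results are named so the signature documents itself, yet every return statement still lists its values explicitly.

```go
package main

import (
	"fmt"
	"math"
)

// Restriction is a hypothetical half-open offset range [Start, End).
type Restriction struct{ Start, End int64 }

// Tracker is a simplified stand-in for the example's tracker.
type Tracker struct {
	Rest Restriction
	Next int64 // first position not yet claimed
}

// TrySplit names its results so the signature documents which value is
// which, but every return statement still lists its values explicitly.
func (t *Tracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
	remaining := t.Rest.End - t.Next
	if remaining <= 0 {
		return t.Rest, nil, nil // nothing left to split off
	}
	fraction = math.Max(0, math.Min(1, fraction))
	splitPt := t.Next + int64(math.Ceil(fraction*float64(remaining)))
	if splitPt >= t.Rest.End {
		return t.Rest, nil, nil // residual would be empty
	}
	res := Restriction{Start: splitPt, End: t.Rest.End}
	t.Rest.End = splitPt
	return t.Rest, res, nil
}

func main() {
	t := &Tracker{Rest: Restriction{Start: 0, End: 100}, Next: 50}
	primary, residual, err := t.TrySplit(0.5)
	fmt.Println(primary, residual, err)
}
```

Explicit returns keep each exit path readable on its own, which matters more in an example than saving a few characters.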

##########
File path: sdks/go/pkg/beam/pardo.go
##########
@@ -222,6 +222,87 @@ func ParDo0(s Scope, dofn interface{}, col PCollection, opts ...Option) {
 // DoFn instance via output PCollections, in the absence of external
 // communication mechanisms written by user code.
 //
+// Splittable DoFns (Experimental)
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
+//
+// Splittable DoFns are DoFns that are able to split work within an element,
+// as opposed to only at element boundaries like normal DoFns. This is useful
+// for DoFns that emit many outputs per input element and can distribute that
+// work among multiple workers. The most common examples of this are sources.
+//
+// In order to split work within an element, splittable DoFns use the concept of
+// restrictions, which are objects that are associated with an element and
+// describe a portion of work on that element. For example, a restriction
+// associated with a filename might describe what byte range within that file to
+// process. In addition to restrictions, splittable DoFns also rely on
+// restriction trackers to track progress and perform splits on a restriction
+// currently being processed. See the `RTracker` interface in core/sdf/sdf.go
+// for more details.
+//
+// Splitting
+//
+// Splitting means taking one restriction and splitting into two or more that
+// cover the entire input space of the original one. In other words, processing
+// all the split restrictions should produce identical output to processing
+// the original one.
+//
+// Splitting occurs in two stages. The initial splitting occurs before any
+// restrictions have started processing. This step is used to split large
+// restrictions into smaller ones that can then be distributed among multiple
+// workers for processing. Initial splitting is user-defined and optional.
+//
+// Dynamic splitting occurs during the processing of a restriction in runners
+// that have implemented it. If there are available workers, runners may split
+// the unprocessed portion of work from a busy worker and shard it to available
+// workers in order to better distribute work. With unsplittable DoFns this can
+// only occur on element boundaries, but for splittable DoFns this split
+// can land within a restriction and will require splitting that restriction.
+//
+// * Note: The Go SDK currently does not support dynamic splitting for SDFs,
+//   only initial splitting. Only initially split restrictions can be
+//   distributed by liquid sharding. Stragglers will not be split during
+//   execution with dynamic splitting.
+//
+// Splittable DoFn Methods
+//
+// Making a splittable DoFn requires the following methods to be implemented on
+// a DoFn in addition to the usual DoFn requirements. In the following
+// method signatures `elem` represents the main input elements to the DoFn, and
+// should match the types used in ProcessElement. `restriction` represents the
+// user-defined restriction, and can be any type as long as it is consistent
+// throughout all the splittable DoFn methods:
+//
+// * `CreateInitialRestriction(element) restriction`
+//     CreateInitialRestriction creates an initial restriction encompassing an
+//     entire element. The restriction created stays associated with the element
+//     it describes.
+// * `SplitRestriction(elem, restriction) []restriction`
+//     SplitRestriction takes an element and its initial restriction, and
+//     optionally performs an initial split on it, returning a slice of all the
+//     split restrictions. If no splits are desired, the method returns a slice
+//     containing only the original restriction. This method will always be
+//     called on each newly created restriction before they are processed.
+// * `RestrictionSize(elem, restriction) float64`
+//     RestrictionSize returns a cheap size estimation for a restriction. This
+//     size is an abstract scalar value that represents how much work a
+//     restriction takes compared to other restrictions in the same DoFn. For
+//     example, a size of 200 represents twice as much work as a size of
+//     100, but the numbers do not represent anything on their own. Size is
+//     used by runners to estimate work for liquid sharding.
+// * `CreateTracker(restriction) restrictionTracker`
+//     CreateTracker creates and returns a restriction tracker (a concrete type
+//     implementing `sdf.RTracker`) given a restriction. The restriction tracker

Review comment:
       Consider being explicit about sdf.RTracker being an interface,
   e.g. "... implementing the `sdf.RTracker` interface".

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-       // TryClaim attempts to claim the block of work in the current restriction located at a given
-       // position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-       // work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-       // work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-       // the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-       // any additional work or emitting any outputs.
-       //
-       // TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-       // returns a boolean indicating whether the claim succeeded.
+       // TryClaim attempts to claim the block of work located in the given position of the
+       // restriction. This method must be called in ProcessElement to claim work before it can be
+       // processed. Processing work without claiming it first can lead to incorrect output.
        //
-       // If the claim fails due to an error, that error can be retrieved with GetError.
+       // If the claim is successful, the DoFn must process the entire block. If the claim is
+       // unsuccessful ProcessElement method of the DoFn must return without performing
+       // any additional work or emitting any outputs.
        //
-       // For SDFs to work properly, claims must always be monotonically increasing in reference to the
-       // restriction's start and end points, and every block of work in a restriction must be claimed.
+       // If the claim fails due to an error, that error is stored and will be automatically emitted
+       // when the RTracker is validated, or can be manually retrieved with GetError.
        //
        // This pseudocode example illustrates the typical usage of TryClaim:
        //
-       //      pos = position of first block after restriction.start
+       //      pos = position of first block within the restriction
        //      for TryClaim(pos) == true {
        //              // Do all work in the claimed block and emit outputs.
-       //              pos = position of next block
+       //              pos = position of next block within the restriction
        //      }
        //      return
        TryClaim(pos interface{}) (ok bool)
 
-       // GetError returns the error that made this RTracker stop executing, and it returns nil if no
-       // error occurred. If IsDone fails while validating this RTracker, this method will be
-       // called to log the error.
+       // GetError returns the error that made this RTracker stop executing, and returns nil if no
+       // error occurred. This is the error that is emitted if automated validation fails.
        GetError() error
 
-       // TrySplit splits the current restriction into a primary and residual based on a fraction of the
-       // work remaining. The split is performed along the first valid split point located after the
-       // given fraction of the remainder. This method is called by the SDK harness when receiving a
-       // split request by the runner.
+       // TrySplit splits the current restriction into a primary (currently executing work) and
+       // residual (work to be split off) based on a fraction of work remaining. The split is performed
+       // at the first valid split point located after the given fraction of remaining work.
+       //
+       // For example, a fraction of 0.5 means to split at the halfway point of remaining work only. If
+       // 50% of work is done and 50% remaining, then a fraction of 0.5 would split after 75% of work.

Review comment:
       +1 to this concrete example.
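
The arithmetic behind that example can be sketched as follows. `splitPointOfTotal` is a hypothetical helper written only for illustration; it ignores rounding to valid split points.

```go
package main

import "fmt"

// splitPointOfTotal converts a fraction of the *remaining* work into a
// position expressed as a fraction of the *total* work.
// done is the fraction of total work already completed.
func splitPointOfTotal(done, fraction float64) float64 {
	return done + fraction*(1-done)
}

func main() {
	// 50% done, split halfway through the remaining 50%.
	fmt.Println(splitPointOfTotal(0.5, 0.5))
}
```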

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-       // TryClaim attempts to claim the block of work in the current restriction located at a given
-       // position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-       // work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-       // work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-       // the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-       // any additional work or emitting any outputs.
-       //
-       // TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-       // returns a boolean indicating whether the claim succeeded.
+       // TryClaim attempts to claim the block of work located in the given position of the
+       // restriction. This method must be called in ProcessElement to claim work before it can be
+       // processed. Processing work without claiming it first can lead to incorrect output.
        //
-       // If the claim fails due to an error, that error can be retrieved with GetError.
+       // If the claim is successful, the DoFn must process the entire block. If the claim is
+       // unsuccessful ProcessElement method of the DoFn must return without performing
+       // any additional work or emitting any outputs.
        //
-       // For SDFs to work properly, claims must always be monotonically increasing in reference to the
-       // restriction's start and end points, and every block of work in a restriction must be claimed.
+       // If the claim fails due to an error, that error is stored and will be automatically emitted
+       // when the RTracker is validated, or can be manually retrieved with GetError.
        //
        // This pseudocode example illustrates the typical usage of TryClaim:
        //
-       //      pos = position of first block after restriction.start
+       //      pos = position of first block within the restriction
        //      for TryClaim(pos) == true {
        //              // Do all work in the claimed block and emit outputs.
-       //              pos = position of next block
+       //              pos = position of next block within the restriction
        //      }
        //      return
        TryClaim(pos interface{}) (ok bool)
 
-       // GetError returns the error that made this RTracker stop executing, and it returns nil if no
-       // error occurred. If IsDone fails while validating this RTracker, this method will be
-       // called to log the error.
+       // GetError returns the error that made this RTracker stop executing, and returns nil if no
+       // error occurred. This is the error that is emitted if automated validation fails.
        GetError() error
 
-       // TrySplit splits the current restriction into a primary and residual based on a fraction of the
-       // work remaining. The split is performed along the first valid split point located after the
-       // given fraction of the remainder. This method is called by the SDK harness when receiving a
-       // split request by the runner.
+       // TrySplit splits the current restriction into a primary (currently executing work) and
+       // residual (work to be split off) based on a fraction of work remaining. The split is performed
+       // at the first valid split point located after the given fraction of remaining work.
+       //
+       // For example, a fraction of 0.5 means to split at the halfway point of remaining work only. If
+       // 50% of work is done and 50% remaining, then a fraction of 0.5 would split after 75% of work.
+       //
+       // This method modifies the underlying restriction in the RTracker to reflect the primary. It
+       // then returns a copy of the newly modified restriction as a primary, and returns a new
+       // restriction for the residual. If the split would produce an empty residual (i.e. the only
+       // split point is the end of the restriction), then the returned residual is nil.
        //
-       // The current restriction is split into two by modifying the current restriction's endpoint to
-       // turn it into the primary, and returning a new restriction tracker representing the residual.
-       // If no valid split point exists, this method returns nil instead of a residual, but does not
-       // return an error. If this method is unable to split due to some error then it returns nil and
-       // an error.
-       TrySplit(fraction float64) (residual interface{}, err error)
+       // If an error is returned, some catastrophic failure occurred and the entire bundle will fail.
+       TrySplit(fraction float64) (primary, residual interface{}, err error)

Review comment:
       IIRC, for dynamic splitting this is the method that requires the Tracker
to be concurrency-safe? Do we want to declare ahead of time that implementations
of RTracker must be thread-safe?
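
If that requirement were adopted, one possible shape is a tracker that guards all of its state with a single mutex. The sketch below is an assumption-laden illustration: `lockedTracker` and `restriction` are hypothetical types, the method set merely mirrors the signatures quoted in this hunk, and nothing here reflects how the SDK actually invokes trackers.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"sync"
)

// restriction is a hypothetical half-open offset range [Start, End).
type restriction struct{ Start, End int64 }

// lockedTracker guards all state with one mutex so that TrySplit, called
// from another goroutine, cannot interleave with TryClaim.
type lockedTracker struct {
	mu   sync.Mutex
	rest restriction
	next int64 // first position not yet claimed
	err  error
}

func (t *lockedTracker) TryClaim(pos interface{}) (ok bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	p, isInt := pos.(int64)
	if !isInt || p < t.next {
		t.err = errors.New("position must be an int64 at or after the last claim")
		return false
	}
	if p >= t.rest.End {
		return false // past the (possibly shrunk) end: stop, not an error
	}
	t.next = p + 1
	return true
}

func (t *lockedTracker) GetError() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.err
}

func (t *lockedTracker) TrySplit(fraction float64) (primary, residual interface{}, err error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	fraction = math.Max(0, math.Min(1, fraction))
	remaining := t.rest.End - t.next
	splitPt := t.next + int64(math.Ceil(fraction*float64(remaining)))
	if remaining <= 0 || splitPt >= t.rest.End {
		return t.rest, nil, nil
	}
	res := restriction{Start: splitPt, End: t.rest.End}
	t.rest.End = splitPt
	return t.rest, res, nil
}

func main() {
	t := &lockedTracker{rest: restriction{Start: 0, End: 1000}}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // stands in for a runner-initiated split
		defer wg.Done()
		t.TrySplit(0.5)
	}()
	claimed := 0
	for pos := int64(0); t.TryClaim(pos); pos++ {
		claimed++
	}
	wg.Wait()
	fmt.Println("claimed", claimed, "positions; primary is now", t.rest)
}
```

Because the split point is never placed before `next`, every position claimed before the split remains inside the primary, whichever goroutine wins the lock first.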

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -43,7 +43,7 @@ func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken
                return "", nil
        }
 
-  return StageViaLegacyApi(ctx, cc, binary, st)

Review comment:
       This cleanup is separate from the rest of the change.

##########
File path: sdks/go/pkg/beam/core/sdf/sdf.go
##########
@@ -13,63 +13,64 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package sdf is experimental, incomplete, and not yet meant for general usage.
+// Package contains interfaces used specifically for splittable DoFns.
+//
+// Warning: Splittable DoFns are still experimental, largely untested, and
+// likely to have bugs.
 package sdf
 
 // RTracker is an interface used to interact with restrictions while processing elements in
-// SplittableDoFns. Each implementation of RTracker is expected to be used for tracking a single
-// restriction type, which is the type that should be used to create the RTracker, and output by
-// TrySplit.
+// splittable DoFns (specifically, in the ProcessElement method). Each RTracker tracks the progress
+// of a single restriction.
 type RTracker interface {
-       // TryClaim attempts to claim the block of work in the current restriction located at a given
-       // position. This method must be used in the ProcessElement method of Splittable DoFns to claim
-       // work before performing it. If no work is claimed, the ProcessElement is not allowed to perform
-       // work or emit outputs. If the claim is successful, the DoFn must process the entire block. If
-       // the claim is unsuccessful the ProcessElement method of the DoFn must return without performing
-       // any additional work or emitting any outputs.
-       //
-       // TryClaim accepts an arbitrary value that can be interpreted as the position of a block, and
-       // returns a boolean indicating whether the claim succeeded.
+       // TryClaim attempts to claim the block of work located in the given position of the
+       // restriction. This method must be called in ProcessElement to claim work before it can be
+       // processed. Processing work without claiming it first can lead to incorrect output.
        //
-       // If the claim fails due to an error, that error can be retrieved with GetError.
+       // If the claim is successful, the DoFn must process the entire block. If the claim is
+       // unsuccessful ProcessElement method of the DoFn must return without performing
+       // any additional work or emitting any outputs.
        //
-       // For SDFs to work properly, claims must always be monotonically increasing in reference to the
-       // restriction's start and end points, and every block of work in a restriction must be claimed.
+       // If the claim fails due to an error, that error is stored and will be automatically emitted
+       // when the RTracker is validated, or can be manually retrieved with GetError.
        //
        // This pseudocode example illustrates the typical usage of TryClaim:
        //
-       //      pos = position of first block after restriction.start
+       //      pos = position of first block within the restriction
        //      for TryClaim(pos) == true {
        //              // Do all work in the claimed block and emit outputs.
-       //              pos = position of next block
+       //              pos = position of next block within the restriction
        //      }
        //      return
        TryClaim(pos interface{}) (ok bool)

Review comment:
       It would be worth explicitly saying that the position type is directly
related to the type of restriction being handled.
   
   E.g., a linear offset restriction could use a single int64 value to
represent a position. Similarly, a multi-dimensional restriction space could
use a more complex type to represent the area claimed as a position.
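
To make the point concrete, here is a hedged sketch of two trackers whose `TryClaim` accepts different position types. `offsetTracker`, `gridTracker`, and `cell` are hypothetical names invented for this illustration; only the `TryClaim(pos interface{}) (ok bool)` signature comes from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// offsetTracker tracks a linear restriction [0, end); a position is a
// single int64 offset.
type offsetTracker struct {
	end, next int64 // next is the first offset not yet claimed
	err       error
}

func (t *offsetTracker) TryClaim(pos interface{}) (ok bool) {
	p, isOffset := pos.(int64)
	if !isOffset {
		t.err = errors.New("offsetTracker positions must be int64")
		return false
	}
	if p < t.next || p >= t.end {
		return false
	}
	t.next = p + 1
	return true
}

// cell is a position in a two-dimensional restriction space.
type cell struct{ Row, Col int64 }

// gridTracker tracks a rows x cols grid claimed in row-major order.
type gridTracker struct {
	rows, cols int64
	next       int64 // row-major index of the first unclaimed cell
	err        error
}

func (t *gridTracker) TryClaim(pos interface{}) (ok bool) {
	c, isCell := pos.(cell)
	if !isCell {
		t.err = errors.New("gridTracker positions must be cell values")
		return false
	}
	if c.Row < 0 || c.Row >= t.rows || c.Col < 0 || c.Col >= t.cols {
		return false
	}
	idx := c.Row*t.cols + c.Col
	if idx < t.next {
		return false
	}
	t.next = idx + 1
	return true
}

func main() {
	ot := &offsetTracker{end: 3}
	fmt.Println(ot.TryClaim(int64(0)), ot.TryClaim("0"), ot.err)

	gt := &gridTracker{rows: 2, cols: 2}
	fmt.Println(gt.TryClaim(cell{0, 1}), gt.TryClaim(cell{5, 5}))
}
```

Since `pos` is an `interface{}`, a mismatched position type only surfaces at run time, which is one more reason to document the expected type next to each restriction.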




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

