damccorm commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r855427817


##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
        }
 
        var rc *coder.Coder
+       // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
        if fn.IsSplittable() {
                sdf := (*graph.SplittableDoFn)(fn)
-               rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+               restT := typex.New(sdf.RestrictionT())
+               // If no watermark estimator state, use boolean as a placeholder

Review Comment:
   I updated this to use a default invoker implementation that just returns
   false, as we discussed offline. I agree that is cleaner.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
 // each case occurs and the implementation details, see the documentation for
 // the singleWindowSplit and multiWindowSplit methods.
 func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}

Review Comment:
   This was simplified by the default invoker approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to