damccorm commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r852949698


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -852,11 +879,11 @@ func validateSdfSigTypes(fn *Fn, num int) error {
                method := fn.methods[name]
                switch name {
                case createInitialRestrictionName:
-                       if err := validateSdfElementT(fn, createInitialRestrictionName, method, num); err != nil {
+                       if err := validateSdfElementT(fn, createInitialRestrictionName, method, num, 0); err != nil {

Review Comment:
   No, I don't think that is currently supported.



##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
        }
 
        var rc *coder.Coder
+       // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
        if fn.IsSplittable() {
                sdf := (*graph.SplittableDoFn)(fn)
-               rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+               restT := typex.New(sdf.RestrictionT())
+               // If no watermark estimator state, use boolean as a placeholder

Review Comment:
   > What do you think about that? Would that minimize the execution time 
branches?
   
   I'm not necessarily opposed to doing this, but I _don't_ think it will 
minimize execution-time branches; in fact, I think it will moderately increase 
their number. Right now, we branch any time we might need to do some 
watermark-related work. I don't think any of those branches would go away, 
and we'd need to add conditional logic for reading/writing restrictions 
(since we wouldn't know whether the restriction is located at 
`elm.Elm.(*FullValue).Elm2` or 
`elm.Elm.(*FullValue).Elm2.(*FullValue).Elm`). 
   
   Like I said, I'm not opposed to doing this differently, but I'd lean towards 
adding a custom nil value coder (probably in a future PR) over sometimes 
encoding and sometimes not encoding watermark state. I'd probably lean 
towards the current implementation over both, though, since a single bool value 
doesn't add much space overhead, and I think introducing another custom coder 
adds a little overhead of its own. FWIW, I believe the Java SDK just has a 
default watermark state timestamp that it sends when no state is actually 
specified (not too different from this approach).
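
   To make the branching argument concrete, here is a minimal sketch of the 
two layouts. It is not the code in this PR: `FullValue` below is a simplified 
local stand-in for the exec package's type (only `Elm` and `Elm2` are modeled), 
and `restrictionAlwaysKV`, `restrictionMaybeKV`, and the `hasState` flag are 
hypothetical names used only for illustration.

```go
package main

import "fmt"

// FullValue is a simplified stand-in for the exec package's FullValue.
// Only the two fields discussed in this comment are modeled (assumption).
type FullValue struct {
	Elm  interface{}
	Elm2 interface{}
}

// restrictionAlwaysKV reads the restriction when it is always encoded as
// KV<restriction, watermark state | placeholder>. One fixed path, no branch.
func restrictionAlwaysKV(elm *FullValue) interface{} {
	return elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
}

// restrictionMaybeKV reads the restriction when the watermark state is only
// sometimes encoded. The caller has to say which layout is in use, so every
// read (and every write) gains a branch. hasState is a hypothetical flag.
func restrictionMaybeKV(elm *FullValue, hasState bool) interface{} {
	if hasState {
		return elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
	}
	return elm.Elm.(*FullValue).Elm2
}

func main() {
	// Layout with state: KV<KV<element, KV<restriction, state>>, size>.
	withState := &FullValue{
		Elm:  &FullValue{Elm: "elem", Elm2: &FullValue{Elm: "rest", Elm2: 42}},
		Elm2: 1.0,
	}
	// Layout without state: KV<KV<element, restriction>, size>.
	withoutState := &FullValue{
		Elm:  &FullValue{Elm: "elem", Elm2: "rest"},
		Elm2: 1.0,
	}

	fmt.Println(restrictionAlwaysKV(withState))
	fmt.Println(restrictionMaybeKV(withState, true))
	fmt.Println(restrictionMaybeKV(withoutState, false))
}
```

   With the placeholder approach only the first accessor is needed; the 
second shape is what "sometimes encoding" would require at each read/write 
site.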



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
 // each case occurs and the implementation details, see the documentation for
 // the singleWindowSplit and multiWindowSplit methods.
 func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}

Review Comment:
   I'm not sure I fully understand your concern here. How are you suggesting 
we reduce the decisions made? The only decision being made here is whether we 
need to invoke getWatermarkEstimatorState, and that decision should be made as 
close as possible to when we actually perform the split, but never after it, to 
avoid accidentally overestimating the state. Even if we know the restriction 
has watermark state, we will still need this condition.
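
   A rough sketch of the ordering I mean, under stated assumptions: 
`watermarkEstimator`, `processor`, `advance`, and `split` are hypothetical toy 
names, not the types in sdf.go, and the integer "state" just stands in for 
whatever the estimator reports. The point is only that the snapshot is taken 
before the split, behind a single nil check.

```go
package main

import "fmt"

// watermarkEstimator is a hypothetical stand-in for a watermark estimator
// whose state moves forward as processing progresses.
type watermarkEstimator struct {
	state int
}

// processor is a toy model of a splittable unit. we is nil when the DoFn
// defines no watermark estimator.
type processor struct {
	we       *watermarkEstimator
	position int
}

// advance simulates processing continuing: the position moves forward and,
// if an estimator exists, its state follows.
func (p *processor) advance() {
	p.position++
	if p.we != nil {
		p.we.state = p.position
	}
}

// split snapshots the estimator state first and only then performs the
// split, so the reported state can never be ahead of the split point.
func (p *processor) split() (splitPos int, weState interface{}) {
	// The one decision: is there any estimator state to fetch?
	if p.we != nil {
		weState = p.we.state
	}
	// Processing may move on between the snapshot and the split itself.
	p.advance()
	return p.position, weState
}

func main() {
	with := &processor{we: &watermarkEstimator{}}
	pos, st := with.split()
	fmt.Println(pos, st)

	without := &processor{}
	pos, st = without.split()
	fmt.Println(pos, st)
}
```

   Taking the snapshot after the split would let the state run ahead of the 
split point in this model, which is the overestimation I want to avoid.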



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.