lostluck commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r852438536


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -852,11 +879,11 @@ func validateSdfSigTypes(fn *Fn, num int) error {
                method := fn.methods[name]
                switch name {
                case createInitialRestrictionName:
-                       if err := validateSdfElementT(fn, createInitialRestrictionName, method, num); err != nil {
+                       if err := validateSdfElementT(fn, createInitialRestrictionName, method, num, 0); err != nil {

Review Comment:
   I guess we haven't tried this with a `context.Context` parameter added (or 
windows, event time, etc.)?



##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
        }
 
        var rc *coder.Coder
+       // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
        if fn.IsSplittable() {
                sdf := (*graph.SplittableDoFn)(fn)
-               rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+               restT := typex.New(sdf.RestrictionT())
+               // If no watermark estimator state, use boolean as a placeholder
+               weT := typex.New(reflect.TypeOf(true))
+               if sdf.IsStatefulWatermarkEstimating() {
+                       weT = typex.New(sdf.WatermarkEstimatorStateT())
+               }
+               rc, err = inferCoder(typex.New(typex.KVType, restT, weT))

Review Comment:
   
   Use [`typex.NewKV`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/typex/fulltype.go#L199)
here instead of `typex.New(typex.KVType, ...)`.



##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
        }
 
        var rc *coder.Coder
+       // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
        if fn.IsSplittable() {
                sdf := (*graph.SplittableDoFn)(fn)
-               rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+               restT := typex.New(sdf.RestrictionT())
+               // If no watermark estimator state, use boolean as a placeholder

Review Comment:
   Now that I'm looking at your comment in its code context: I'm not happy 
with making up a type, especially if we need to re-check that decision at the 
execution layer. The execution layers need to make as few decisions as 
possible, and any decisions that do need to be made should happen at bundle 
construction time, not on a per-call basis.
   
   If we have to use a placeholder, it should be a GlobalWindow, which encodes 
down to zero size, but we didn't make that easy to do at this user layer. We'd 
almost rather propagate the current information through to 
[graphx/translate.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L443)
and make the decision there, since we could then set the coder type to a 
[Global window coder](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/coder/windows.go#L45),
but we didn't make window coders fully interchangeable with ordinary coders, 
so that would take additional work.
   
   That seems hackier than trusting that the restriction coder is correct by 
construction time (after passing through this and later steps), determining 
from the DoFn in 
[exec/translate.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L452)
whether we need to execute watermark code, and configuring the affected exec 
nodes accordingly at construction.
   
   What do you think about that? Would that minimize the execution time 
branches?
   



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
 // each case occurs and the implementation details, see the documentation for
 // the singleWindowSplit and multiWindowSplit methods.
 func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}

Review Comment:
   This should probably be computed once at bundle initialization time, rather 
than per call. That is, we should already know whether the restriction has a 
specific state or not, and that information should be part of the coders we 
set in the pipeline proto during pipeline construction (in 
graphx/translate.go), so we can act on them in exec/translate.go and avoid 
making decisions per call on the hot execution paths as much as possible. 
(While ProcessElement is the "hottest" of these, all lifecycle methods are 
hotter than bundle construction time.)
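
   To illustrate the idea, here is a minimal sketch using hypothetical 
stand-in types (`stateFn`, `splitNode`, and `newSplitNode` are made up for 
this example and are not part of the exec package). It assumes the "does this 
DoFn have watermark estimator state?" question can be answered when the node 
is built, so the per-call method has no branch on it:

```go
package main

import "fmt"

// stateFn returns the watermark estimator state to attach to a split.
// Hypothetical stand-in; not part of the Beam exec package.
type stateFn func() interface{}

// splitNode is a hypothetical, heavily simplified exec node.
type splitNode struct {
	weState stateFn // chosen once in newSplitNode
}

// newSplitNode runs at construction time. It decides once whether the DoFn
// has watermark estimator state and wires in the matching function.
func newSplitNode(getState stateFn) *splitNode {
	if getState == nil {
		// No estimator state: install a function that reports nil state.
		getState = func() interface{} { return nil }
	}
	return &splitNode{weState: getState}
}

// Split is on the hot path; it contains no "is there state?" branch.
func (n *splitNode) Split() (primary, residual interface{}) {
	s := n.weState()
	return s, s
}

func main() {
	stateless := newSplitNode(nil)
	stateful := newSplitNode(func() interface{} { return int64(42) })

	fmt.Println(stateless.Split())
	fmt.Println(stateful.Split())
}
```

   The real nodes would of course carry more than one function, but the shape 
is the same: decide in the constructor, then only execute in the lifecycle 
methods.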



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
 // each case occurs and the implementation details, see the documentation for
 // the singleWindowSplit and multiWindowSplit methods.
 func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []*FullValue, error) {
+       // Get the watermark state immediately so that we don't overestimate our current watermark.
+       var pWeState interface{}
+       var rWeState interface{}
+       if n.gwesInv != nil {
+               rWeState = n.gwesInv.Invoke(n.PDo.we)
+               pWeState = rWeState
+               // If we've processed elements, the initial watermark estimator state will be set.
+               // In that case we should hold the output watermark at that initial state so that we don't
+               // Advance past where the current elements are holding the watermark
+               if n.initWeS != nil {
+                       pWeState = n.initWeS
+               }
+       } else {
+               // If no watermark estimator state, use boolean as a placeholder
+               pWeState = false
+               rWeState = false

Review Comment:
   As a separate point: it makes no difference to the compiler if we declare 
the variables as 
   ```
   pWeState := interface{}(false)
   rWeState := interface{}(false)
   ```
   That way we can drop the else path entirely, and the other path will 
override these defaults. I'm pretty sure the compiler is smart enough to know 
that these have no side effects, and won't allocate anything for them when 
`n.gwesInv != nil`.
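
   For reference, a self-contained sketch of the default-then-override shape. 
The `estimator` type and `splitStates` function are hypothetical stand-ins 
for `n.gwesInv` and the top of `Split`, not the actual code in this PR:

```go
package main

import "fmt"

// estimator is a hypothetical stand-in for the optional state invoker.
type estimator struct{ state interface{} }

// Invoke returns the current watermark estimator state.
func (e *estimator) Invoke() interface{} { return e.state }

// splitStates returns the primary and residual watermark estimator states.
// Both default to the boolean placeholder and are overridden only when an
// estimator is present, so no else branch is needed.
func splitStates(gwes *estimator, initState interface{}) (interface{}, interface{}) {
	pWeState := interface{}(false)
	rWeState := interface{}(false)
	if gwes != nil {
		rWeState = gwes.Invoke()
		pWeState = rWeState
		// Hold the primary at the initial state if elements were processed.
		if initState != nil {
			pWeState = initState
		}
	}
	return pWeState, rWeState
}

func main() {
	fmt.Println(splitStates(nil, nil))
	fmt.Println(splitStates(&estimator{state: 7}, nil))
	fmt.Println(splitStates(&estimator{state: 7}, 3))
}
```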



