lostluck commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r852438536
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -852,11 +879,11 @@ func validateSdfSigTypes(fn *Fn, num int) error {
method := fn.methods[name]
switch name {
case createInitialRestrictionName:
- if err := validateSdfElementT(fn, createInitialRestrictionName, method, num); err != nil {
+ if err := validateSdfElementT(fn, createInitialRestrictionName, method, num, 0); err != nil {
Review Comment:
I guess we haven't tried this with an added context.Context parameter (or
windows, event time, etc.)?
##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection,
opts ...Option) ([]PCo
}
var rc *coder.Coder
+ // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
if fn.IsSplittable() {
sdf := (*graph.SplittableDoFn)(fn)
- rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+ restT := typex.New(sdf.RestrictionT())
+ // If no watermark estimator state, use boolean as a placeholder
+ weT := typex.New(reflect.TypeOf(true))
+ if sdf.IsStatefulWatermarkEstimating() {
+ weT = typex.New(sdf.WatermarkEstimatorStateT())
+ }
+ rc, err = inferCoder(typex.New(typex.KVType, restT, weT))
Review Comment:
Use [`typex.NewKV`](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/typex/fulltype.go#L199)
instead.
##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection,
opts ...Option) ([]PCo
}
var rc *coder.Coder
+ // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
if fn.IsSplittable() {
sdf := (*graph.SplittableDoFn)(fn)
- rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+ restT := typex.New(sdf.RestrictionT())
+ // If no watermark estimator state, use boolean as a placeholder
Review Comment:
Now that I'm looking at your comment in its code context: I'm not happy
with making up a type, especially if we need to re-check that decision at the
execution layer. The execution layers need to make as few decisions as
possible, and any decisions that do need to be made should happen at bundle
construction time, not on a per-call basis.
If we have to, we should make it a GlobalWindow, which encodes down to 0
size, but we didn't make that easy to do at this user layer. We'd almost rather
propagate the current information through to
[graphx/translate.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L443)
and make the decisions there, since we would be able to set the coder type to
a [Global window coder](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/coder/windows.go#L45),
but we didn't make window coders fully modular with ordinary coders, so
there is additional work there.
That seems hackier than trusting that the restriction coder is correct by
construction time (after passing through this and later steps) and determining
whether we need to execute watermark code based on the DoFn in
[exec/translate.go](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L452),
configuring the affected exec nodes accordingly at construction.
What do you think about that? Would that minimize the execution-time
branches?
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
// each case occurs and the implementation details, see the documentation for
// the singleWindowSplit and multiWindowSplit methods.
func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue,
[]*FullValue, error) {
+ // Get the watermark state immediately so that we don't overestimate our current watermark.
+ var pWeState interface{}
+ var rWeState interface{}
Review Comment:
This should probably be computed once at bundle initialization time rather
than per call. That is, we should already know whether the restriction has a
specific state, and that information should be part of the coders we set in
the pipeline proto during pipeline construction (in graphx/translate.go), so
that we can act on them in exec/translate.go and avoid making decisions per
call on the hot execution paths as much as possible. (While ProcessElement is
the "hottest" of these, all lifecycle methods are hotter than bundle
construction time.)
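To make the suggestion concrete, here is a minimal sketch of deciding once at construction instead of on every call. It does not use the Beam types: `splitNode`, `stateFn`, and `newSplitNode` are hypothetical names invented for illustration, and the `false` placeholder simply mirrors the one in the diff rather than endorsing it.

```go
package main

import "fmt"

// stateFn returns the watermark estimator state to record at a split.
// Hypothetical type for illustration; not part of the Beam SDK.
type stateFn func() interface{}

// splitNode is a hypothetical stand-in for an exec node.
type splitNode struct {
	getState stateFn // chosen once, when the node is constructed
}

// newSplitNode makes the stateful/stateless decision a single time.
// A nil estimator stands for a DoFn without watermark estimator state.
func newSplitNode(estimator stateFn) *splitNode {
	if estimator == nil {
		// Placeholder value, mirroring the boolean used in the diff.
		return &splitNode{getState: func() interface{} { return false }}
	}
	return &splitNode{getState: estimator}
}

// Split is the per-call path; it no longer checks whether state exists.
func (n *splitNode) Split() interface{} {
	return n.getState()
}

func main() {
	stateless := newSplitNode(nil)
	stateful := newSplitNode(func() interface{} { return int64(42) })
	fmt.Println(stateless.Split(), stateful.Split())
}
```

The nil check runs only in `newSplitNode`, so however often `Split` is called, the branch on "does this DoFn have state?" is paid once per node.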
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
// each case occurs and the implementation details, see the documentation for
// the singleWindowSplit and multiWindowSplit methods.
func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue,
[]*FullValue, error) {
+ // Get the watermark state immediately so that we don't overestimate our current watermark.
+ var pWeState interface{}
+ var rWeState interface{}
+ if n.gwesInv != nil {
+ rWeState = n.gwesInv.Invoke(n.PDo.we)
+ pWeState = rWeState
+ // If we've processed elements, the initial watermark estimator state will be set.
+ // In that case we should hold the output watermark at that initial state so that we don't
+ // Advance past where the current elements are holding the watermark
+ if n.initWeS != nil {
+ pWeState = n.initWeS
+ }
+ } else {
+ // If no watermark estimator state, use boolean as a placeholder
+ pWeState = false
+ rWeState = false
Review Comment:
As a separate point: it's identical to the compiler if we declare the
variables as
```
pWeState := interface{}(false)
rWeState := interface{}(false)
```
that way we can drop the else path entirely, and the other path will
override these defaults. I'm pretty sure the compiler is smart enough to know
that these have no side effects, and won't allocate anything for them when
`n.gwesInv != nil`.
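As a rough check that the two shapes behave the same, the sketch below puts them side by side. The functions `withElse` and `withDefaults` and their parameters are hypothetical stand-ins for the relevant part of `Split`; `invoke` plays the role of `n.gwesInv` and `init` the role of `n.initWeS`. It only demonstrates that the results match, and makes no claim about allocations.

```go
package main

import "fmt"

// withElse mirrors the shape in the diff: declare the variables, then
// assign them in both the if and the else branch.
func withElse(invoke func() interface{}, init interface{}) (interface{}, interface{}) {
	var pWeState interface{}
	var rWeState interface{}
	if invoke != nil {
		rWeState = invoke()
		pWeState = rWeState
		if init != nil {
			pWeState = init
		}
	} else {
		pWeState = false
		rWeState = false
	}
	return pWeState, rWeState
}

// withDefaults initializes both variables to the placeholder up front,
// so the else branch is no longer needed.
func withDefaults(invoke func() interface{}, init interface{}) (interface{}, interface{}) {
	pWeState := interface{}(false)
	rWeState := interface{}(false)
	if invoke != nil {
		rWeState = invoke()
		pWeState = rWeState
		if init != nil {
			pWeState = init
		}
	}
	return pWeState, rWeState
}

func main() {
	state := func() interface{} { return "current" }

	p1, r1 := withElse(nil, nil)
	p2, r2 := withDefaults(nil, nil)
	fmt.Println(p1 == p2, r1 == r2) // no estimator state

	p1, r1 = withElse(state, "initial")
	p2, r2 = withDefaults(state, "initial")
	fmt.Println(p1 == p2, r1 == r2) // estimator state plus initial state
}
```

Both variants return the same pair for every combination of a nil or non-nil `invoke` and `init`, which is why the else branch can be dropped without changing behavior.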