damccorm commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r852949698
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -852,11 +879,11 @@ func validateSdfSigTypes(fn *Fn, num int) error {
method := fn.methods[name]
switch name {
case createInitialRestrictionName:
- if err := validateSdfElementT(fn, createInitialRestrictionName, method, num); err != nil {
+ if err := validateSdfElementT(fn, createInitialRestrictionName, method, num, 0); err != nil {
Review Comment:
No, I don't think that is currently supported.
##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
}
var rc *coder.Coder
+ // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
if fn.IsSplittable() {
sdf := (*graph.SplittableDoFn)(fn)
- rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+ restT := typex.New(sdf.RestrictionT())
+ // If no watermark estimator state, use boolean as a placeholder
Review Comment:
> What do you think about that? Would that minimize the execution time branches?

I'm not necessarily opposed to doing this, but I _don't_ think that it will
minimize execution time branches; in fact, I think it will moderately increase
the number of them. Right now, we branch any time we might need to do some
watermark-related work. I don't think any of those branches would go away, and
we'd need to add some additional conditional logic for reading/writing
restrictions (since we wouldn't know if the restriction is located at
`elm.Elm.(*FullValue).Elm2` or `elm.Elm.(*FullValue).Elm2.(*FullValue).Elm`).
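To make the extra read-side branch concrete, here is a minimal Go sketch of the two lookup paths quoted above. `FullValue` is a hypothetical, pared-down stand-in modeling only the `Elm`/`Elm2` fields, and the nesting (`KV<KV<element, restriction>, size>`) is an assumption inferred from those two paths, not taken from the real `exec` package. The helper names `restrictionAlwaysWrapped` and `restrictionMaybeWrapped` are illustrative only.

```go
package main

import "fmt"

// FullValue is a hypothetical, pared-down stand-in for the exec package's
// FullValue; only the Elm and Elm2 fields referenced above are modeled.
type FullValue struct {
	Elm  interface{}
	Elm2 interface{}
}

// restrictionAlwaysWrapped reads the restriction when every restriction is
// encoded as KV<restriction, watermark state | placeholder>. The lookup path
// never changes, so no branch is needed.
func restrictionAlwaysWrapped(elm *FullValue) interface{} {
	return elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
}

// restrictionMaybeWrapped reads the restriction when watermark state is only
// sometimes encoded. The caller has to know which layout was used (passed in
// here as hasWeState), so every read needs a branch; writes would need the
// same check.
func restrictionMaybeWrapped(elm *FullValue, hasWeState bool) interface{} {
	if hasWeState {
		return elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
	}
	return elm.Elm.(*FullValue).Elm2
}

func main() {
	// Assumed layout: KV<KV<element, restriction-or-KV>, size>.
	wrapped := &FullValue{
		Elm:  &FullValue{Elm: "element", Elm2: &FullValue{Elm: "restriction", Elm2: false}},
		Elm2: 1.0,
	}
	unwrapped := &FullValue{
		Elm:  &FullValue{Elm: "element", Elm2: "restriction"},
		Elm2: 1.0,
	}
	fmt.Println(restrictionAlwaysWrapped(wrapped))
	fmt.Println(restrictionMaybeWrapped(wrapped, true))
	fmt.Println(restrictionMaybeWrapped(unwrapped, false))
}
```

With a fixed layout, the first helper is all that's needed. The conditional variant is what "sometimes encoding" watermark state would push onto every restriction read.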
Like I said, I'm not opposed to doing this differently, but I'd lean towards
adding a custom nil value coder (probably in a future PR) over sometimes
encoding and sometimes not encoding watermark state. I'd actually lean towards
the current implementation over both, though: a single bool value doesn't add
much space overhead, and I think introducing another custom coder would add a
little overhead of its own. FWIW, I believe the Java SDK just sends a default
watermark state timestamp when no state is actually specified (not too
different from this approach).
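As a rough illustration of the space argument, this Go sketch measures the cost of appending a bool placeholder to an encoded restriction. It rests on several assumptions. The restriction is a hypothetical offset range encoded as two signed varints. The bool is written as a single byte. The KV encoding simply concatenates key and value bytes. `encodeRestriction` and `encodeWithPlaceholder` are illustrative names, not Beam APIs.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRestriction is a hypothetical encoding of an offset-range restriction
// [start, end) as two signed (zig-zag) varints.
func encodeRestriction(start, end int64) []byte {
	buf := make([]byte, 2*binary.MaxVarintLen64)
	n := binary.PutVarint(buf, start)
	n += binary.PutVarint(buf[n:], end)
	return buf[:n]
}

// encodeWithPlaceholder forms KV<restriction, placeholder> by appending a
// single 0x00 byte (false), assuming a one-byte bool encoding and a KV
// encoding that concatenates key and value bytes.
func encodeWithPlaceholder(rest []byte) []byte {
	out := make([]byte, 0, len(rest)+1)
	out = append(out, rest...)
	return append(out, 0x00)
}

func main() {
	rest := encodeRestriction(0, 1000)
	kv := encodeWithPlaceholder(rest)
	fmt.Println(len(rest), len(kv)) // prints: 3 4
}
```

Under these assumptions, the placeholder costs one byte per restriction regardless of the restriction's size.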
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
// each case occurs and the implementation details, see the documentation for
// the singleWindowSplit and multiWindowSplit methods.
func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []*FullValue, error) {
+ // Get the watermark state immediately so that we don't overestimate our current watermark.
+ var pWeState interface{}
+ var rWeState interface{}
Review Comment:
I'm not sure I fully understand your concern here: how are you suggesting we
reduce the number of decisions made? The only decision being made here is
whether we need to invoke `getWatermarkEstimatorState`, and that decision
should be made as close as possible to when we actually perform the split (but
not after it, to avoid accidentally overestimating the state). Even if we know
the restriction has watermark state, we will still need this condition.
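The ordering point can be sketched in Go. The sketch takes a snapshot of the watermark estimator state, if there is one, immediately before splitting, so progress made during or after the split can't leak into the captured state. All names here are hypothetical stand-ins for `getWatermarkEstimatorState` and `pWeState`/`rWeState`: `watermarkEstimator`, `counterEstimator`, `splitter`, and `splitWithState`. Handing the same snapshot to both primary and residual is a simplifying assumption.

```go
package main

import "fmt"

// watermarkEstimator is a hypothetical stand-in for an SDF watermark
// estimator; only the state accessor matters for this sketch.
type watermarkEstimator interface {
	State() interface{}
}

// counterEstimator is a toy estimator whose state is a single int64 watermark.
type counterEstimator struct{ wm int64 }

func (e *counterEstimator) State() interface{} { return e.wm }

// splitter is a hypothetical, simplified view of the split path. estimator
// is left nil when the DoFn has no watermark estimator state.
type splitter struct {
	estimator watermarkEstimator
	split     func(frac float64) (primary, residual string)
}

// splitWithState reads the estimator state (if any) right before splitting.
// The nil check is the one remaining decision, and it is still needed even
// when we know the restriction carries watermark state.
func (s *splitter) splitWithState(frac float64) (p, r string, pState, rState interface{}) {
	if s.estimator != nil {
		st := s.estimator.State()
		pState, rState = st, st
	}
	p, r = s.split(frac)
	return p, r, pState, rState
}

func main() {
	est := &counterEstimator{wm: 42}
	s := &splitter{
		estimator: est,
		split: func(frac float64) (string, string) {
			est.wm = 100 // simulate the estimator advancing once the split happens
			return "primary", "residual"
		},
	}
	p, r, ps, rs := s.splitWithState(0.5)
	fmt.Println(p, r, ps, rs) // prints: primary residual 42 42
}
```

Reading the state after `split` instead would have captured `100`. That is the accidental overestimation the comment in the diff guards against.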
--
This is an automated message from the Apache Git Service.