[ https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=760248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-760248 ]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Apr/22 17:34
Start Date: 21/Apr/22 17:34
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17374:
URL: https://github.com/apache/beam/pull/17374#discussion_r855427817
##########
sdks/go/pkg/beam/pardo.go:
##########
@@ -64,9 +66,16 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
}
var rc *coder.Coder
+ // Sdfs will always encode restrictions as KV<restriction, watermark state | nil>
if fn.IsSplittable() {
sdf := (*graph.SplittableDoFn)(fn)
- rc, err = inferCoder(typex.New(sdf.RestrictionT()))
+ restT := typex.New(sdf.RestrictionT())
+ // If no watermark estimator state, use boolean as a placeholder
Review Comment:
I updated to use a default invoker implementation that just returns false, like we talked about offline. I agree that is cleaner.
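The default-invoker approach described in this review comment can be illustrated with a small standalone Go sketch. It assumes only what the diff comments state: every restriction is paired with watermark estimator state, and false is the placeholder when a DoFn has no estimator. All names here (restrictionWithWEState, weStateInvoker, defaultWEStateInvoker, fixedWEStateInvoker, pairRestriction) are hypothetical and are not taken from the Beam Go SDK or from PR #17374.

```go
package main

import "fmt"

// restrictionWithWEState is a hypothetical stand-in for the
// KV<restriction, watermark state> pair described in the diff comment.
type restrictionWithWEState struct {
	Restriction interface{}
	WEState     interface{}
}

// weStateInvoker is a hypothetical interface for obtaining the initial
// watermark estimator state for a restriction.
type weStateInvoker interface {
	InitialWatermarkEstimatorState(rest interface{}) interface{}
}

// defaultWEStateInvoker stands in for a DoFn with no watermark estimator:
// it always returns false as a placeholder state.
type defaultWEStateInvoker struct{}

func (defaultWEStateInvoker) InitialWatermarkEstimatorState(interface{}) interface{} {
	return false
}

// fixedWEStateInvoker stands in for a DoFn that does keep estimator state;
// here the state is just a fixed int64 supplied at construction.
type fixedWEStateInvoker struct{ state int64 }

func (f fixedWEStateInvoker) InitialWatermarkEstimatorState(interface{}) interface{} {
	return f.state
}

// pairRestriction builds the pair to encode. Because every DoFn gets an
// invoker (the default one if it declares no estimator), no nil check or
// special case is needed here.
func pairRestriction(inv weStateInvoker, rest interface{}) restrictionWithWEState {
	return restrictionWithWEState{Restriction: rest, WEState: inv.InitialWatermarkEstimatorState(rest)}
}

func main() {
	rest := [2]int64{0, 100}
	fmt.Println(pairRestriction(defaultWEStateInvoker{}, rest).WEState)        // false
	fmt.Println(pairRestriction(fixedWEStateInvoker{state: 42}, rest).WEState) // 42
}
```

The design point is that the caller never branches on whether an estimator exists; the placeholder comes from the default implementation instead.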
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -457,6 +501,23 @@ type SplittableUnit interface {
// each case occurs and the implementation details, see the documentation for
// the singleWindowSplit and multiWindowSplit methods.
func (n *ProcessSizedElementsAndRestrictions) Split(f float64) ([]*FullValue, []*FullValue, error) {
+ // Get the watermark state immediately so that we don't overestimate our current watermark.
+ var pWeState interface{}
+ var rWeState interface{}
Review Comment:
This was simplified by the default invoker approach.
Issue Time Tracking
-------------------
Worklog Id: (was: 760248)
Time Spent: 5.5h (was: 5h 20m)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Allow self-checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.7#820007)