[
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=756575&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756575
]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 13/Apr/22 17:51
Start Date: 13/Apr/22 17:51
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r849748500
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method *funcx.Fn, num int) error {
return nil
}
+// validateIsWatermarkEstimating returns true if watermark estimator methods are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+ var isWatermarkEstimating bool
+ if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+ isWatermarkEstimating = true
+ }
+ if !isSdf && isWatermarkEstimating {
+ return false, errors.Errorf("Watermark estimation method %v is
defined on non-splittable DoFn. Watermark"+
+ "estimation is only valid on splittable DoFns",
createWatermarkEstimatorName)
+ }
+ return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are valid
+func validateWatermarkSig(fn *Fn) error {
+ paramRange := map[string][]int{
+ createWatermarkEstimatorName: []int{0, 0},
+ }
+ returnNum := 1 // TODO(BEAM-3301): Enable optional error params in SDF
methods.
+
+ watermarkEstimatorT :=
reflect.TypeOf((*sdf.WatermarkEstimator)(nil)).Elem()
+
+ for _, name := range watermarkEstimationNames {
+ if method, ok := fn.methods[name]; ok {
+ if len(method.Param) < paramRange[name][0] ||
len(method.Param) > paramRange[name][1] {
+ err := errors.Errorf("unexpected number of
params in method %v. got: %v, want number in range: %v to %v",
+ name, len(method.Param),
paramRange[name][0], paramRange[name][1])
+ return errors.SetTopLevelMsgf(err, "Unexpected
number of parameters in method %v. "+
+ "Got: %v, Want number in range: %v to
%v. Check that the signature conforms to the expected signature for %v, "+
+ "and that elements in SDF method
parameters match elements in %v.",
+ name, len(method.Param),
paramRange[name][0], paramRange[name][1], name, processElementName)
Review Comment:
Yeah, this file does that a lot - that's a good idea
Issue Time Tracking
-------------------
Worklog Id: (was: 756575)
Time Spent: 2h 40m (was: 2.5h)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)