[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=756565&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756565
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Apr/22 17:30
            Start Date: 13/Apr/22 17:30
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r849710510


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +948,64 @@ func validateSdfElementT(fn *Fn, name string, method 
*funcx.Fn, num int) error {
        return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods 
are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the 
function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+       var isWatermarkEstimating bool
+       if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+               isWatermarkEstimating = true
+       }
+       if !isSdf && isWatermarkEstimating {
+               return false, errors.Errorf("Watermark estimation method %v is 
defined on non-splittable DoFn. Watermark"+
+                       "estimation is only valid on splittable DoFns", 
createWatermarkEstimatorName)
+       }
+       return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are 
valid
+func validateWatermarkSig(fn *Fn) error {
+       paramRange := map[string][]int{
+               createWatermarkEstimatorName: []int{0, 0},
+       }
+       returnNum := 1 // TODO(BEAM-3301): Enable optional error params in SDF 
methods.
+
+       watermarkEstimatorT := 
reflect.TypeOf((*sdf.WatermarkEstimator)(nil)).Elem()
+
+       for _, name := range watermarkEstimationNames {
+               if method, ok := fn.methods[name]; ok {
+                       if len(method.Param) < paramRange[name][0] || 
len(method.Param) > paramRange[name][1] {
+                               err := errors.Errorf("unexpected number of 
params in method %v. got: %v, want number in range: %v to %v",
+                                       name, len(method.Param), 
paramRange[name][0], paramRange[name][1])
+                               return errors.SetTopLevelMsgf(err, "Unexpected 
number of parameters in method %v. "+
+                                       "Got: %v, Want number in range: %v to 
%v. Check that the signature conforms to the expected signature for %v, "+
+                                       "and that elements in SDF method 
parameters match elements in %v.",
+                                       name, len(method.Param), 
paramRange[name][0], paramRange[name][1], name, processElementName)

Review Comment:
   That seems like something we can make easier on ourselves (in a later PR) 
by adding a new function to our errors package, so we don't need to repeat 
ourselves when we want a top-level message, but also don't lose that info when 
a higher level sets a top-level error.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -414,6 +432,10 @@ type SplittableUnit interface {
        // GetInputId returns the local input ID of the input that the element 
being
        // split was received from.
        GetInputId() string
+
+       // GetOutputWatermark gets the current output watermark of the 
splittable unit
+       // if one  is defined, or nil otherwise.

Review Comment:
   ```suggestion
        // if one is defined, or nil otherwise.
   ```



##########
sdks/go/pkg/beam/core/sdf/watermarkEstimator.go:
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import "time"
+
+type WallTimeWatermarkEstimator struct{}

Review Comment:
   Please add documentation strings for exported types.



##########
sdks/go/pkg/beam/core/sdf/watermarkEstimator.go:
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import "time"
+
+type WallTimeWatermarkEstimator struct{}

Review Comment:
   Exported types and methods on those types must have documentation comments.



##########
sdks/go/pkg/beam/core/sdf/watermarkEstimator.go:
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   WRT the file name, prefer snake_case over lowerCamelCase.
   
   Or simplify the name to estimators.go or watermarks.go and avoid the 
question entirely.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 756565)
    Time Spent: 2.5h  (was: 2h 20m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Allow self-checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to