[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=755702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755702
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/22 11:34
            Start Date: 12/Apr/22 11:34
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848330886


##########
sdks/go/pkg/beam/core/graph/fn_test.go:
##########
@@ -227,6 +228,59 @@ func TestNewDoFnSdf(t *testing.T) {
        })
 }
 
+func TestNewDoFnWatermarkEstimating(t *testing.T) {
+       t.Run("valid", func(t *testing.T) {
+               tests := []struct {
+                       dfn  interface{}
+                       main mainInputs
+               }{
+                       {dfn: &GoodWatermarkEstimating{}, main: MainSingle},
+               }
+
+               for _, test := range tests {
+                       t.Run(reflect.TypeOf(test.dfn).String(), func(t 
*testing.T) {
+                               // Valid DoFns should pass validation with and 
without KV info.
+                               if _, err := NewDoFn(test.dfn); err != nil {
+                                       t.Fatalf("NewDoFn with Watermark 
Estimation failed: %v", err)
+                               }
+                               if _, err := NewDoFn(test.dfn, 
NumMainInputs(test.main)); err != nil {
+                                       t.Fatalf("NewDoFn(NumMainInputs(%v)) 
with Watermark Estimation failed: %v", test.main, err)
+                               }
+                       })
+               }
+       })
+       t.Run("invalid", func(t *testing.T) {
+               tests := []struct {
+                       dfn interface{}
+               }{
+                       {dfn: &BadWatermarkEstimatingNonSdf{}},
+                       {dfn: 
&BadWatermarkEstimatingCreateWatermarkEstimatorReturnType{}},
+               }
+               for _, test := range tests {
+                       t.Run(reflect.TypeOf(test.dfn).String(), func(t 
*testing.T) {
+                               if cfn, err := NewDoFn(test.dfn); err != nil {
+                                       t.Logf("NewDoFn with SDF failed as 
expected:\n%v", err)
+                               } else {
+                                       t.Errorf("NewDoFn(%v) = %v, want 
failure", cfn.Name(), cfn)
+                               }
+                               // If validation fails with unknown main 
inputs, then it should
+                               // always fail for any known number of main 
inputs, so test them
+                               // all. Error messages won't necessarily match.
+                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainSingle)); err != nil {
+                                       
t.Logf("NewDoFn(NumMainInputs(MainSingle)) with SDF failed as expected:\n%v", 
err)
+                               } else {
+                                       t.Errorf("NewDoFn(%v, 
NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+                               }
+                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainKv)); err != nil {

Review Comment:
   I actually like leaving it for 2 reasons:
   
   1) (more important) It will be relevant as these tests are expanded to 
future use cases
   2) (probably a weaker argument) This is still part of the testable interface 
- even if we know that under the covers we don't make use of KV info for 
watermark estimation testing, there's no reason that _must_ remain true. This 
test protects us if we later want to make implementation changes that somehow 
involve KV awareness (as in my first point).



##########
sdks/go/pkg/beam/core/runtime/exec/plan.go:
##########
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "google.golang.org/protobuf/types/known/timestamppb"

Review Comment:
   The timestamppb package is used in several places, but most importantly in 
our OutputWatermarks proto: 
https://github.com/apache/beam/blob/81d3de2007f82703d93c420bf03b00d00c03e96e/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go#L922





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755702)
    Time Spent: 1h 40m  (was: 1.5h)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Allow self checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to