[ 
https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=755912&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755912
 ]

ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/22 17:34
            Start Date: 12/Apr/22 17:34
    Worklog Time Spent: 10m 
      Work Description: youngoli commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848692292


##########
sdks/go/pkg/beam/core/graph/fn_test.go:
##########
@@ -227,6 +228,59 @@ func TestNewDoFnSdf(t *testing.T) {
        })
 }
 
+func TestNewDoFnWatermarkEstimating(t *testing.T) {
+       t.Run("valid", func(t *testing.T) {
+               tests := []struct {
+                       dfn  interface{}
+                       main mainInputs
+               }{
+                       {dfn: &GoodWatermarkEstimating{}, main: MainSingle},
+               }
+
+               for _, test := range tests {
+                       t.Run(reflect.TypeOf(test.dfn).String(), func(t 
*testing.T) {
+                               // Valid DoFns should pass validation with and 
without KV info.
+                               if _, err := NewDoFn(test.dfn); err != nil {
+                                       t.Fatalf("NewDoFn with Watermark 
Estimation failed: %v", err)
+                               }
+                               if _, err := NewDoFn(test.dfn, 
NumMainInputs(test.main)); err != nil {
+                                       t.Fatalf("NewDoFn(NumMainInputs(%v)) 
with Watermark Estimation failed: %v", test.main, err)
+                               }
+                       })
+               }
+       })
+       t.Run("invalid", func(t *testing.T) {
+               tests := []struct {
+                       dfn interface{}
+               }{
+                       {dfn: &BadWatermarkEstimatingNonSdf{}},
+                       {dfn: 
&BadWatermarkEstimatingCreateWatermarkEstimatorReturnType{}},
+               }
+               for _, test := range tests {
+                       t.Run(reflect.TypeOf(test.dfn).String(), func(t 
*testing.T) {
+                               if cfn, err := NewDoFn(test.dfn); err != nil {
+                                       t.Logf("NewDoFn with SDF failed as 
expected:\n%v", err)
+                               } else {
+                                       t.Errorf("NewDoFn(%v) = %v, want 
failure", cfn.Name(), cfn)
+                               }
+                               // If validation fails with unknown main 
inputs, then it should
+                               // always fail for any known number of main 
inputs, so test them
+                               // all. Error messages won't necessarily match.
+                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainSingle)); err != nil {
+                                       
t.Logf("NewDoFn(NumMainInputs(MainSingle)) with SDF failed as expected:\n%v", 
err)
+                               } else {
+                                       t.Errorf("NewDoFn(%v, 
NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+                               }
+                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainKv)); err != nil {

Review Comment:
   That sounds reasonable to me.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 755912)
    Time Spent: 2h 20m  (was: 2h 10m)

> [Go SDK] Watermark Estimation
> -----------------------------
>
>                 Key: BEAM-11105
>                 URL: https://issues.apache.org/jira/browse/BEAM-11105
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Danny McCormick
>            Priority: P3
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Allow self-checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to