[https://issues.apache.org/jira/browse/BEAM-11105?focusedWorklogId=755702&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755702]
ASF GitHub Bot logged work on BEAM-11105:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Apr/22 11:34
Start Date: 12/Apr/22 11:34
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848330886
##########
sdks/go/pkg/beam/core/graph/fn_test.go:
##########
@@ -227,6 +228,59 @@ func TestNewDoFnSdf(t *testing.T) {
})
}
+func TestNewDoFnWatermarkEstimating(t *testing.T) {
+	t.Run("valid", func(t *testing.T) {
+		tests := []struct {
+			dfn  interface{}
+			main mainInputs
+		}{
+			{dfn: &GoodWatermarkEstimating{}, main: MainSingle},
+		}
+
+		for _, test := range tests {
+			t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) {
+				// Valid DoFns should pass validation with and without KV info.
+				if _, err := NewDoFn(test.dfn); err != nil {
+					t.Fatalf("NewDoFn with Watermark Estimation failed: %v", err)
+				}
+				if _, err := NewDoFn(test.dfn, NumMainInputs(test.main)); err != nil {
+					t.Fatalf("NewDoFn(NumMainInputs(%v)) with Watermark Estimation failed: %v", test.main, err)
+				}
+			})
+		}
+	})
+	t.Run("invalid", func(t *testing.T) {
+		tests := []struct {
+			dfn interface{}
+		}{
+			{dfn: &BadWatermarkEstimatingNonSdf{}},
+			{dfn: &BadWatermarkEstimatingCreateWatermarkEstimatorReturnType{}},
+		}
+		for _, test := range tests {
+			t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) {
+				if cfn, err := NewDoFn(test.dfn); err != nil {
+					t.Logf("NewDoFn with SDF failed as expected:\n%v", err)
+				} else {
+					t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn)
+				}
+				// If validation fails with unknown main inputs, then it should
+				// always fail for any known number of main inputs, so test them
+				// all. Error messages won't necessarily match.
+				if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
+					t.Logf("NewDoFn(NumMainInputs(MainSingle)) with SDF failed as expected:\n%v", err)
+				} else {
+					t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+				}
+				if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {
Review Comment:
I'd actually like to leave it in, for two reasons:
1) (more important) It will become relevant as these tests are expanded to cover future use cases.
2) (probably a weaker argument) This is still part of the testable interface. Even if we know that under the covers we don't use KV info for watermark estimation, there's no reason that _must_ stay true. This test protects us if we make implementation changes that somehow involve KV awareness (as in my first point).
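The argument above amounts to asserting that an invalid DoFn is rejected under every main-input mode, not only the modes the current validation logic inspects. The following is a stdlib-only sketch of that pattern under stated assumptions: `validateWatermarkEstimating`, the `mainInputs` constants, the `WatermarkEstimator` interface, and every DoFn type here are hypothetical stand-ins, not the Go SDK's `NewDoFn` validation. The "splittable" check is deliberately simplified to the presence of a `CreateInitialRestriction` method.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// mainInputs is a hypothetical stand-in for the test's main-input modes.
type mainInputs int

const (
	MainUnknown mainInputs = iota
	MainSingle
	MainKv
)

// WatermarkEstimator is a hypothetical, simplified estimator interface.
type WatermarkEstimator interface {
	CurrentWatermark() time.Time
}

var estimatorType = reflect.TypeOf((*WatermarkEstimator)(nil)).Elem()

// validateWatermarkEstimating is a hypothetical validator. Simplification:
// "splittable" only means the type defines CreateInitialRestriction. The
// mode is not used by these rules today, but callers still pass every mode.
func validateWatermarkEstimating(dfn interface{}, mode mainInputs) error {
	t := reflect.TypeOf(dfn)
	m, ok := t.MethodByName("CreateWatermarkEstimator")
	if !ok {
		return nil // Not watermark estimating; nothing to check.
	}
	if _, ok := t.MethodByName("CreateInitialRestriction"); !ok {
		return fmt.Errorf("%v (mode %d): CreateWatermarkEstimator requires a splittable DoFn", t, mode)
	}
	if m.Type.NumOut() != 1 || !m.Type.Out(0).Implements(estimatorType) {
		return fmt.Errorf("%v (mode %d): CreateWatermarkEstimator must return a WatermarkEstimator", t, mode)
	}
	return nil
}

type fixedEstimator struct{ wm time.Time }

func (e *fixedEstimator) CurrentWatermark() time.Time { return e.wm }

// goodFn is "splittable" and returns a valid estimator.
type goodFn struct{}

func (*goodFn) CreateInitialRestriction(int) int          { return 0 }
func (*goodFn) CreateWatermarkEstimator() *fixedEstimator { return &fixedEstimator{} }

// nonSdfFn creates an estimator but is not splittable.
type nonSdfFn struct{}

func (*nonSdfFn) CreateWatermarkEstimator() *fixedEstimator { return &fixedEstimator{} }

// badReturnFn is splittable but returns a non-estimator type.
type badReturnFn struct{}

func (*badReturnFn) CreateInitialRestriction(int) int { return 0 }
func (*badReturnFn) CreateWatermarkEstimator() int    { return 0 }

// checkAllModes validates dfn under every mode and returns the modes whose
// outcome (error or no error) differs from wantErr.
func checkAllModes(dfn interface{}, wantErr bool) []mainInputs {
	var mismatches []mainInputs
	for _, mode := range []mainInputs{MainUnknown, MainSingle, MainKv} {
		if err := validateWatermarkEstimating(dfn, mode); (err != nil) != wantErr {
			mismatches = append(mismatches, mode)
		}
	}
	return mismatches
}

func main() {
	cases := []struct {
		dfn     interface{}
		wantErr bool
	}{
		{&goodFn{}, false},
		{&nonSdfFn{}, true},
		{&badReturnFn{}, true},
	}
	for _, c := range cases {
		fmt.Printf("%T mismatches: %v\n", c.dfn, checkAllModes(c.dfn, c.wantErr))
	}
}
```

Looping over the full list of modes means a newly added mode is exercised automatically, which is the protection the comment asks for.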
##########
sdks/go/pkg/beam/core/runtime/exec/plan.go:
##########
@@ -24,6 +24,7 @@ import (
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "google.golang.org/protobuf/types/known/timestamppb"
Review Comment:
It is used in several places, but most importantly in our OutputWatermarks proto:
https://github.com/apache/beam/blob/81d3de2007f82703d93c420bf03b00d00c03e96e/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go#L922
Issue Time Tracking
-------------------
Worklog Id: (was: 755702)
Time Spent: 1h 40m (was: 1.5h)
> [Go SDK] Watermark Estimation
> -----------------------------
>
> Key: BEAM-11105
> URL: https://issues.apache.org/jira/browse/BEAM-11105
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Danny McCormick
> Priority: P3
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Allow self-checkpointing SplittableDoFns to specify a watermark estimator.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)