This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9eb86446eb4 [BEAM-14473] Throw error if using globally windowed, 
unbounded side input (#17681)
9eb86446eb4 is described below

commit 9eb86446eb4c609138e29ead4617331918e120f4
Author: Jack McCluskey <[email protected]>
AuthorDate: Mon May 16 12:33:33 2022 -0400

    [BEAM-14473] Throw error if using globally windowed, unbounded side input 
(#17681)
---
 sdks/go/pkg/beam/pardo.go      |  5 ++++-
 sdks/go/pkg/beam/pardo_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index aad86b6a02e..e2d536cb4f0 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -60,7 +60,10 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
                        return nil, fmt.Errorf("error with side input %d in 
DoFn %v: PCollections using merging WindowFns are not supported as side inputs. 
Consider re-windowing the side input PCollection before use", i, fn)
                }
                if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != 
window.GlobalWindows) {
-                       return nil, fmt.Errorf("main input is global windowed 
in DoFn %v but side input %v is not, cannot map windows correctly. Consider 
re-windowing the side input PCOllection before use", fn, i)
+                       return nil, fmt.Errorf("main input is global windowed 
in DoFn %v but side input %v is not, cannot map windows correctly. Consider 
re-windowing the side input PCollection before use", fn, i)
+               }
+               if (sideWfn.Kind == window.GlobalWindows) && 
!sideNode.Bounded() {
+                       return nil, fmt.Errorf("side input %v is global 
windowed in DoFn %v but is unbounded, DoFn will block until end of Global 
Window. Consider windowing your unbounded side input PCollection before use", 
i, fn)
                }
                in = append(in, s.Input.n)
        }
diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go
index f4dd8e96b1f..b15cecbf5a1 100644
--- a/sdks/go/pkg/beam/pardo_test.go
+++ b/sdks/go/pkg/beam/pardo_test.go
@@ -21,8 +21,14 @@ import (
        "reflect"
        "strings"
        "testing"
+       "time"
 
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
 )
 
@@ -132,3 +138,44 @@ type AnnotationsFn struct {
 func (fn *AnnotationsFn) ProcessElement(v int) int {
        return v
 }
+
+func doNothing(_ []byte, _ int) {}
+func TestParDoSideInputValdiation(t *testing.T) {
+       var tests = []struct {
+               name      string
+               wFn       *window.Fn
+               isBounded bool
+       }{
+               {
+                       "global window unbounded",
+                       window.NewGlobalWindows(),
+                       false,
+               },
+               {
+                       "side input session windowed",
+                       window.NewSessions(1 * time.Minute),
+                       true,
+               },
+               {
+                       "global main, interval side",
+                       window.NewFixedWindows(10 * time.Second),
+                       true,
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       p := NewPipeline()
+                       s := p.Root()
+
+                       strat := &window.WindowingStrategy{Fn: test.wFn, 
Trigger: trigger.Default(), AccumulationMode: window.Discarding, 
AllowedLateness: 0}
+                       sideCol := PCollection{n: 
graph.New().NewNode(typex.New(reflectx.Int), strat, test.isBounded)}
+                       outCol, err := TryParDo(s, doNothing, Impulse(s), 
SideInput{Input: sideCol})
+                       if outCol != nil {
+                               t.Errorf("TryParDo() produced an output 
PCollection when it should have failed, got %v", outCol)
+                       }
+                       if err == nil {
+                               t.Errorf("TryParDo() did not return an error 
when it should have")
+                       }
+               })
+       }
+}

Reply via email to