This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 32a0f09 [BEAM-13757] adds pane observation in DoFn (#16629)
32a0f09 is described below
commit 32a0f09b61d4146d48e5ba7d4f3433d9df98d586
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Thu Feb 3 13:44:55 2022 -0500
[BEAM-13757] adds pane observation in DoFn (#16629)
---
sdks/go/pkg/beam/core/funcx/fn.go | 39 ++++++-
sdks/go/pkg/beam/core/funcx/fn_test.go | 104 +++++++++++++++++-
sdks/go/pkg/beam/core/runtime/exec/decode.go | 2 +-
sdks/go/pkg/beam/core/runtime/exec/fn.go | 48 ++++----
sdks/go/pkg/beam/core/runtime/exec/fn_arity.go | 122 ++++++++++-----------
sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl | 14 +--
sdks/go/pkg/beam/core/runtime/exec/fn_test.go | 6 +-
sdks/go/pkg/beam/core/runtime/exec/pardo.go | 16 +--
sdks/go/test/integration/integration.go | 15 ++-
.../go/test/integration/primitives/window_panes.go | 51 +++++++++
.../integration/primitives/window_panes_test.go | 31 ++++++
11 files changed, 336 insertions(+), 112 deletions(-)
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go
b/sdks/go/pkg/beam/core/funcx/fn.go
index 97ca76a..55fbc25 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -17,9 +17,9 @@ package funcx
import (
"fmt"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"reflect"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -76,6 +76,8 @@ const (
// Example:
// "func(string) func (*int) bool"
FnMultiMap FnParamKind = 0x200
+ // FnPane indicates a function input parameter that is a PaneInfo
+ FnPane FnParamKind = 0x400
)
func (k FnParamKind) String() string {
@@ -100,6 +102,8 @@ func (k FnParamKind) String() string {
return "RTracker"
case FnMultiMap:
return "MultiMap"
+ case FnPane:
+ return "Pane"
default:
return fmt.Sprintf("%v", int(k))
}
@@ -243,6 +247,16 @@ func (u *Fn) Window() (pos int, exists bool) {
return -1, false
}
+// Pane returns (index, true) iff the function expects a PaneInfo.
+func (u *Fn) Pane() (pos int, exists bool) {
+ for i, p := range u.Param {
+ if p.Kind == FnPane {
+ return i, true
+ }
+ }
+ return -1, false
+}
+
// RTracker returns (index, true) iff the function expects an sdf.RTracker.
func (u *Fn) RTracker() (pos int, exists bool) {
for i, p := range u.Param {
@@ -329,6 +343,8 @@ func New(fn reflectx.Func) (*Fn, error) {
kind = FnReIter
case IsMultiMap(t):
kind = FnMultiMap
+ case t == typex.PaneInfoType:
+ kind = FnPane
default:
return nil, errors.Errorf("bad parameter type for %s:
%v", fn.Name(), t)
}
@@ -386,7 +402,7 @@ func SubReturns(list []ReturnParam, indices ...int)
[]ReturnParam {
}
// The order of present parameters and return values must be as follows:
-// func(FnContext?, FnWindow?, FnEventTime?, FnType?, FnRTracker?, (FnValue,
SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
+// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnType?, FnRTracker?,
(FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
// where ? indicates 0 or 1, and * indicates any number.
// and a SideInput is one of FnValue or FnIter or FnReIter
// Note: Fns with inputs must have at least one FnValue as the main input.
@@ -411,6 +427,7 @@ func validateOrder(u *Fn) error {
var (
errContextParam = errors.New("may only have a single
context.Context parameter and it must be the first parameter")
+ errPaneParamPrecedence = errors.New("may only have a single
PaneInfo parameter and it must precede the WindowParam, EventTime and main
input parameter")
errWindowParamPrecedence = errors.New("may only have a single Window
parameter and it must precede the EventTime and main input parameter")
errEventTimeParamPrecedence = errors.New("may only have a single
beam.EventTime parameter and it must precede the main input parameter")
errReflectTypePrecedence = errors.New("may only have a single
reflect.Type parameter and it must precede the main input parameter")
@@ -423,6 +440,7 @@ type paramState int
const (
psStart paramState = iota
psContext
+ psPane
psWindow
psEventTime
psType
@@ -437,6 +455,8 @@ func nextParamState(cur paramState, transition FnParamKind)
(paramState, error)
switch transition {
case FnContext:
return psContext, nil
+ case FnPane:
+ return psPane, nil
case FnWindow:
return psWindow, nil
case FnEventTime:
@@ -448,6 +468,19 @@ func nextParamState(cur paramState, transition
FnParamKind) (paramState, error)
}
case psContext:
switch transition {
+ case FnPane:
+ return psPane, nil
+ case FnWindow:
+ return psWindow, nil
+ case FnEventTime:
+ return psEventTime, nil
+ case FnType:
+ return psType, nil
+ case FnRTracker:
+ return psRTracker, nil
+ }
+ case psPane:
+ switch transition {
case FnWindow:
return psWindow, nil
case FnEventTime:
@@ -495,6 +528,8 @@ func nextParamState(cur paramState, transition FnParamKind)
(paramState, error)
switch transition {
case FnContext:
return -1, errContextParam
+ case FnPane:
+ return -1, errPaneParamPrecedence
case FnWindow:
return -1, errWindowParamPrecedence
case FnEventTime:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go
b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 3a8abc8..0a12bad 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -81,6 +81,11 @@ func TestNew(t *testing.T) {
Param: []FnParamKind{FnValue, FnMultiMap},
},
{
+ Name: "good7",
+ Fn: func(typex.PaneInfo, typex.Window,
typex.EventTime, reflect.Type, []byte) {},
+ Param: []FnParamKind{FnPane, FnWindow, FnEventTime,
FnType, FnValue},
+ },
+ {
Name: "good-method",
Fn: foo{1}.Do,
Param: []FnParamKind{FnContext, FnValue, FnValue},
@@ -123,6 +128,11 @@ func TestNew(t *testing.T) {
Err: errContextParam,
},
{
+ Name: "errPaneParam: after Window",
+ Fn: func(typex.Window, typex.PaneInfo, int) {},
+ Err: errPaneParamPrecedence,
+ },
+ {
Name: "errWindowParamPrecedence: after EventTime",
Fn: func(typex.EventTime, typex.Window, int) {
},
@@ -273,8 +283,7 @@ func TestEmits(t *testing.T) {
params[i].Kind = kind
params[i].T = nil
}
- fn := new(Fn)
- fn.Param = params
+ fn := &Fn{Param: params}
// Validate we get expected results for Emits function.
pos, num, exists := fn.Emits()
@@ -291,6 +300,94 @@ func TestEmits(t *testing.T) {
}
}
+func TestPane(t *testing.T) {
+ tests := []struct {
+ Name string
+ Params []FnParamKind
+ Pos int
+ Exists bool
+ }{
+ {
+ Name: "pane input",
+ Params: []FnParamKind{FnContext, FnPane},
+ Pos: 1,
+ Exists: true,
+ },
+ {
+ Name: "no pane input",
+ Params: []FnParamKind{FnContext, FnEventTime},
+ Pos: -1,
+ Exists: false,
+ },
+ }
+
+ for _, test := range tests {
+ test := test
+ t.Run(test.Name, func(t *testing.T) {
+ // Create a Fn with a filled params list.
+ params := make([]FnParam, len(test.Params))
+ for i, kind := range test.Params {
+ params[i].Kind = kind
+ params[i].T = nil
+ }
+ fn := &Fn{Param: params}
+
+ // Validate we get expected results for pane function.
+ pos, exists := fn.Pane()
+ if exists != test.Exists {
+ t.Errorf("Pane(%v) - exists: got %v, want %v",
params, exists, test.Exists)
+ }
+ if pos != test.Pos {
+ t.Errorf("Pane(%v) - pos: got %v, want %v",
params, pos, test.Pos)
+ }
+ })
+ }
+}
+
+func TestWindow(t *testing.T) {
+ tests := []struct {
+ Name string
+ Params []FnParamKind
+ Pos int
+ Exists bool
+ }{
+ {
+ Name: "window input",
+ Params: []FnParamKind{FnContext, FnWindow},
+ Pos: 1,
+ Exists: true,
+ },
+ {
+ Name: "no window input",
+ Params: []FnParamKind{FnContext, FnEventTime},
+ Pos: -1,
+ Exists: false,
+ },
+ }
+
+ for _, test := range tests {
+ test := test
+ t.Run(test.Name, func(t *testing.T) {
+ // Create a Fn with a filled params list.
+ params := make([]FnParam, len(test.Params))
+ for i, kind := range test.Params {
+ params[i].Kind = kind
+ params[i].T = nil
+ }
+ fn := &Fn{Param: params}
+
+ // Validate we get expected results for Window function.
+ pos, exists := fn.Window()
+ if exists != test.Exists {
+ t.Errorf("Window(%v) - exists: got %v, want
%v", params, exists, test.Exists)
+ }
+ if pos != test.Pos {
+ t.Errorf("Window(%v) - pos: got %v, want %v",
params, pos, test.Pos)
+ }
+ })
+ }
+}
+
func TestInputs(t *testing.T) {
tests := []struct {
Name string
@@ -352,8 +449,7 @@ func TestInputs(t *testing.T) {
params[i].Kind = kind
params[i].T = nil
}
- fn := new(Fn)
- fn.Param = params
+ fn := &Fn{Param: params}
// Validate we get expected results for Inputs function.
pos, num, exists := fn.Inputs()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/decode.go
b/sdks/go/pkg/beam/core/runtime/exec/decode.go
index a14713c..90d0c95 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/decode.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/decode.go
@@ -22,7 +22,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
-// Decoder is a uniform custom encoder interface. It wraps various
+// Decoder is a uniform custom decoder interface. It wraps various
// forms of reflectx.Funcs.
type Decoder interface {
// Decode decodes the []byte in to a value of the given type.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 064d483..456edeb 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -41,12 +41,12 @@ type MainInput struct {
// Invoke invokes the fn with the given values. The extra values must match
the non-main
// side input and emitters. It returns the direct output, if any.
-func Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn
*funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
+func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{})
(*FullValue, error) {
if fn == nil {
return nil, nil // ok: nothing to Invoke
}
inv := newInvoker(fn)
- return inv.Invoke(ctx, ws, ts, opt, extra...)
+ return inv.Invoke(ctx, pn, ws, ts, opt, extra...)
}
// InvokeWithoutEventTime runs the given function at time 0 in the global
window.
@@ -64,13 +64,13 @@ type invoker struct {
fn *funcx.Fn
args []interface{}
// TODO(lostluck): 2018/07/06 consider replacing with a slice of
functions to run over the args slice, as an improvement.
- ctxIdx, wndIdx, etIdx int // specialized input indexes
- outEtIdx, outErrIdx int // specialized output indexes
- in, out []int // general indexes
+ ctxIdx, pnIdx, wndIdx, etIdx int // specialized input indexes
+ outEtIdx, outErrIdx int // specialized output indexes
+ in, out []int // general indexes
ret FullValue // ret is a
cached allocation for passing to the next Unit. Units never modify the passed
in FullValue.
elmConvert, elm2Convert func(interface{}) interface{} // Cached
conversion functions, which assumes this invoker is always used with the same
parameter types.
- call func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error)
+ call func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error)
}
func newInvoker(fn *funcx.Fn) *invoker {
@@ -84,6 +84,9 @@ func newInvoker(fn *funcx.Fn) *invoker {
if n.ctxIdx, ok = fn.Context(); !ok {
n.ctxIdx = -1
}
+ if n.pnIdx, ok = fn.Pane(); !ok {
+ n.pnIdx = -1
+ }
if n.wndIdx, ok = fn.Window(); !ok {
n.wndIdx = -1
}
@@ -113,12 +116,12 @@ func (n *invoker) Reset() {
// InvokeWithoutEventTime runs the function at time 0 in the global window.
func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput,
extra ...interface{}) (*FullValue, error) {
- return n.Invoke(ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp,
opt, extra...)
+ return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow,
mtime.ZeroTimestamp, opt, extra...)
}
// Invoke invokes the fn with the given values. The extra values must match
the non-main
// side input and emitters. It returns the direct output, if any.
-func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts
typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) {
+func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{})
(*FullValue, error) {
// (1) Populate contexts
// extract these to make things easier to read.
args := n.args
@@ -128,6 +131,9 @@ func (n *invoker) Invoke(ctx context.Context, ws
[]typex.Window, ts typex.EventT
if n.ctxIdx >= 0 {
args[n.ctxIdx] = ctx
}
+ if n.pnIdx >= 0 {
+ args[n.pnIdx] = pn
+ }
if n.wndIdx >= 0 {
if len(ws) != 1 {
return nil, errors.Errorf("DoFns that observe windows
must be invoked with single window: %v", opt.Key.Windows)
@@ -183,12 +189,12 @@ func (n *invoker) Invoke(ctx context.Context, ws
[]typex.Window, ts typex.EventT
}
// (4) Invoke
- return n.call(ws, ts)
+ return n.call(pn, ws, ts)
}
// ret1 handles processing of a single return value.
// Errors or single values are the only options.
-func (n *invoker) ret1(ws []typex.Window, ts typex.EventTime, r0 interface{})
(*FullValue, error) {
+func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0 interface{}) (*FullValue, error) {
switch {
case n.outErrIdx >= 0:
if r0 != nil {
@@ -198,44 +204,44 @@ func (n *invoker) ret1(ws []typex.Window, ts
typex.EventTime, r0 interface{}) (*
case n.outEtIdx >= 0:
panic("invoker.ret1: cannot return event time without a value")
default:
- n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0}
+ n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
return &n.ret, nil
}
}
// ret2 handles processing of a pair of return values.
-func (n *invoker) ret2(ws []typex.Window, ts typex.EventTime, r0, r1
interface{}) (*FullValue, error) {
+func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0, r1 interface{}) (*FullValue, error) {
switch {
case n.outErrIdx >= 0:
if r1 != nil {
return nil, r1.(error)
}
- n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0}
+ n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
return &n.ret, nil
case n.outEtIdx == 0:
- n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1}
+ n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Pane: pn}
return &n.ret, nil
default:
- n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1}
+ n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2:
r1, Pane: pn}
return &n.ret, nil
}
}
// ret3 handles processing of a trio of return values.
-func (n *invoker) ret3(ws []typex.Window, ts typex.EventTime, r0, r1, r2
interface{}) (*FullValue, error) {
+func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
switch {
case n.outErrIdx >= 0:
if r2 != nil {
return nil, r2.(error)
}
if n.outEtIdx < 0 {
- n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0,
Elm2: r1}
+ n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0,
Elm2: r1, Pane: pn}
return &n.ret, nil
}
- n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1}
+ n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Pane: pn}
return &n.ret, nil
case n.outEtIdx == 0:
- n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Elm2: r2}
+ n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime),
Elm: r1, Elm2: r2, Pane: pn}
return &n.ret, nil
default:
panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match
permitted return values.", r0, r1, r2))
@@ -243,11 +249,11 @@ func (n *invoker) ret3(ws []typex.Window, ts
typex.EventTime, r0, r1, r2 interfa
}
// ret4 handles processing of a quad of return values.
-func (n *invoker) ret4(ws []typex.Window, ts typex.EventTime, r0, r1, r2, r3
interface{}) (*FullValue, error) {
+func (n *invoker) ret4(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime, r0, r1, r2, r3 interface{}) (*FullValue, error) {
if r3 != nil {
return nil, r3.(error)
}
- n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm:
r1, Elm2: r2}
+ n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm:
r1, Elm2: r2, Pane: pn}
return &n.ret, nil
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
index 4e9fdf5..960edff 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
@@ -33,199 +33,199 @@ func (n *invoker) initCall() {
switch fn := n.fn.Fn.(type) {
case reflectx.Func0x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call0x0()
return nil, nil
}
case reflectx.Func1x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call1x0(n.args[0])
return nil, nil
}
case reflectx.Func2x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call2x0(n.args[0], n.args[1])
return nil, nil
}
case reflectx.Func3x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call3x0(n.args[0], n.args[1], n.args[2])
return nil, nil
}
case reflectx.Func4x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call4x0(n.args[0], n.args[1], n.args[2], n.args[3])
return nil, nil
}
case reflectx.Func5x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call5x0(n.args[0], n.args[1], n.args[2], n.args[3],
n.args[4])
return nil, nil
}
case reflectx.Func6x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call6x0(n.args[0], n.args[1], n.args[2], n.args[3],
n.args[4], n.args[5])
return nil, nil
}
case reflectx.Func7x0:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
fn.Call7x0(n.args[0], n.args[1], n.args[2], n.args[3],
n.args[4], n.args[5], n.args[6])
return nil, nil
}
case reflectx.Func0x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call0x1()
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func1x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call1x1(n.args[0])
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func2x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call2x1(n.args[0], n.args[1])
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func3x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call3x1(n.args[0], n.args[1], n.args[2])
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func4x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call4x1(n.args[0], n.args[1], n.args[2],
n.args[3])
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func5x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call5x1(n.args[0], n.args[1], n.args[2],
n.args[3], n.args[4])
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func6x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call6x1(n.args[0], n.args[1], n.args[2],
n.args[3], n.args[4], n.args[5])
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func7x1:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0 := fn.Call7x1(n.args[0], n.args[1], n.args[2],
n.args[3], n.args[4], n.args[5], n.args[6])
- return n.ret1(ws, ts, r0)
+ return n.ret1(pn, ws, ts, r0)
}
case reflectx.Func0x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call0x2()
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func1x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call1x2(n.args[0])
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func2x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call2x2(n.args[0], n.args[1])
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func3x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call3x2(n.args[0], n.args[1], n.args[2])
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func4x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call4x2(n.args[0], n.args[1], n.args[2],
n.args[3])
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func5x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call5x2(n.args[0], n.args[1], n.args[2],
n.args[3], n.args[4])
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func6x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call6x2(n.args[0], n.args[1], n.args[2],
n.args[3], n.args[4], n.args[5])
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func7x2:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1 := fn.Call7x2(n.args[0], n.args[1], n.args[2],
n.args[3], n.args[4], n.args[5], n.args[6])
- return n.ret2(ws, ts, r0, r1)
+ return n.ret2(pn, ws, ts, r0, r1)
}
case reflectx.Func0x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call0x3()
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
case reflectx.Func1x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call1x3(n.args[0])
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
case reflectx.Func2x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call2x3(n.args[0], n.args[1])
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
case reflectx.Func3x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call3x3(n.args[0], n.args[1],
n.args[2])
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
case reflectx.Func4x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call4x3(n.args[0], n.args[1],
n.args[2], n.args[3])
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
case reflectx.Func5x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call5x3(n.args[0], n.args[1],
n.args[2], n.args[3], n.args[4])
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
case reflectx.Func6x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call6x3(n.args[0], n.args[1],
n.args[2], n.args[3], n.args[4], n.args[5])
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
case reflectx.Func7x3:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
r0, r1, r2 := fn.Call7x3(n.args[0], n.args[1],
n.args[2], n.args[3], n.args[4], n.args[5], n.args[6])
- return n.ret3(ws, ts, r0, r1, r2)
+ return n.ret3(pn, ws, ts, r0, r1, r2)
}
default:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
ret := n.fn.Fn.Call(n.args)
if n.outErrIdx >= 0 && ret[n.outErrIdx] != nil {
return nil, ret[n.outErrIdx].(error)
@@ -237,13 +237,13 @@ func (n *invoker) initCall() {
case 0:
return nil, nil
case 1:
- return n.ret1(ws, ts, ret[0])
+ return n.ret1(pn, ws, ts, ret[0])
case 2:
- return n.ret2(ws, ts, ret[0], ret[1])
+ return n.ret2(pn, ws, ts, ret[0], ret[1])
case 3:
- return n.ret3(ws, ts, ret[0], ret[1], ret[2])
+ return n.ret3(pn, ws, ts, ret[0], ret[1],
ret[2])
case 4:
- return n.ret4(ws, ts, ret[0], ret[1], ret[2],
ret[3])
+ return n.ret4(pn, ws, ts, ret[0], ret[1],
ret[2], ret[3])
}
panic(fmt.Sprintf("invoker: %v has > 4 return values,
which is not permitted", n.fn.Fn.Name()))
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
index 2f5a25d..29bee7f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
@@ -32,10 +32,10 @@ func (n *invoker) initCall() {
{{range $out := upto 4}}
{{range $in := upto 8}}
case reflectx.Func{{$in}}x{{$out}}:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
{{if $out}}{{mktuplef $out "r%v"}} :=
{{end}}fn.Call{{$in}}x{{$out}}({{mktuplef $in "n.args[%v]"}})
{{- if $out}}
- return n.ret{{$out}}(ws, ts, {{mktuplef $out "r%v"}})
+ return n.ret{{$out}}(pn, ws, ts, {{mktuplef $out
"r%v"}})
{{- else}}
return nil, nil
{{- end}}
@@ -43,7 +43,7 @@ func (n *invoker) initCall() {
{{end}}
{{end}}
default:
- n.call = func(ws []typex.Window, ts typex.EventTime)
(*FullValue, error) {
+ n.call = func(pn typex.PaneInfo, ws []typex.Window, ts
typex.EventTime) (*FullValue, error) {
ret := n.fn.Fn.Call(n.args)
if n.outErrIdx >= 0 && ret[n.outErrIdx] != nil {
return nil, ret[n.outErrIdx].(error)
@@ -55,13 +55,13 @@ func (n *invoker) initCall() {
case 0:
return nil, nil
case 1:
- return n.ret1(ws, ts, ret[0])
+ return n.ret1(pn, ws, ts, ret[0])
case 2:
- return n.ret2(ws, ts, ret[0], ret[1])
+ return n.ret2(pn, ws, ts, ret[0], ret[1])
case 3:
- return n.ret3(ws, ts, ret[0], ret[1], ret[2])
+ return n.ret3(pn, ws, ts, ret[0], ret[1],
ret[2])
case 4:
- return n.ret4(ws, ts, ret[0], ret[1], ret[2],
ret[3])
+ return n.ret4(pn, ws, ts, ret[0], ret[1],
ret[2], ret[3])
}
panic(fmt.Sprintf("invoker: %v has > 4 return values,
which is not permitted", n.fn.Fn.Name()))
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
index ba5e6d1..b7ab501 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
@@ -178,7 +178,7 @@ func TestInvoke(t *testing.T) {
test.ExpectedTime = ts
}
- val, err := Invoke(context.Background(),
window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
+ val, err := Invoke(context.Background(),
typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
if err != nil {
t.Fatalf("Invoke(%v,%v) failed: %v",
fn.Fn.Name(), test.Args, err)
}
@@ -314,7 +314,7 @@ func BenchmarkInvoke(b *testing.B) {
ts := mtime.ZeroTimestamp.Add(2 * time.Millisecond)
b.Run(fmt.Sprintf("SingleInvoker_%s", test.Name), func(b
*testing.B) {
for i := 0; i < b.N; i++ {
- _, err := Invoke(context.Background(),
window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
+ _, err := Invoke(context.Background(),
typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
if err != nil {
b.Fatalf("Invoke(%v,%v) failed: %v",
fn.Fn.Name(), test.Args, err)
}
@@ -323,7 +323,7 @@ func BenchmarkInvoke(b *testing.B) {
b.Run(fmt.Sprintf("CachedInvoker_%s", test.Name), func(b
*testing.B) {
inv := newInvoker(fn)
for i := 0; i < b.N; i++ {
- _, err := inv.Invoke(context.Background(),
window.SingleGlobalWindow, ts, test.Opt, test.Args...)
+ _, err := inv.Invoke(context.Background(),
typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, test.Args...)
if err != nil {
b.Fatalf("Invoke(%v,%v) failed: %v",
fn.Fn.Name(), test.Args, err)
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 7518ee8..acd0867 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -115,7 +115,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string,
data DataContext) er
// TODO(BEAM-3303): what to set for StartBundle/FinishBundle window and
emitter timestamp?
- if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err != nil {
+ if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(),
window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err
!= nil {
return n.fail(err)
}
return nil
@@ -147,7 +147,7 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
} else {
for _, w := range elm.Windows {
elm := &mainIn.Key
- wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2,
Timestamp: elm.Timestamp, Windows: []typex.Window{w}}
+ wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2,
Timestamp: elm.Timestamp, Windows: []typex.Window{w}, Pane: elm.Pane}
err := n.processSingleWindow(&MainInput{Key: wElm,
Values: mainIn.Values, RTracker: mainIn.RTracker})
if err != nil {
return n.fail(err)
@@ -163,7 +163,7 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
// each individual window by exploding the windows first.
func (n *ParDo) processSingleWindow(mainIn *MainInput) error {
elm := &mainIn.Key
- val, err := n.invokeProcessFn(n.ctx, elm.Windows, elm.Timestamp, mainIn)
+ val, err := n.invokeProcessFn(n.ctx, elm.Pane, elm.Windows,
elm.Timestamp, mainIn)
if err != nil {
return n.fail(err)
}
@@ -208,7 +208,7 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
n.states.Set(n.ctx, metrics.FinishBundle)
- if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow,
mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
+ if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(),
window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil);
err != nil {
return n.fail(err)
}
n.reader = nil
@@ -282,7 +282,7 @@ func (n *ParDo) initSideInput(ctx context.Context, w
typex.Window) error {
}
// invokeDataFn handle non-per element invocations.
-func (n *ParDo) invokeDataFn(ctx context.Context, ws []typex.Window, ts
typex.EventTime, fn *funcx.Fn, opt *MainInput) (val *FullValue, err error) {
+func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput) (val
*FullValue, err error) {
if fn == nil {
return nil, nil
}
@@ -295,7 +295,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, ws
[]typex.Window, ts typex.Ev
if err := n.preInvoke(ctx, ws, ts); err != nil {
return nil, err
}
- val, err = Invoke(ctx, ws, ts, fn, opt, n.cache.extra...)
+ val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.cache.extra...)
if err != nil {
return nil, err
}
@@ -303,7 +303,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, ws
[]typex.Window, ts typex.Ev
}
// invokeProcessFn handles the per element invocations
-func (n *ParDo) invokeProcessFn(ctx context.Context, ws []typex.Window, ts
typex.EventTime, opt *MainInput) (val *FullValue, err error) {
+func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws
[]typex.Window, ts typex.EventTime, opt *MainInput) (val *FullValue, err error)
{
// Defer side input clean-up in case of panic
defer func() {
if postErr := n.postInvoke(); postErr != nil {
@@ -313,7 +313,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, ws
[]typex.Window, ts typex
if err := n.preInvoke(ctx, ws, ts); err != nil {
return nil, err
}
- val, err = n.inv.Invoke(ctx, ws, ts, opt, n.cache.extra...)
+ val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.cache.extra...)
if err != nil {
return nil, err
}
diff --git a/sdks/go/test/integration/integration.go
b/sdks/go/test/integration/integration.go
index 0355048..f55a199 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -65,8 +65,9 @@ var directFilters = []string{
"TestXLang.*",
"TestKafkaIO.*",
"TestJDBCIO_BasicReadWrite",
- // Triggers are not yet supported
+ // Triggers and panes are not yet supported
"TestTrigger.*",
+ "TestPanes",
// The direct runner does not support the TestStream primitive
"TestTestStream.*",
// (BEAM-13075): The direct runner does not support windowed side inputs
@@ -79,8 +80,9 @@ var directFilters = []string{
var portableFilters = []string{
// The portable runner does not support the TestStream primitive
"TestTestStream.*",
- // The trigger tests uses TestStream
+ // The trigger and pane tests use TestStream
"TestTrigger.*",
+ "TestPanes",
// TODO(BEAM-12797): Python portable runner times out on Kafka reads.
"TestKafkaIO.*",
// TODO(BEAM-13778) needs a schemaio expansion service address flag
@@ -106,8 +108,9 @@ var samzaFilters = []string{
"TestReshuffleKV",
// The Samza runner does not support the TestStream primitive
"TestTestStream.*",
- // The trigger tests uses TestStream
+ // The trigger and pane tests use TestStream
"TestTrigger.*",
+ "TestPanes",
// TODO(BEAM-13006): Samza doesn't yet support post job metrics, used
by WordCount
"TestWordCount.*",
// TODO(BEAM-13778) needs a schemaio expansion service address flag
@@ -121,8 +124,9 @@ var sparkFilters = []string{
"TestParDoKVSideInput",
// The Spark runner does not support the TestStream primitive
"TestTestStream.*",
- // The trigger tests uses TestStream
+ // The trigger and pane tests use TestStream
"TestTrigger.*",
+ "TestPanes",
// TODO(BEAM-13778) needs a schemaio expansion service address flag
"TestJDBCIO_BasicReadWrite",
}
@@ -134,8 +138,9 @@ var dataflowFilters = []string{
"TestFlattenDup",
// The Dataflow runner does not support the TestStream primitive
"TestTestStream.*",
- // The trigger tests uses TestStream
+ // The trigger and pane tests use TestStream
"TestTrigger.*",
+ "TestPanes",
// There is no infrastructure for running KafkaIO tests with Dataflow.
"TestKafkaIO.*",
// Dataflow doesn't support any test that requires loopback.
diff --git a/sdks/go/test/integration/primitives/window_panes.go
b/sdks/go/test/integration/primitives/window_panes.go
new file mode 100644
index 0000000..b22a2af
--- /dev/null
+++ b/sdks/go/test/integration/primitives/window_panes.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
+)
+
+// init runs at package load and registers PanesFn with the beam package.
+// NOTE(review): presumably this lets the DoFn be resolved by name when the
+// pipeline executes — confirm against the beam.RegisterFunction documentation.
+func init() {
+ beam.RegisterFunction(PanesFn)
+}
+
+// PanesFn is the DoFn used by Panes. It ignores the element value and emits
+// the Timing field of the element's PaneInfo, converted to an int.
+func PanesFn(pn typex.PaneInfo, value float64, emit func(int)) {
+	timing := int(pn.Timing)
+	emit(timing)
+}
+
+// Panes builds a pipeline that feeds three TestStream elements into 10-second
+// fixed windows with an Always trigger, maps every element to its pane timing
+// with PanesFn, and asserts that three outputs ("firings") are produced.
+func Panes(s beam.Scope) {
+	// Scope returns the named child scope instead of modifying s, so the
+	// result must be kept for the transforms below to be placed in it.
+	s = s.Scope("increment")
+
+	con := teststream.NewConfig()
+	con.AddElements(1000, 1.0, 2.0, 3.0)
+	// NOTE(review): 11000 presumably moves the watermark past the end of the
+	// first 10s window (millisecond units) — confirm against teststream.
+	con.AdvanceWatermark(11000)
+	col := teststream.Create(s, con)
+
+	windowed := beam.WindowInto(s, window.NewFixedWindows(10*time.Second), col, beam.Trigger(trigger.Always()))
+	timings := beam.ParDo(s, PanesFn, windowed)
+	// Re-window into the global window before counting the emitted values.
+	timings = beam.WindowInto(s, window.NewGlobalWindows(), timings)
+	passert.Count(s, timings, "number of firings", 3)
+}
diff --git a/sdks/go/test/integration/primitives/window_panes_test.go
b/sdks/go/test/integration/primitives/window_panes_test.go
new file mode 100644
index 0000000..6a941ac
--- /dev/null
+++ b/sdks/go/test/integration/primitives/window_panes_test.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+ "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+// TestPanes constructs the Panes pipeline on a fresh root scope and runs it
+// through ptest.RunAndValidate. integration.CheckFilters is consulted first.
+// NOTE(review): CheckFilters presumably skips the test on runners whose filter
+// list names it — confirm against the integration package.
+func TestPanes(t *testing.T) {
+	integration.CheckFilters(t)
+
+	pipeline, root := beam.NewPipelineWithRoot()
+	Panes(root)
+	ptest.RunAndValidate(t, pipeline)
+}