This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 32a0f09  [BEAM-13757] adds pane observation in DoFn (#16629)
32a0f09 is described below

commit 32a0f09b61d4146d48e5ba7d4f3433d9df98d586
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Thu Feb 3 13:44:55 2022 -0500

    [BEAM-13757] adds pane observation in DoFn (#16629)
---
 sdks/go/pkg/beam/core/funcx/fn.go                  |  39 ++++++-
 sdks/go/pkg/beam/core/funcx/fn_test.go             | 104 +++++++++++++++++-
 sdks/go/pkg/beam/core/runtime/exec/decode.go       |   2 +-
 sdks/go/pkg/beam/core/runtime/exec/fn.go           |  48 ++++----
 sdks/go/pkg/beam/core/runtime/exec/fn_arity.go     | 122 ++++++++++-----------
 sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl   |  14 +--
 sdks/go/pkg/beam/core/runtime/exec/fn_test.go      |   6 +-
 sdks/go/pkg/beam/core/runtime/exec/pardo.go        |  16 +--
 sdks/go/test/integration/integration.go            |  15 ++-
 .../go/test/integration/primitives/window_panes.go |  51 +++++++++
 .../integration/primitives/window_panes_test.go    |  31 ++++++
 11 files changed, 336 insertions(+), 112 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go 
b/sdks/go/pkg/beam/core/funcx/fn.go
index 97ca76a..55fbc25 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -17,9 +17,9 @@ package funcx
 
 import (
        "fmt"
-       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "reflect"
 
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -76,6 +76,8 @@ const (
        // Example:
        //                              "func(string) func (*int) bool"
        FnMultiMap FnParamKind = 0x200
+       // FnPane indicates a function input parameter that is a PaneInfo.
+       FnPane FnParamKind = 0x400
 )
 
 func (k FnParamKind) String() string {
@@ -100,6 +102,8 @@ func (k FnParamKind) String() string {
                return "RTracker"
        case FnMultiMap:
                return "MultiMap"
+       case FnPane:
+               return "Pane"
        default:
                return fmt.Sprintf("%v", int(k))
        }
@@ -243,6 +247,16 @@ func (u *Fn) Window() (pos int, exists bool) {
        return -1, false
 }
 
+// Pane returns (index, true) iff the function expects a PaneInfo.
+func (u *Fn) Pane() (pos int, exists bool) {
+       for i, p := range u.Param {
+               if p.Kind == FnPane {
+                       return i, true
+               }
+       }
+       return -1, false
+}
+
 // RTracker returns (index, true) iff the function expects an sdf.RTracker.
 func (u *Fn) RTracker() (pos int, exists bool) {
        for i, p := range u.Param {
@@ -329,6 +343,8 @@ func New(fn reflectx.Func) (*Fn, error) {
                        kind = FnReIter
                case IsMultiMap(t):
                        kind = FnMultiMap
+               case t == typex.PaneInfoType:
+                       kind = FnPane
                default:
                        return nil, errors.Errorf("bad parameter type for %s: 
%v", fn.Name(), t)
                }
@@ -386,7 +402,7 @@ func SubReturns(list []ReturnParam, indices ...int) 
[]ReturnParam {
 }
 
 // The order of present parameters and return values must be as follows:
-// func(FnContext?, FnWindow?, FnEventTime?, FnType?, FnRTracker?, (FnValue, 
SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
+// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnType?, FnRTracker?, 
(FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
 //     where ? indicates 0 or 1, and * indicates any number.
 //     and  a SideInput is one of FnValue or FnIter or FnReIter
 // Note: Fns with inputs must have at least one FnValue as the main input.
@@ -411,6 +427,7 @@ func validateOrder(u *Fn) error {
 
 var (
        errContextParam             = errors.New("may only have a single 
context.Context parameter and it must be the first parameter")
+       errPaneParamPrecedence      = errors.New("may only have a single 
PaneInfo parameter and it must precede the WindowParam, EventTime and main 
input parameter")
        errWindowParamPrecedence    = errors.New("may only have a single Window 
parameter and it must precede the EventTime and main input parameter")
        errEventTimeParamPrecedence = errors.New("may only have a single 
beam.EventTime parameter and it must precede the main input parameter")
        errReflectTypePrecedence    = errors.New("may only have a single 
reflect.Type parameter and it must precede the main input parameter")
@@ -423,6 +440,7 @@ type paramState int
 const (
        psStart paramState = iota
        psContext
+       psPane
        psWindow
        psEventTime
        psType
@@ -437,6 +455,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                switch transition {
                case FnContext:
                        return psContext, nil
+               case FnPane:
+                       return psPane, nil
                case FnWindow:
                        return psWindow, nil
                case FnEventTime:
@@ -448,6 +468,19 @@ func nextParamState(cur paramState, transition 
FnParamKind) (paramState, error)
                }
        case psContext:
                switch transition {
+               case FnPane:
+                       return psPane, nil
+               case FnWindow:
+                       return psWindow, nil
+               case FnEventTime:
+                       return psEventTime, nil
+               case FnType:
+                       return psType, nil
+               case FnRTracker:
+                       return psRTracker, nil
+               }
+       case psPane:
+               switch transition {
                case FnWindow:
                        return psWindow, nil
                case FnEventTime:
@@ -495,6 +528,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
        switch transition {
        case FnContext:
                return -1, errContextParam
+       case FnPane:
+               return -1, errPaneParamPrecedence
        case FnWindow:
                return -1, errWindowParamPrecedence
        case FnEventTime:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go 
b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 3a8abc8..0a12bad 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -81,6 +81,11 @@ func TestNew(t *testing.T) {
                        Param: []FnParamKind{FnValue, FnMultiMap},
                },
                {
+                       Name:  "good7",
+                       Fn:    func(typex.PaneInfo, typex.Window, 
typex.EventTime, reflect.Type, []byte) {},
+                       Param: []FnParamKind{FnPane, FnWindow, FnEventTime, 
FnType, FnValue},
+               },
+               {
                        Name:  "good-method",
                        Fn:    foo{1}.Do,
                        Param: []FnParamKind{FnContext, FnValue, FnValue},
@@ -123,6 +128,11 @@ func TestNew(t *testing.T) {
                        Err:  errContextParam,
                },
                {
+                       Name: "errPaneParam: after Window",
+                       Fn:   func(typex.Window, typex.PaneInfo, int) {},
+                       Err:  errPaneParamPrecedence,
+               },
+               {
                        Name: "errWindowParamPrecedence: after EventTime",
                        Fn: func(typex.EventTime, typex.Window, int) {
                        },
@@ -273,8 +283,7 @@ func TestEmits(t *testing.T) {
                                params[i].Kind = kind
                                params[i].T = nil
                        }
-                       fn := new(Fn)
-                       fn.Param = params
+                       fn := &Fn{Param: params}
 
                        // Validate we get expected results for Emits function.
                        pos, num, exists := fn.Emits()
@@ -291,6 +300,94 @@ func TestEmits(t *testing.T) {
        }
 }
 
+func TestPane(t *testing.T) {
+       tests := []struct {
+               Name   string
+               Params []FnParamKind
+               Pos    int
+               Exists bool
+       }{
+               {
+                       Name:   "pane input",
+                       Params: []FnParamKind{FnContext, FnPane},
+                       Pos:    1,
+                       Exists: true,
+               },
+               {
+                       Name:   "no pane input",
+                       Params: []FnParamKind{FnContext, FnEventTime},
+                       Pos:    -1,
+                       Exists: false,
+               },
+       }
+
+       for _, test := range tests {
+               test := test
+               t.Run(test.Name, func(t *testing.T) {
+                       // Create a Fn with a filled params list.
+                       params := make([]FnParam, len(test.Params))
+                       for i, kind := range test.Params {
+                               params[i].Kind = kind
+                               params[i].T = nil
+                       }
+                       fn := &Fn{Param: params}
+
+                       // Validate we get expected results for pane function.
+                       pos, exists := fn.Pane()
+                       if exists != test.Exists {
+                               t.Errorf("Pane(%v) - exists: got %v, want %v", 
params, exists, test.Exists)
+                       }
+                       if pos != test.Pos {
+                               t.Errorf("Pane(%v) - pos: got %v, want %v", 
params, pos, test.Pos)
+                       }
+               })
+       }
+}
+
+func TestWindow(t *testing.T) {
+       tests := []struct {
+               Name   string
+               Params []FnParamKind
+               Pos    int
+               Exists bool
+       }{
+               {
+                       Name:   "window input",
+                       Params: []FnParamKind{FnContext, FnWindow},
+                       Pos:    1,
+                       Exists: true,
+               },
+               {
+                       Name:   "no window input",
+                       Params: []FnParamKind{FnContext, FnEventTime},
+                       Pos:    -1,
+                       Exists: false,
+               },
+       }
+
+       for _, test := range tests {
+               test := test
+               t.Run(test.Name, func(t *testing.T) {
+                       // Create a Fn with a filled params list.
+                       params := make([]FnParam, len(test.Params))
+                       for i, kind := range test.Params {
+                               params[i].Kind = kind
+                               params[i].T = nil
+                       }
+                       fn := &Fn{Param: params}
+
+                       // Validate we get expected results for window function.
+                       pos, exists := fn.Window()
+                       if exists != test.Exists {
+                               t.Errorf("Window(%v) - exists: got %v, want 
%v", params, exists, test.Exists)
+                       }
+                       if pos != test.Pos {
+                               t.Errorf("Window(%v) - pos: got %v, want %v", 
params, pos, test.Pos)
+                       }
+               })
+       }
+}
+
 func TestInputs(t *testing.T) {
        tests := []struct {
                Name   string
@@ -352,8 +449,7 @@ func TestInputs(t *testing.T) {
                                params[i].Kind = kind
                                params[i].T = nil
                        }
-                       fn := new(Fn)
-                       fn.Param = params
+                       fn := &Fn{Param: params}
 
                        // Validate we get expected results for Inputs function.
                        pos, num, exists := fn.Inputs()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/decode.go 
b/sdks/go/pkg/beam/core/runtime/exec/decode.go
index a14713c..90d0c95 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/decode.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/decode.go
@@ -22,7 +22,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
 
-// Decoder is a uniform custom encoder interface. It wraps various
+// Decoder is a uniform custom decoder interface. It wraps various
 // forms of reflectx.Funcs.
 type Decoder interface {
        // Decode decodes the []byte in to a value of the given type.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 064d483..456edeb 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -41,12 +41,12 @@ type MainInput struct {
 
 // Invoke invokes the fn with the given values. The extra values must match 
the non-main
 // side input and emitters. It returns the direct output, if any.
-func Invoke(ctx context.Context, ws []typex.Window, ts typex.EventTime, fn 
*funcx.Fn, opt *MainInput, extra ...interface{}) (*FullValue, error) {
+func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, fn *funcx.Fn, opt *MainInput, extra ...interface{}) 
(*FullValue, error) {
        if fn == nil {
                return nil, nil // ok: nothing to Invoke
        }
        inv := newInvoker(fn)
-       return inv.Invoke(ctx, ws, ts, opt, extra...)
+       return inv.Invoke(ctx, pn, ws, ts, opt, extra...)
 }
 
 // InvokeWithoutEventTime runs the given function at time 0 in the global 
window.
@@ -64,13 +64,13 @@ type invoker struct {
        fn   *funcx.Fn
        args []interface{}
        // TODO(lostluck):  2018/07/06 consider replacing with a slice of 
functions to run over the args slice, as an improvement.
-       ctxIdx, wndIdx, etIdx int   // specialized input indexes
-       outEtIdx, outErrIdx   int   // specialized output indexes
-       in, out               []int // general indexes
+       ctxIdx, pnIdx, wndIdx, etIdx int   // specialized input indexes
+       outEtIdx, outErrIdx          int   // specialized output indexes
+       in, out                      []int // general indexes
 
        ret                     FullValue                     // ret is a 
cached allocation for passing to the next Unit. Units never modify the passed 
in FullValue.
        elmConvert, elm2Convert func(interface{}) interface{} // Cached 
conversion functions, which assumes this invoker is always used with the same 
parameter types.
-       call                    func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error)
+       call                    func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error)
 }
 
 func newInvoker(fn *funcx.Fn) *invoker {
@@ -84,6 +84,9 @@ func newInvoker(fn *funcx.Fn) *invoker {
        if n.ctxIdx, ok = fn.Context(); !ok {
                n.ctxIdx = -1
        }
+       if n.pnIdx, ok = fn.Pane(); !ok {
+               n.pnIdx = -1
+       }
        if n.wndIdx, ok = fn.Window(); !ok {
                n.wndIdx = -1
        }
@@ -113,12 +116,12 @@ func (n *invoker) Reset() {
 
 // InvokeWithoutEventTime runs the function at time 0 in the global window.
 func (n *invoker) InvokeWithoutEventTime(ctx context.Context, opt *MainInput, 
extra ...interface{}) (*FullValue, error) {
-       return n.Invoke(ctx, window.SingleGlobalWindow, mtime.ZeroTimestamp, 
opt, extra...)
+       return n.Invoke(ctx, typex.NoFiringPane(), window.SingleGlobalWindow, 
mtime.ZeroTimestamp, opt, extra...)
 }
 
 // Invoke invokes the fn with the given values. The extra values must match 
the non-main
 // side input and emitters. It returns the direct output, if any.
-func (n *invoker) Invoke(ctx context.Context, ws []typex.Window, ts 
typex.EventTime, opt *MainInput, extra ...interface{}) (*FullValue, error) {
+func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime, opt *MainInput, extra ...interface{}) 
(*FullValue, error) {
        // (1) Populate contexts
        // extract these to make things easier to read.
        args := n.args
@@ -128,6 +131,9 @@ func (n *invoker) Invoke(ctx context.Context, ws 
[]typex.Window, ts typex.EventT
        if n.ctxIdx >= 0 {
                args[n.ctxIdx] = ctx
        }
+       if n.pnIdx >= 0 {
+               args[n.pnIdx] = pn
+       }
        if n.wndIdx >= 0 {
                if len(ws) != 1 {
                        return nil, errors.Errorf("DoFns that observe windows 
must be invoked with single window: %v", opt.Key.Windows)
@@ -183,12 +189,12 @@ func (n *invoker) Invoke(ctx context.Context, ws 
[]typex.Window, ts typex.EventT
        }
 
        // (4) Invoke
-       return n.call(ws, ts)
+       return n.call(pn, ws, ts)
 }
 
 // ret1 handles processing of a single return value.
 // Errors or single values are the only options.
-func (n *invoker) ret1(ws []typex.Window, ts typex.EventTime, r0 interface{}) 
(*FullValue, error) {
+func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0 interface{}) (*FullValue, error) {
        switch {
        case n.outErrIdx >= 0:
                if r0 != nil {
@@ -198,44 +204,44 @@ func (n *invoker) ret1(ws []typex.Window, ts 
typex.EventTime, r0 interface{}) (*
        case n.outEtIdx >= 0:
                panic("invoker.ret1: cannot return event time without a value")
        default:
-               n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0}
+               n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
                return &n.ret, nil
        }
 }
 
 // ret2 handles processing of a pair of return values.
-func (n *invoker) ret2(ws []typex.Window, ts typex.EventTime, r0, r1 
interface{}) (*FullValue, error) {
+func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0, r1 interface{}) (*FullValue, error) {
        switch {
        case n.outErrIdx >= 0:
                if r1 != nil {
                        return nil, r1.(error)
                }
-               n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0}
+               n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
                return &n.ret, nil
        case n.outEtIdx == 0:
-               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1}
+               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Pane: pn}
                return &n.ret, nil
        default:
-               n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1}
+               n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: 
r1, Pane: pn}
                return &n.ret, nil
        }
 }
 
 // ret3 handles processing of a trio of return values.
-func (n *invoker) ret3(ws []typex.Window, ts typex.EventTime, r0, r1, r2 
interface{}) (*FullValue, error) {
+func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
        switch {
        case n.outErrIdx >= 0:
                if r2 != nil {
                        return nil, r2.(error)
                }
                if n.outEtIdx < 0 {
-                       n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, 
Elm2: r1}
+                       n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, 
Elm2: r1, Pane: pn}
                        return &n.ret, nil
                }
-               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1}
+               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Pane: pn}
                return &n.ret, nil
        case n.outEtIdx == 0:
-               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Elm2: r2}
+               n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), 
Elm: r1, Elm2: r2, Pane: pn}
                return &n.ret, nil
        default:
                panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match 
permitted return values.", r0, r1, r2))
@@ -243,11 +249,11 @@ func (n *invoker) ret3(ws []typex.Window, ts 
typex.EventTime, r0, r1, r2 interfa
 }
 
 // ret4 handles processing of a quad of return values.
-func (n *invoker) ret4(ws []typex.Window, ts typex.EventTime, r0, r1, r2, r3 
interface{}) (*FullValue, error) {
+func (n *invoker) ret4(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, r0, r1, r2, r3 interface{}) (*FullValue, error) {
        if r3 != nil {
                return nil, r3.(error)
        }
-       n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: 
r1, Elm2: r2}
+       n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: 
r1, Elm2: r2, Pane: pn}
        return &n.ret, nil
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
index 4e9fdf5..960edff 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
@@ -33,199 +33,199 @@ func (n *invoker) initCall() {
        switch fn := n.fn.Fn.(type) {
 
        case reflectx.Func0x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call0x0()
                        return nil, nil
                }
 
        case reflectx.Func1x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call1x0(n.args[0])
                        return nil, nil
                }
 
        case reflectx.Func2x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call2x0(n.args[0], n.args[1])
                        return nil, nil
                }
 
        case reflectx.Func3x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call3x0(n.args[0], n.args[1], n.args[2])
                        return nil, nil
                }
 
        case reflectx.Func4x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call4x0(n.args[0], n.args[1], n.args[2], n.args[3])
                        return nil, nil
                }
 
        case reflectx.Func5x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call5x0(n.args[0], n.args[1], n.args[2], n.args[3], 
n.args[4])
                        return nil, nil
                }
 
        case reflectx.Func6x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call6x0(n.args[0], n.args[1], n.args[2], n.args[3], 
n.args[4], n.args[5])
                        return nil, nil
                }
 
        case reflectx.Func7x0:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        fn.Call7x0(n.args[0], n.args[1], n.args[2], n.args[3], 
n.args[4], n.args[5], n.args[6])
                        return nil, nil
                }
 
        case reflectx.Func0x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call0x1()
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func1x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call1x1(n.args[0])
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func2x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call2x1(n.args[0], n.args[1])
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func3x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call3x1(n.args[0], n.args[1], n.args[2])
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func4x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call4x1(n.args[0], n.args[1], n.args[2], 
n.args[3])
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func5x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call5x1(n.args[0], n.args[1], n.args[2], 
n.args[3], n.args[4])
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func6x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call6x1(n.args[0], n.args[1], n.args[2], 
n.args[3], n.args[4], n.args[5])
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func7x1:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0 := fn.Call7x1(n.args[0], n.args[1], n.args[2], 
n.args[3], n.args[4], n.args[5], n.args[6])
-                       return n.ret1(ws, ts, r0)
+                       return n.ret1(pn, ws, ts, r0)
                }
 
        case reflectx.Func0x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call0x2()
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func1x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call1x2(n.args[0])
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func2x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call2x2(n.args[0], n.args[1])
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func3x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call3x2(n.args[0], n.args[1], n.args[2])
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func4x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call4x2(n.args[0], n.args[1], n.args[2], 
n.args[3])
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func5x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call5x2(n.args[0], n.args[1], n.args[2], 
n.args[3], n.args[4])
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func6x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call6x2(n.args[0], n.args[1], n.args[2], 
n.args[3], n.args[4], n.args[5])
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func7x2:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1 := fn.Call7x2(n.args[0], n.args[1], n.args[2], 
n.args[3], n.args[4], n.args[5], n.args[6])
-                       return n.ret2(ws, ts, r0, r1)
+                       return n.ret2(pn, ws, ts, r0, r1)
                }
 
        case reflectx.Func0x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call0x3()
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        case reflectx.Func1x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call1x3(n.args[0])
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        case reflectx.Func2x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call2x3(n.args[0], n.args[1])
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        case reflectx.Func3x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call3x3(n.args[0], n.args[1], 
n.args[2])
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        case reflectx.Func4x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call4x3(n.args[0], n.args[1], 
n.args[2], n.args[3])
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        case reflectx.Func5x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call5x3(n.args[0], n.args[1], 
n.args[2], n.args[3], n.args[4])
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        case reflectx.Func6x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call6x3(n.args[0], n.args[1], 
n.args[2], n.args[3], n.args[4], n.args[5])
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        case reflectx.Func7x3:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        r0, r1, r2 := fn.Call7x3(n.args[0], n.args[1], 
n.args[2], n.args[3], n.args[4], n.args[5], n.args[6])
-                       return n.ret3(ws, ts, r0, r1, r2)
+                       return n.ret3(pn, ws, ts, r0, r1, r2)
                }
 
        default:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        ret := n.fn.Fn.Call(n.args)
                        if n.outErrIdx >= 0 && ret[n.outErrIdx] != nil {
                                return nil, ret[n.outErrIdx].(error)
@@ -237,13 +237,13 @@ func (n *invoker) initCall() {
                        case 0:
                                return nil, nil
                        case 1:
-                               return n.ret1(ws, ts, ret[0])
+                               return n.ret1(pn, ws, ts, ret[0])
                        case 2:
-                               return n.ret2(ws, ts, ret[0], ret[1])
+                               return n.ret2(pn, ws, ts, ret[0], ret[1])
                        case 3:
-                               return n.ret3(ws, ts, ret[0], ret[1], ret[2])
+                               return n.ret3(pn, ws, ts, ret[0], ret[1], 
ret[2])
                        case 4:
-                               return n.ret4(ws, ts, ret[0], ret[1], ret[2], 
ret[3])
+                               return n.ret4(pn, ws, ts, ret[0], ret[1], 
ret[2], ret[3])
                        }
                        panic(fmt.Sprintf("invoker: %v has > 4 return values, 
which is not permitted", n.fn.Fn.Name()))
                }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl 
b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
index 2f5a25d..29bee7f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
@@ -32,10 +32,10 @@ func (n *invoker) initCall() {
 {{range $out := upto 4}}
 {{range $in := upto 8}}
        case reflectx.Func{{$in}}x{{$out}}:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        {{if $out}}{{mktuplef $out "r%v"}} := 
{{end}}fn.Call{{$in}}x{{$out}}({{mktuplef $in "n.args[%v]"}})
                        {{- if $out}}
-                       return n.ret{{$out}}(ws, ts, {{mktuplef $out "r%v"}})
+                       return n.ret{{$out}}(pn, ws, ts, {{mktuplef $out 
"r%v"}})
                        {{- else}}
                        return nil, nil
                        {{- end}}
@@ -43,7 +43,7 @@ func (n *invoker) initCall() {
 {{end}}
 {{end}}
        default:
-               n.call = func(ws []typex.Window, ts typex.EventTime) 
(*FullValue, error) {
+               n.call = func(pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime) (*FullValue, error) {
                        ret := n.fn.Fn.Call(n.args)
                        if n.outErrIdx >= 0 && ret[n.outErrIdx] != nil {
                                return nil, ret[n.outErrIdx].(error)
@@ -55,13 +55,13 @@ func (n *invoker) initCall() {
                        case 0:
                                return nil, nil
                        case 1:
-                               return n.ret1(ws, ts, ret[0])
+                               return n.ret1(pn, ws, ts, ret[0])
                        case 2:
-                               return n.ret2(ws, ts, ret[0], ret[1])
+                               return n.ret2(pn, ws, ts, ret[0], ret[1])
                        case 3:
-                               return n.ret3(ws, ts, ret[0], ret[1], ret[2])
+                               return n.ret3(pn, ws, ts, ret[0], ret[1], 
ret[2])
                        case 4:
-                               return n.ret4(ws, ts, ret[0], ret[1], ret[2], 
ret[3])
+                               return n.ret4(pn, ws, ts, ret[0], ret[1], 
ret[2], ret[3])
                        }
                        panic(fmt.Sprintf("invoker: %v has > 4 return values, 
which is not permitted", n.fn.Fn.Name()))
                } 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
index ba5e6d1..b7ab501 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
@@ -178,7 +178,7 @@ func TestInvoke(t *testing.T) {
                                test.ExpectedTime = ts
                        }
 
-                       val, err := Invoke(context.Background(), 
window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
+                       val, err := Invoke(context.Background(), 
typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
                        if err != nil {
                                t.Fatalf("Invoke(%v,%v) failed: %v", 
fn.Fn.Name(), test.Args, err)
                        }
@@ -314,7 +314,7 @@ func BenchmarkInvoke(b *testing.B) {
                ts := mtime.ZeroTimestamp.Add(2 * time.Millisecond)
                b.Run(fmt.Sprintf("SingleInvoker_%s", test.Name), func(b 
*testing.B) {
                        for i := 0; i < b.N; i++ {
-                               _, err := Invoke(context.Background(), 
window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
+                               _, err := Invoke(context.Background(), 
typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, test.Args...)
                                if err != nil {
                                        b.Fatalf("Invoke(%v,%v) failed: %v", 
fn.Fn.Name(), test.Args, err)
                                }
@@ -323,7 +323,7 @@ func BenchmarkInvoke(b *testing.B) {
                b.Run(fmt.Sprintf("CachedInvoker_%s", test.Name), func(b 
*testing.B) {
                        inv := newInvoker(fn)
                        for i := 0; i < b.N; i++ {
-                               _, err := inv.Invoke(context.Background(), 
window.SingleGlobalWindow, ts, test.Opt, test.Args...)
+                               _, err := inv.Invoke(context.Background(), 
typex.NoFiringPane(), window.SingleGlobalWindow, ts, test.Opt, test.Args...)
                                if err != nil {
                                        b.Fatalf("Invoke(%v,%v) failed: %v", 
fn.Fn.Name(), test.Args, err)
                                }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 7518ee8..acd0867 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -115,7 +115,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, 
data DataContext) er
 
        // TODO(BEAM-3303): what to set for StartBundle/FinishBundle window and 
emitter timestamp?
 
-       if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow, 
mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err != nil {
+       if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err 
!= nil {
                return n.fail(err)
        }
        return nil
@@ -147,7 +147,7 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
        } else {
                for _, w := range elm.Windows {
                        elm := &mainIn.Key
-                       wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2, 
Timestamp: elm.Timestamp, Windows: []typex.Window{w}}
+                       wElm := FullValue{Elm: elm.Elm, Elm2: elm.Elm2, 
Timestamp: elm.Timestamp, Windows: []typex.Window{w}, Pane: elm.Pane}
                        err := n.processSingleWindow(&MainInput{Key: wElm, 
Values: mainIn.Values, RTracker: mainIn.RTracker})
                        if err != nil {
                                return n.fail(err)
@@ -163,7 +163,7 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
 // each individual window by exploding the windows first.
 func (n *ParDo) processSingleWindow(mainIn *MainInput) error {
        elm := &mainIn.Key
-       val, err := n.invokeProcessFn(n.ctx, elm.Windows, elm.Timestamp, mainIn)
+       val, err := n.invokeProcessFn(n.ctx, elm.Pane, elm.Windows, 
elm.Timestamp, mainIn)
        if err != nil {
                return n.fail(err)
        }
@@ -208,7 +208,7 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
 
        n.states.Set(n.ctx, metrics.FinishBundle)
 
-       if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow, 
mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
+       if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); 
err != nil {
                return n.fail(err)
        }
        n.reader = nil
@@ -282,7 +282,7 @@ func (n *ParDo) initSideInput(ctx context.Context, w 
typex.Window) error {
 }
 
 // invokeDataFn handle non-per element invocations.
-func (n *ParDo) invokeDataFn(ctx context.Context, ws []typex.Window, ts 
typex.EventTime, fn *funcx.Fn, opt *MainInput) (val *FullValue, err error) {
+func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput) (val 
*FullValue, err error) {
        if fn == nil {
                return nil, nil
        }
@@ -295,7 +295,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, ws 
[]typex.Window, ts typex.Ev
        if err := n.preInvoke(ctx, ws, ts); err != nil {
                return nil, err
        }
-       val, err = Invoke(ctx, ws, ts, fn, opt, n.cache.extra...)
+       val, err = Invoke(ctx, pn, ws, ts, fn, opt, n.cache.extra...)
        if err != nil {
                return nil, err
        }
@@ -303,7 +303,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, ws 
[]typex.Window, ts typex.Ev
 }
 
 // invokeProcessFn handles the per element invocations
-func (n *ParDo) invokeProcessFn(ctx context.Context, ws []typex.Window, ts 
typex.EventTime, opt *MainInput) (val *FullValue, err error) {
+func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws 
[]typex.Window, ts typex.EventTime, opt *MainInput) (val *FullValue, err error) 
{
        // Defer side input clean-up in case of panic
        defer func() {
                if postErr := n.postInvoke(); postErr != nil {
@@ -313,7 +313,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, ws 
[]typex.Window, ts typex
        if err := n.preInvoke(ctx, ws, ts); err != nil {
                return nil, err
        }
-       val, err = n.inv.Invoke(ctx, ws, ts, opt, n.cache.extra...)
+       val, err = n.inv.Invoke(ctx, pn, ws, ts, opt, n.cache.extra...)
        if err != nil {
                return nil, err
        }
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index 0355048..f55a199 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -65,8 +65,9 @@ var directFilters = []string{
        "TestXLang.*",
        "TestKafkaIO.*",
        "TestJDBCIO_BasicReadWrite",
-       // Triggers are not yet supported
+       // Triggers and panes are not yet supported
        "TestTrigger.*",
+       "TestPanes",
        // The direct runner does not support the TestStream primitive
        "TestTestStream.*",
        // (BEAM-13075): The direct runner does not support windowed side inputs
@@ -79,8 +80,9 @@ var directFilters = []string{
 var portableFilters = []string{
        // The portable runner does not support the TestStream primitive
        "TestTestStream.*",
-       // The trigger tests uses TestStream
+       // The trigger and pane tests use TestStream
        "TestTrigger.*",
+       "TestPanes",
        // TODO(BEAM-12797): Python portable runner times out on Kafka reads.
        "TestKafkaIO.*",
        // TODO(BEAM-13778) needs a schemaio expansion service address flag
@@ -106,8 +108,9 @@ var samzaFilters = []string{
        "TestReshuffleKV",
        // The Samza runner does not support the TestStream primitive
        "TestTestStream.*",
-       // The trigger tests uses TestStream
+       // The trigger and pane tests use TestStream
        "TestTrigger.*",
+       "TestPanes",
        // TODO(BEAM-13006): Samza doesn't yet support post job metrics, used 
by WordCount
        "TestWordCount.*",
        // TODO(BEAM-13778) needs a schemaio expansion service address flag
@@ -121,8 +124,9 @@ var sparkFilters = []string{
        "TestParDoKVSideInput",
        // The Spark runner does not support the TestStream primitive
        "TestTestStream.*",
-       // The trigger tests uses TestStream
+       // The trigger and pane tests use TestStream
        "TestTrigger.*",
+       "TestPanes",
        // TODO(BEAM-13778) needs a schemaio expansion service address flag
        "TestJDBCIO_BasicReadWrite",
 }
@@ -134,8 +138,9 @@ var dataflowFilters = []string{
        "TestFlattenDup",
        // The Dataflow runner does not support the TestStream primitive
        "TestTestStream.*",
-       // The trigger tests uses TestStream
+       // The trigger and pane tests use TestStream
        "TestTrigger.*",
+       "TestPanes",
        // There is no infrastructure for running KafkaIO tests with Dataflow.
        "TestKafkaIO.*",
        // Dataflow doesn't support any test that requires loopback.
diff --git a/sdks/go/test/integration/primitives/window_panes.go 
b/sdks/go/test/integration/primitives/window_panes.go
new file mode 100644
index 0000000..b22a2af
--- /dev/null
+++ b/sdks/go/test/integration/primitives/window_panes.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
+)
+
+func init() {
+       beam.RegisterFunction(PanesFn)
+}
+
+func PanesFn(pn typex.PaneInfo, value float64, emit func(int)) {
+       emit(int(pn.Timing))
+}
+
+func Panes(s beam.Scope) {
+       s.Scope("increment")
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+
+       windowed := beam.WindowInto(s, window.NewFixedWindows(windowSize), col, 
[]beam.WindowIntoOption{
+               beam.Trigger(trigger.Always()),
+       }...)
+       sums := beam.ParDo(s, PanesFn, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Count(s, sums, "number of firings", 3)
+}
diff --git a/sdks/go/test/integration/primitives/window_panes_test.go 
b/sdks/go/test/integration/primitives/window_panes_test.go
new file mode 100644
index 0000000..6a941ac
--- /dev/null
+++ b/sdks/go/test/integration/primitives/window_panes_test.go
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package primitives
+
+import (
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+func TestPanes(t *testing.T) {
+       integration.CheckFilters(t)
+       p, s := beam.NewPipelineWithRoot()
+       Panes(s)
+       ptest.RunAndValidate(t, p)
+}

Reply via email to