This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b0e9f26  [BEAM-3304] Go triggering support (#15239)
b0e9f26 is described below

commit b0e9f2638cbaca822ee6a58bd8ec0e61db8e799e
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Thu Aug 19 16:55:34 2021 -0400

    [BEAM-3304] Go triggering support (#15239)
---
 sdks/go/pkg/beam/core/graph/coder/coder.go         |   6 +
 sdks/go/pkg/beam/core/graph/coder/panes.go         | 116 ++++++++++++++++++
 sdks/go/pkg/beam/core/graph/edge.go                |   6 +-
 sdks/go/pkg/beam/core/graph/window/strategy.go     |  18 ++-
 sdks/go/pkg/beam/core/graph/window/trigger.go      |  37 ++++++
 sdks/go/pkg/beam/core/runtime/exec/coder.go        |  62 +++++++---
 sdks/go/pkg/beam/core/runtime/exec/datasink.go     |   2 +-
 sdks/go/pkg/beam/core/runtime/exec/datasource.go   |   5 +-
 .../pkg/beam/core/runtime/exec/datasource_test.go  |  11 +-
 .../go/pkg/beam/core/runtime/exec/dynsplit_test.go |   5 +-
 sdks/go/pkg/beam/core/runtime/exec/fullvalue.go    |   5 +-
 sdks/go/pkg/beam/core/runtime/exec/reshuffle.go    |   5 +-
 sdks/go/pkg/beam/core/runtime/exec/window.go       |   1 +
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  | 129 +++++++++++++++++++--
 sdks/go/pkg/beam/core/typex/class.go               |   1 +
 sdks/go/pkg/beam/core/typex/fulltype.go            |   6 +
 sdks/go/pkg/beam/core/typex/special.go             |  16 +++
 sdks/go/pkg/beam/pcollection.go                    |   1 -
 .../beam/runners/dataflow/dataflowlib/translate.go |   3 +-
 sdks/go/pkg/beam/windowing.go                      |  37 +++++-
 sdks/go/test/integration/integration.go            |  12 ++
 sdks/go/test/integration/primitives/windowinto.go  |  91 ++++++++++++++-
 .../test/integration/primitives/windowinto_test.go |  28 +++++
 23 files changed, 554 insertions(+), 49 deletions(-)

diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go 
b/sdks/go/pkg/beam/core/graph/coder/coder.go
index ebc26a2..6eea66b 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -170,6 +170,7 @@ const (
        Double             Kind = "double"
        Row                Kind = "R"
        Timer              Kind = "T"
+       PaneInfo           Kind = "PI"
        WindowedValue      Kind = "W"
        ParamWindowedValue Kind = "PW"
        Iterable           Kind = "I"
@@ -297,6 +298,11 @@ func IsW(c *Coder) bool {
        return c.Kind == WindowedValue
 }
 
+// NewPI returns a coder of Kind PaneInfo whose type is typex.PaneInfoType.
+func NewPI() *Coder {
+       pi := &Coder{Kind: PaneInfo}
+       pi.T = typex.New(typex.PaneInfoType)
+       return pi
+}
+
 // NewW returns a WindowedValue coder for the window of elements.
 func NewW(c *Coder, w *WindowCoder) *Coder {
        if c == nil {
diff --git a/sdks/go/pkg/beam/core/graph/coder/panes.go 
b/sdks/go/pkg/beam/core/graph/coder/panes.go
new file mode 100644
index 0000000..3ccd987
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/panes.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane writes the wire encoding of a typex.PaneInfo to w.
+//
+// The first byte packs IsFirst (bit 0), IsLast (bit 1) and Timing (bits 2-3)
+// into its low nibble. Its high nibble is a tag giving the number of VarInt
+// indices that follow:
+//
+//   - 0: no indices; used when both indices are zero or Timing is PaneUnknown.
+//   - 1: Index only; used when Index equals NonSpeculativeIndex or Timing is
+//     PaneEarly.
+//   - 2: Index followed by NonSpeculativeIndex.
+//
+// It returns the first error reported while writing, or nil.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+       pane := byte(0)
+       if v.IsFirst {
+               pane |= 0x01
+       }
+       if v.IsLast {
+               pane |= 0x02
+       }
+       pane |= byte(v.Timing << 2)
+
+       switch {
+       case (v.Index == 0 && v.NonSpeculativeIndex == 0) || v.Timing == typex.PaneUnknown:
+               // The single-byte form carries no indices, so it is only lossless
+               // when both are zero. Requiring just one of them to be zero would
+               // silently drop the other.
+               _, err := w.Write([]byte{pane})
+               return err
+       case v.Index == v.NonSpeculativeIndex || v.Timing == typex.PaneEarly:
+               // Header byte plus a single VarInt encoded index.
+               if _, err := w.Write([]byte{pane | 1<<4}); err != nil {
+                       return err
+               }
+               return EncodeVarInt(v.Index, w)
+       default:
+               // Header byte plus two VarInt encoded indices.
+               if _, err := w.Write([]byte{pane | 2<<4}); err != nil {
+                       return err
+               }
+               if err := EncodeVarInt(v.Index, w); err != nil {
+                       return err
+               }
+               return EncodeVarInt(v.NonSpeculativeIndex, w)
+       }
+}
+
+// NewPane builds a PaneInfo from the low four bits of a pane header byte:
+// bit 0 is IsFirst, bit 1 is IsLast and bits 2-3 are the PaneTiming. Higher
+// bits are ignored and both indices are left at zero.
+func NewPane(b byte) typex.PaneInfo {
+       // NoFiringPane supplies the zero indices, but it also has IsFirst and
+       // IsLast set. Assign both flags from the byte unconditionally so that a
+       // cleared bit really decodes as false.
+       pn := typex.NoFiringPane()
+       pn.IsFirst = b&0x01 != 0
+       pn.IsLast = b&0x02 != 0
+       pn.Timing = typex.PaneTiming((b >> 2) & 0x03)
+       return pn
+}
+
+// DecodePane reads a typex.PaneInfo from r: one header byte followed by
+// zero, one or two VarInt indices, as selected by the header's high nibble.
+// On any read error it returns the zero PaneInfo and that error.
+func DecodePane(r io.Reader) (typex.PaneInfo, error) {
+       var header [1]byte
+       if err := ioutilx.ReadNBufUnsafe(r, header[:]); err != nil {
+               return typex.PaneInfo{}, err
+       }
+
+       // Low nibble holds the flags and timing; high nibble is the index count.
+       pn := NewPane(header[0] & 0x0f)
+       tag := header[0] >> 4
+
+       if tag == 1 {
+               // One index follows. An early pane gets -1 as its
+               // NonSpeculativeIndex; otherwise both indices are equal.
+               idx, err := DecodeVarInt(r)
+               if err != nil {
+                       return typex.PaneInfo{}, err
+               }
+               pn.Index = idx
+               pn.NonSpeculativeIndex = idx
+               if pn.Timing == typex.PaneEarly {
+                       pn.NonSpeculativeIndex = -1
+               }
+               return pn, nil
+       }
+
+       if tag == 2 {
+               // Two indices follow: Index, then NonSpeculativeIndex.
+               idx, err := DecodeVarInt(r)
+               if err != nil {
+                       return typex.PaneInfo{}, err
+               }
+               pn.Index = idx
+
+               nsIdx, err := DecodeVarInt(r)
+               if err != nil {
+                       return typex.PaneInfo{}, err
+               }
+               pn.NonSpeculativeIndex = nsIdx
+               return pn, nil
+       }
+
+       // Tag 0: the header byte is the whole encoding.
+       // NOTE(review): tags above 2 are not rejected; they are returned like tag 0.
+       return pn, nil
+}
diff --git a/sdks/go/pkg/beam/core/graph/edge.go 
b/sdks/go/pkg/beam/core/graph/edge.go
index 3f1933f..e1f0d0b 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -533,13 +533,13 @@ func NewImpulse(g *Graph, s *Scope, value []byte) 
*MultiEdge {
 }
 
 // NewWindowInto inserts a new WindowInto edge into the graph.
-func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge {
-       n := g.NewNode(in.Type(), &window.WindowingStrategy{Fn: wfn}, 
in.Bounded())
+func NewWindowInto(g *Graph, s *Scope, ws *window.WindowingStrategy, in *Node) 
*MultiEdge {
+       n := g.NewNode(in.Type(), ws, in.Bounded())
        n.Coder = in.Coder
 
        edge := g.NewEdge(s)
        edge.Op = WindowInto
-       edge.WindowFn = wfn
+       edge.WindowFn = ws.Fn
        edge.Input = []*Inbound{{Kind: Main, From: in, Type: in.Type()}}
        edge.Output = []*Outbound{{To: n, Type: in.Type()}}
        return edge
diff --git a/sdks/go/pkg/beam/core/graph/window/strategy.go 
b/sdks/go/pkg/beam/core/graph/window/strategy.go
index 09609d6..0dca6ab 100644
--- a/sdks/go/pkg/beam/core/graph/window/strategy.go
+++ b/sdks/go/pkg/beam/core/graph/window/strategy.go
@@ -16,12 +16,22 @@
 // Package window contains window representation, windowing strategies and 
utilities.
 package window
 
+// AccumulationMode identifies the accumulation mode of a WindowingStrategy.
+type AccumulationMode string
+
+// Accumulation modes. Each string mirrors the name of the pipeline proto
+// AccumulationMode enum value that the graph translation maps it to.
+const (
+       Unspecified  AccumulationMode = "AccumulationMode_UNSPECIFIED"
+       Discarding   AccumulationMode = "AccumulationMode_DISCARDING"
+       Accumulating AccumulationMode = "AccumulationMode_ACCUMULATING"
+       Retracting   AccumulationMode = "AccumulationMode_RETRACTING"
+)
+
 // WindowingStrategy defines the types of windowing used in a pipeline and 
contains
 // the data to support executing a windowing strategy.
 type WindowingStrategy struct {
-       Fn *Fn
-
-       // TODO(BEAM-3304): trigger support
+       Fn               *Fn
+       Trigger          Trigger
+       AccumulationMode AccumulationMode
+       AllowedLateness  int // in milliseconds
 }
 
 func (ws *WindowingStrategy) Equals(o *WindowingStrategy) bool {
@@ -34,5 +44,5 @@ func (ws *WindowingStrategy) String() string {
 
 // DefaultWindowingStrategy returns the default windowing strategy.
 func DefaultWindowingStrategy() *WindowingStrategy {
-       return &WindowingStrategy{Fn: NewGlobalWindows()}
+       return &WindowingStrategy{Fn: NewGlobalWindows(), Trigger: 
Trigger{Kind: DefaultTrigger}, AccumulationMode: Discarding, AllowedLateness: 0}
 }
diff --git a/sdks/go/pkg/beam/core/graph/window/trigger.go 
b/sdks/go/pkg/beam/core/graph/window/trigger.go
new file mode 100644
index 0000000..8368310
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/window/trigger.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// Trigger describes a trigger by its Kind. The remaining fields are
+// parameters that are only read for particular kinds.
+type Trigger struct {
+       // Kind is one of the *Trigger string constants in this package.
+       Kind         string
+       // SubTriggers holds the nested triggers of composite kinds
+       // (AfterAny, AfterAll, Repeat).
+       SubTriggers  []Trigger
+       // Delay is the delay used by AfterProcessingTimeTrigger.
+       Delay        int64 // in milliseconds
+       // ElementCount is the count used by ElementCountTrigger.
+       ElementCount int32
+}
+
+// Values for Trigger.Kind.
+//
+// NOTE(review): the graph translation added in this change has no case for
+// OrFinallyTrigger, so it appears to be translated as the default trigger;
+// confirm before relying on it.
+const (
+       DefaultTrigger                         string = "Trigger_Default_"
+       AlwaysTrigger                          string = "Trigger_Always_"
+       AfterAnyTrigger                        string = "Trigger_AfterAny_"
+       AfterAllTrigger                        string = "Trigger_AfterAll_"
+       AfterProcessingTimeTrigger             string = 
"Trigger_AfterProcessing_Time_"
+       ElementCountTrigger                    string = "Trigger_ElementCount_"
+       AfterEndOfWindowTrigger                string = 
"Trigger_AfterEndOfWindow_"
+       RepeatTrigger                          string = "Trigger_Repeat_"
+       OrFinallyTrigger                       string = "Trigger_OrFinally_"
+       NeverTrigger                           string = "Trigger_Never_"
+       AfterSynchronizedProcessingTimeTrigger string = 
"Trigger_AfterSynchronizedProcessingTime_"
+)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go 
b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 6156f23..c7a19ea 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -122,6 +122,9 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder {
                        enc: MakeWindowEncoder(c.Window),
                }
 
+       case coder.PaneInfo:
+               return &paneEncoder{}
+
        case coder.Iterable:
                return &iterableEncoder{
                        enc: MakeElementEncoder(c.Components[0]),
@@ -228,6 +231,9 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder {
                        dec: MakeWindowDecoder(c.Window),
                }
 
+       case coder.PaneInfo:
+               return &paneDecoder{}
+
        // Note: Iterables in CoGBK are handled in datasource.go instead.
        case coder.Iterable:
                return &iterableDecoder{
@@ -749,10 +755,11 @@ func (c *arrayDecoder) Decode(r io.Reader) (*FullValue, 
error) {
 type windowedValueEncoder struct {
        elm ElementEncoder
        win WindowEncoder
+       // need to add pane encoder here
 }
 
 func (e *windowedValueEncoder) Encode(val *FullValue, w io.Writer) error {
-       if err := EncodeWindowedValueHeader(e.win, val.Windows, val.Timestamp, 
w); err != nil {
+       if err := EncodeWindowedValueHeader(e.win, val.Windows, val.Timestamp, 
val.Pane, w); err != nil {
                return err
        }
        return e.elm.Encode(val, w)
@@ -765,7 +772,7 @@ type windowedValueDecoder struct {
 
 func (d *windowedValueDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
        // Encoding: beam utf8 string (length prefix + run of bytes)
-       w, et, err := DecodeWindowedValueHeader(d.win, r)
+       w, et, pn, err := DecodeWindowedValueHeader(d.win, r)
        if err != nil {
                return err
        }
@@ -774,6 +781,7 @@ func (d *windowedValueDecoder) DecodeTo(r io.Reader, fv 
*FullValue) error {
        }
        fv.Windows = w
        fv.Timestamp = et
+       fv.Pane = pn
        return nil
 }
 
@@ -842,6 +850,31 @@ func (d *timerDecoder) Decode(r io.Reader) (*FullValue, 
error) {
        return fv, nil
 }
 
+// paneEncoder is an element encoder that writes only the Pane of a FullValue.
+type paneEncoder struct{}
+
+// Encode writes val.Pane to w with coder.EncodePane and returns its error.
+func (*paneEncoder) Encode(val *FullValue, w io.Writer) error {
+       err := coder.EncodePane(val.Pane, w)
+       return err
+}
+
+// paneDecoder is an element decoder that reads a bare pane into a FullValue.
+type paneDecoder struct{}
+
+// DecodeTo reads a pane from r and overwrites *fv with a FullValue that has
+// only its Pane set. If decoding fails, *fv is left untouched.
+func (*paneDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
+       pn, err := coder.DecodePane(r)
+       if err == nil {
+               *fv = FullValue{Pane: pn}
+       }
+       return err
+}
+
+// Decode reads a pane from r into a newly allocated FullValue. It returns a
+// nil FullValue together with the error if decoding fails.
+func (d *paneDecoder) Decode(r io.Reader) (*FullValue, error) {
+       var fv FullValue
+       err := d.DecodeTo(r, &fv)
+       if err != nil {
+               return nil, err
+       }
+       return &fv, nil
+}
+
 type rowEncoder struct {
        enc func(interface{}, io.Writer) error
 }
@@ -1076,10 +1109,8 @@ func (*intervalWindowDecoder) DecodeSingle(r io.Reader) 
(typex.Window, error) {
        return window.IntervalWindow{Start: 
mtime.FromMilliseconds(end.Milliseconds() - int64(duration)), End: end}, nil
 }
 
-var paneNoFiring = []byte{0xf}
-
 // EncodeWindowedValueHeader serializes a windowed value header.
-func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t 
typex.EventTime, w io.Writer) error {
+func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t 
typex.EventTime, p typex.PaneInfo, w io.Writer) error {
        // Encoding: Timestamp, Window, Pane (header) + Element
 
        if err := coder.EncodeEventTime(t, w); err != nil {
@@ -1088,25 +1119,30 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws 
[]typex.Window, t typex.Eve
        if err := enc.Encode(ws, w); err != nil {
                return err
        }
-       _, err := w.Write(paneNoFiring)
+       err := coder.EncodePane(p, w)
        return err
 }
 
 // DecodeWindowedValueHeader deserializes a windowed value header.
-func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) 
([]typex.Window, typex.EventTime, error) {
+func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) 
([]typex.Window, typex.EventTime, typex.PaneInfo, error) {
        // Encoding: Timestamp, Window, Pane (header) + Element
 
+       onError := func(err error) ([]typex.Window, typex.EventTime, 
typex.PaneInfo, error) {
+               return nil, mtime.ZeroTimestamp, typex.NoFiringPane(), err
+       }
+
        t, err := coder.DecodeEventTime(r)
        if err != nil {
-               return nil, mtime.ZeroTimestamp, err
+               return onError(err)
        }
        ws, err := dec.Decode(r)
        if err != nil {
-               return nil, mtime.ZeroTimestamp, err
+               return onError(err)
        }
-       var data [1]byte
-       if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { // NO_FIRING 
pane
-               return nil, mtime.ZeroTimestamp, err
+       pn, err := coder.DecodePane(r)
+       if err != nil {
+               return onError(err)
        }
-       return ws, t, nil
+
+       return ws, t, pn, nil
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
index 5eea030..473ac38 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
@@ -60,7 +60,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value 
*FullValue, values
        // unit.
        var b bytes.Buffer
 
-       if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, 
value.Timestamp, &b); err != nil {
+       if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, 
value.Timestamp, value.Pane, &b); err != nil {
                return err
        }
        if err := n.enc.Encode(value, &b); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index c75ca5b..2e8f25d 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -145,7 +145,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                        return nil
                }
                // TODO(lostluck) 2020/02/22: Should we include window headers 
or just count the element sizes?
-               ws, t, err := DecodeWindowedValueHeader(wc, r)
+               ws, t, pn, err := DecodeWindowedValueHeader(wc, r)
                if err != nil {
                        if err == io.EOF {
                                return nil
@@ -160,6 +160,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                }
                pe.Timestamp = t
                pe.Windows = ws
+               pe.Pane = pn
 
                var valReStreams []ReStream
                for _, cv := range cvs {
@@ -539,7 +540,7 @@ func splitHelper(
 
 func encodeElm(elm *FullValue, wc WindowEncoder, ec ElementEncoder) ([]byte, 
error) {
        var b bytes.Buffer
-       if err := EncodeWindowedValueHeader(wc, elm.Windows, elm.Timestamp, 
&b); err != nil {
+       if err := EncodeWindowedValueHeader(wc, elm.Windows, elm.Timestamp, 
elm.Pane, &b); err != nil {
                return nil, err
        }
        if err := ec.Encode(elm, &b); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 2de6e30..8591933 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
 
@@ -43,7 +44,7 @@ func TestDataSource_PerElement(t *testing.T) {
                                wc := MakeWindowEncoder(c.Window)
                                ec := MakeElementEncoder(coder.SkipW(c))
                                for _, v := range expected {
-                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, pw)
+                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), pw)
                                        ec.Encode(&FullValue{Elm: v}, pw)
                                }
                                pw.Close()
@@ -98,7 +99,7 @@ func TestDataSource_Iterators(t *testing.T) {
                        driver: func(c *coder.Coder, dmw io.WriteCloser, _ 
func() io.WriteCloser, ks, vs []interface{}) {
                                wc, kc, vc := extractCoders(c)
                                for _, k := range ks {
-                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, dmw)
+                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), dmw)
                                        kc.Encode(&FullValue{Elm: k}, dmw)
                                        coder.EncodeInt32(int32(len(vs)), dmw) 
// Number of elements.
                                        for _, v := range vs {
@@ -116,7 +117,7 @@ func TestDataSource_Iterators(t *testing.T) {
                        driver: func(c *coder.Coder, dmw io.WriteCloser, _ 
func() io.WriteCloser, ks, vs []interface{}) {
                                wc, kc, vc := extractCoders(c)
                                for _, k := range ks {
-                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, dmw)
+                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), dmw)
                                        kc.Encode(&FullValue{Elm: k}, dmw)
 
                                        coder.EncodeInt32(-1, dmw) // Mark this 
as a multi-Chunk (though beam runner proto says to use 0)
@@ -137,7 +138,7 @@ func TestDataSource_Iterators(t *testing.T) {
                        driver: func(c *coder.Coder, dmw io.WriteCloser, swFn 
func() io.WriteCloser, ks, vs []interface{}) {
                                wc, kc, vc := extractCoders(c)
                                for _, k := range ks {
-                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, dmw)
+                                       EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), dmw)
                                        kc.Encode(&FullValue{Elm: k}, dmw)
                                        coder.EncodeInt32(-1, dmw)  // Mark as 
multi-chunk (though beam, runner says to use 0)
                                        coder.EncodeVarInt(-1, dmw) // Mark 
subsequent chunks as "state backed"
@@ -235,7 +236,7 @@ func TestDataSource_Split(t *testing.T) {
                        wc := MakeWindowEncoder(c.Window)
                        ec := MakeElementEncoder(coder.SkipW(c))
                        for _, v := range elements {
-                               EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, pw)
+                               EncodeWindowedValueHeader(wc, 
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), pw)
                                ec.Encode(&FullValue{Elm: v}, pw)
                        }
                        pw.Close()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
index b057579..2c87a3b 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
@@ -279,7 +279,7 @@ func createSdfPlan(t *testing.T, name string, fn 
*graph.DoFn, cdr *coder.Coder)
 func writeElm(elm *FullValue, cdr *coder.Coder, pw *io.PipeWriter) {
        wc := MakeWindowEncoder(cdr.Window)
        ec := MakeElementEncoder(coder.SkipW(cdr))
-       if err := EncodeWindowedValueHeader(wc, window.SingleGlobalWindow, 
mtime.ZeroTimestamp, pw); err != nil {
+       if err := EncodeWindowedValueHeader(wc, window.SingleGlobalWindow, 
mtime.ZeroTimestamp, typex.NoFiringPane(), pw); err != nil {
                panic("err")
        }
        if err := ec.Encode(elm, pw); err != nil {
@@ -294,7 +294,7 @@ func decodeDynSplitElm(elm []byte, cdr *coder.Coder) 
(*FullValue, error) {
        wd := MakeWindowDecoder(cdr.Window)
        ed := MakeElementDecoder(coder.SkipW(cdr))
        b := bytes.NewBuffer(elm)
-       w, t, err := DecodeWindowedValueHeader(wd, b)
+       w, t, pn, err := DecodeWindowedValueHeader(wd, b)
        if err != nil {
                return nil, err
        }
@@ -304,6 +304,7 @@ func decodeDynSplitElm(elm []byte, cdr *coder.Coder) 
(*FullValue, error) {
        }
        e.Windows = w
        e.Timestamp = t
+       e.Pane = pn
        return e, nil
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go 
b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
index 90e4763..c46d655 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
@@ -37,13 +37,14 @@ type FullValue struct {
 
        Timestamp typex.EventTime
        Windows   []typex.Window
+       Pane      typex.PaneInfo
 }
 
 func (v *FullValue) String() string {
        if v.Elm2 == nil {
-               return fmt.Sprintf("%v [@%v:%v]", v.Elm, v.Timestamp, v.Windows)
+               return fmt.Sprintf("%v [@%v:%v:%v]", v.Elm, v.Timestamp, 
v.Windows, v.Pane)
        }
-       return fmt.Sprintf("KV<%v,%v> [@%v:%v]", v.Elm, v.Elm2, v.Timestamp, 
v.Windows)
+       return fmt.Sprintf("KV<%v,%v> [@%v:%v:%v]", v.Elm, v.Elm2, v.Timestamp, 
v.Windows, v.Pane)
 }
 
 // Stream is a FullValue reader. It returns io.EOF when complete, but can be
diff --git a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go 
b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
index 83a0813..3a1900f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
@@ -62,7 +62,7 @@ func (n *ReshuffleInput) StartBundle(ctx context.Context, id 
string, data DataCo
 
 func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue, 
values ...ReStream) error {
        n.b.Reset()
-       if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, 
value.Timestamp, &n.b); err != nil {
+       if err := EncodeWindowedValueHeader(n.wEnc, value.Windows, 
value.Timestamp, value.Pane, &n.b); err != nil {
                return err
        }
        if err := n.enc.Encode(value, &n.b); err != nil {
@@ -135,7 +135,7 @@ func (n *ReshuffleOutput) ProcessElement(ctx 
context.Context, value *FullValue,
                        return errors.WithContextf(err, "reading values for 
%v", n)
                }
                n.b = *bytes.NewBuffer(v.Elm.([]byte))
-               ws, ts, err := DecodeWindowedValueHeader(n.wDec, &n.b)
+               ws, ts, pn, err := DecodeWindowedValueHeader(n.wDec, &n.b)
                if err != nil {
                        return errors.WithContextf(err, "decoding windows for 
%v", n)
                }
@@ -144,6 +144,7 @@ func (n *ReshuffleOutput) ProcessElement(ctx 
context.Context, value *FullValue,
                }
                n.ret.Windows = ws
                n.ret.Timestamp = ts
+               n.ret.Pane = pn
                if err := n.Out.ProcessElement(ctx, &n.ret); err != nil {
                        return err
                }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/window.go 
b/sdks/go/pkg/beam/core/runtime/exec/window.go
index c75046c..c6a6546 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/window.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/window.go
@@ -50,6 +50,7 @@ func (w *WindowInto) ProcessElement(ctx context.Context, elm 
*FullValue, values
                Timestamp: elm.Timestamp,
                Elm:       elm.Elm,
                Elm2:      elm.Elm2,
+               Pane:      elm.Pane,
        }
        return w.Out.ProcessElement(ctx, windowed, values...)
 }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 2f6b8a7..f3f7b9e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -981,22 +981,135 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
-               AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          makeTrigger(w.Trigger),
+               AccumulationMode: makeAccumulationMode(w.AccumulationMode),
+               OutputTime:       pipepb.OutputTime_END_OF_WINDOW,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  0,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY,
+       }
+       return ws, nil
+}
+
+// makeAccumulationMode translates a window.AccumulationMode into the
+// corresponding pipeline proto enum value. window.Discarding and any value
+// that is not recognized both translate to AccumulationMode_DISCARDING.
+func makeAccumulationMode(m window.AccumulationMode) pipepb.AccumulationMode_Enum {
+       switch m {
+       case window.Unspecified:
+               return pipepb.AccumulationMode_UNSPECIFIED
+       case window.Accumulating:
+               return pipepb.AccumulationMode_ACCUMULATING
+       case window.Retracting:
+               return pipepb.AccumulationMode_RETRACTING
+       }
+       // window.Discarding, plus the fallback for unknown modes.
+       return pipepb.AccumulationMode_DISCARDING
+}
+
+func makeTrigger(t window.Trigger) *pipepb.Trigger {
+       switch t.Kind {
+       case window.DefaultTrigger:
+               return &pipepb.Trigger{
                        Trigger: &pipepb.Trigger_Default_{
                                Default: &pipepb.Trigger_Default{},
                        },
-               },
-               OutputTime:      pipepb.OutputTime_END_OF_WINDOW,
-               ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
-               AllowedLateness: 0,
-               OnTimeBehavior:  pipepb.OnTimeBehavior_FIRE_ALWAYS,
+               }
+       case window.AlwaysTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Always_{
+                               Always: &pipepb.Trigger_Always{},
+                       },
+               }
+       case window.AfterAnyTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_AfterAny_{
+                               AfterAny: &pipepb.Trigger_AfterAny{
+                                       Subtriggers: 
extractSubtriggers(t.SubTriggers),
+                               },
+                       },
+               }
+       case window.AfterAllTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_AfterAll_{
+                               AfterAll: &pipepb.Trigger_AfterAll{
+                                       Subtriggers: 
extractSubtriggers(t.SubTriggers),
+                               },
+                       },
+               }
+       case window.AfterProcessingTimeTrigger:
+               // TODO(BEAM-3304) Right now would work only for single delay 
value.
+               // could be configured to take more than one delay values later.
+               ttd := &pipepb.TimestampTransform{
+                       TimestampTransform: &pipepb.TimestampTransform_Delay_{
+                               Delay: 
&pipepb.TimestampTransform_Delay{DelayMillis: t.Delay},
+                       }}
+               tt := []*pipepb.TimestampTransform{ttd}
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_AfterProcessingTime_{
+                               AfterProcessingTime: 
&pipepb.Trigger_AfterProcessingTime{TimestampTransforms: tt},
+                       },
+               }
+       case window.ElementCountTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_ElementCount_{
+                               ElementCount: 
&pipepb.Trigger_ElementCount{ElementCount: t.ElementCount},
+                       },
+               }
+       case window.AfterEndOfWindowTrigger:
+               // TODO: change it to take user config triggers for early and 
late firings
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_AfterEndOfWindow_{
+                               AfterEndOfWindow: 
&pipepb.Trigger_AfterEndOfWindow{
+                                       EarlyFirings: 
makeTrigger(window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 1}),
+                                       LateFirings:  nil,
+                               },
+                       },
+               }
+       case window.RepeatTrigger:
+               if len(t.SubTriggers) != 1 {
+                       panic("Only 1 Subtrigger should be passed to Repeat 
Trigger")
+               }
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Repeat_{
+                               Repeat: &pipepb.Trigger_Repeat{Subtrigger: 
makeTrigger(t.SubTriggers[0])},
+                       },
+               }
+       case window.NeverTrigger:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Never_{
+                               Never: &pipepb.Trigger_Never{},
+                       },
+               }
+       case window.AfterSynchronizedProcessingTimeTrigger:
+               return &pipepb.Trigger{
+                       Trigger: 
&pipepb.Trigger_AfterSynchronizedProcessingTime_{
+                               AfterSynchronizedProcessingTime: 
&pipepb.Trigger_AfterSynchronizedProcessingTime{},
+                       },
+               }
+       default:
+               return &pipepb.Trigger{
+                       Trigger: &pipepb.Trigger_Default_{
+                               Default: &pipepb.Trigger_Default{},
+                       },
+               }
        }
-       return ws, nil
+}
+
+// extractSubtriggers translates each trigger in t with makeTrigger and
+// returns the results in the same order. It panics if t is empty.
+func extractSubtriggers(t []window.Trigger) []*pipepb.Trigger {
+       if len(t) == 0 {
+               panic("At least one subtrigger required for composite triggers.")
+       }
+
+       result := make([]*pipepb.Trigger, 0, len(t))
+       for i := range t {
+               result = append(result, makeTrigger(t[i]))
+       }
+       return result
 }
 
 func makeWindowFn(w *window.Fn) (*pipepb.FunctionSpec, error) {
diff --git a/sdks/go/pkg/beam/core/typex/class.go 
b/sdks/go/pkg/beam/core/typex/class.go
index 8c3fc53..a14197f 100644
--- a/sdks/go/pkg/beam/core/typex/class.go
+++ b/sdks/go/pkg/beam/core/typex/class.go
@@ -108,6 +108,7 @@ func isConcrete(t reflect.Type, visited map[uintptr]bool) 
bool {
        if t == nil ||
                t == EventTimeType ||
                t.Implements(WindowType) ||
+               t == PaneInfoType ||
                t == reflectx.Error ||
                t == reflectx.Context ||
                IsUniversal(t) {
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go 
b/sdks/go/pkg/beam/core/typex/fulltype.go
index abdaf64..cbfb443 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -416,3 +416,9 @@ func checkTypesNotNil(list []FullType) {
                }
        }
 }
+
+// NoFiringPane returns the PaneInfo for the no-firing pane (0x0f).
+func NoFiringPane() PaneInfo {
+       pn := PaneInfo{IsFirst: true, IsLast: true, Timing: PaneUnknown}
+       return pn
+}
diff --git a/sdks/go/pkg/beam/core/typex/special.go 
b/sdks/go/pkg/beam/core/typex/special.go
index 3fc57f8..dd4199c 100644
--- a/sdks/go/pkg/beam/core/typex/special.go
+++ b/sdks/go/pkg/beam/core/typex/special.go
@@ -35,6 +35,7 @@ var (
 
        EventTimeType = reflect.TypeOf((*EventTime)(nil)).Elem()
        WindowType    = reflect.TypeOf((*Window)(nil)).Elem()
+       PaneInfoType  = reflect.TypeOf((*PaneInfo)(nil)).Elem()
 
        KVType            = reflect.TypeOf((*KV)(nil)).Elem()
        CoGBKType         = reflect.TypeOf((*CoGBK)(nil)).Elem()
@@ -64,6 +65,21 @@ type Window interface {
        Equals(o Window) bool
 }
 
+type PaneTiming byte
+
+const (
+       PaneEarly   PaneTiming = 0
+       PaneOnTime  PaneTiming = 1
+       PaneLate    PaneTiming = 2
+       PaneUnknown PaneTiming = 3
+)
+
+type PaneInfo struct {
+       Timing                     PaneTiming
+       IsFirst, IsLast            bool
+       Index, NonSpeculativeIndex int64
+}
+
 // KV, CoGBK, WindowedValue represent composite generic types. They are not 
used
 // directly in user code signatures, but only in FullTypes.
 
diff --git a/sdks/go/pkg/beam/pcollection.go b/sdks/go/pkg/beam/pcollection.go
index eb54211..e5dc632 100644
--- a/sdks/go/pkg/beam/pcollection.go
+++ b/sdks/go/pkg/beam/pcollection.go
@@ -49,7 +49,6 @@ func (p PCollection) IsValid() bool {
 }
 
 // TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.
-// TODO(herohde) 5/30/2017: add windowing strategy and documentation.
 
 // Type returns the full type 'A' of the elements. 'A' must be a concrete
 // type, such as int or KV<int,string>.
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index caff970..e18ff6c 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/stringx"
@@ -117,7 +118,7 @@ func (x *translator) translateTransform(trunk string, id 
string) ([]*df.Step, er
                // URL Query-escaped windowed _unnested_ value. It is read back 
in
                // a nested context at runtime.
                var buf bytes.Buffer
-               if err := 
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
+               if err := 
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), &buf); 
err != nil {
                        return nil, err
                }
                value := string(append(buf.Bytes(), t.GetSpec().Payload...))
diff --git a/sdks/go/pkg/beam/windowing.go b/sdks/go/pkg/beam/windowing.go
index 581ffa0..8882ed2 100644
--- a/sdks/go/pkg/beam/windowing.go
+++ b/sdks/go/pkg/beam/windowing.go
@@ -16,26 +16,55 @@
 package beam
 
 import (
+       "fmt"
+
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+       windowIntoOption()
+}
+
+type WindowTrigger struct {
+       Name window.Trigger
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
+type AccumulationMode struct {
+       Mode window.AccumulationMode
+}
+
+func (m AccumulationMode) windowIntoOption() {}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-       return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts 
...WindowIntoOption) PCollection {
+       return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, 
error) {
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts 
...WindowIntoOption) (PCollection, error) {
        if !s.IsValid() {
                return PCollection{}, errors.New("invalid scope")
        }
        if !col.IsValid() {
                return PCollection{}, errors.New("invalid input pcollection")
        }
+       ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Trigger{}}
+       for _, opt := range opts {
+               switch opt := opt.(type) {
+               case WindowTrigger:
+                       ws.Trigger = opt.Name
+               case AccumulationMode:
+                       ws.AccumulationMode = opt.Mode
+               default:
+                       panic(fmt.Sprintf("Unknown WindowInto option type: %T: 
%v", opt, opt))
+               }
+       }
 
-       edge := graph.NewWindowInto(s.real, s.scope, ws, col.n)
+       edge := graph.NewWindowInto(s.real, s.scope, &ws, col.n)
        ret := PCollection{edge.Output[0].To}
        return ret, nil
 }
diff --git a/sdks/go/test/integration/integration.go 
b/sdks/go/test/integration/integration.go
index d76b09a..beeacdf 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -60,6 +60,8 @@ var sickbay = []string{}
 var directFilters = []string{
        // The direct runner does not yet support cross-language.
        "TestXLang.*",
+       // Triggers are not yet supported
+       "TestTrigger.*",
        // The direct runner does not support the TestStream primitive
        "TestTestStream.*",
 }
@@ -67,6 +69,8 @@ var directFilters = []string{
 var portableFilters = []string{
        // The portable runner does not support the TestStream primitive
        "TestTestStream.*",
+       // The trigger tests use TestStream
+       "TestTrigger.*",
 }
 
 var flinkFilters = []string{
@@ -74,6 +78,8 @@ var flinkFilters = []string{
        "TestXLang_Combine.*",
        // TODO(BEAM-12753): Flink test stream fails for non-string/byte slice 
inputs
        "TestTestStream.*Sequence.*",
+       // Triggers are not yet supported
+       "TestTrigger.*",
 }
 
 var samzaFilters = []string{
@@ -82,6 +88,8 @@ var samzaFilters = []string{
        "TestReshuffleKV",
        // The Samza runner does not support the TestStream primitive
        "TestTestStream.*",
+       // The trigger tests use TestStream
+       "TestTrigger.*",
 }
 
 var sparkFilters = []string{
@@ -91,6 +99,8 @@ var sparkFilters = []string{
        "TestParDoKVSideInput",
        // The Spark runner does not support the TestStream primitive
        "TestTestStream.*",
+       // The trigger tests use TestStream
+       "TestTrigger.*",
 }
 
 var dataflowFilters = []string{
@@ -98,6 +108,8 @@ var dataflowFilters = []string{
        "TestFlattenDup",
        // The Dataflow runner does not support the TestStream primitive
        "TestTestStream.*",
+       // The trigger tests use TestStream
+       "TestTrigger.*",
 }
 
 // CheckFilters checks if an integration test is filtered to be skipped, either
diff --git a/sdks/go/test/integration/primitives/windowinto.go 
b/sdks/go/test/integration/primitives/windowinto.go
index 0c2be7f..fbd69b1 100644
--- a/sdks/go/test/integration/primitives/windowinto.go
+++ b/sdks/go/test/integration/primitives/windowinto.go
@@ -23,6 +23,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
 )
 
@@ -43,7 +44,7 @@ func (f *createTimestampedData) ProcessElement(_ []byte, emit 
func(beam.EventTim
        }
 }
 
-// WindowsSums produces a pipeline that generates the numbers of a 3x3 magic 
square, and
+// WindowSums produces a pipeline that generates the numbers of a 3x3 magic 
square, and
 // configures the pipeline so that PCollection. Sum is a closure to handle 
summing data over the window, in a few conditions.
 func WindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection) 
beam.PCollection) {
        timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 
9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
@@ -91,3 +92,91 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr 
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+       windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, 
beam.AccumulationMode{Mode: m})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end 
of the window
+func TriggerDefault(s beam.Scope) {
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       con.AddElements(12000, 4.0, 5.0)
+       con.AdvanceWatermark(13000)
+
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every 
input value as the output.
+func TriggerAlways(s beam.Scope) {
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.AlwaysTrigger}, window.Discarding, 1.0, 2.0, 3.0)
+}
+
+// TriggerElementCount tests the ElementCount trigger, which waits for at least N 
elements to be ready
+// before firing an output pane.
+func TriggerElementCount(s beam.Scope) {
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(2000)
+       con.AddElements(6000, 4.0, 5.0)
+       con.AdvanceWatermark(10000)
+       con.AddElements(52000, 10.0)
+       con.AdvanceWatermark(53000)
+
+       col := teststream.Create(s, con)
+
+       // Waits only for two elements to arrive, fires output after that, 
and never fires again.
+       // For the trigger to fire every 2 elements, combine it with Repeat 
Trigger
+       tr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 2}
+       windowed := beam.WindowInto(s, window.NewGlobalWindows(), col, 
beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Count(s, sums, "total collections", 1)
+}
+
+// TriggerAfterProcessingTime tests the AfterProcessingTime Trigger, it fires 
output panes once 't' processing time has passed
+// Not yet supported by the flink runner:
+// java.lang.UnsupportedOperationException: Advancing Processing time is not 
supported by the Flink Runner.
+func TriggerAfterProcessingTime(s beam.Scope) {
+       con := teststream.NewConfig()
+       con.AdvanceProcessingTime(100)
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceProcessingTime(2000)
+       con.AddElements(22000, 4.0)
+
+       col := teststream.Create(s, con)
+
+       validate(s.Scope("Fixed"), window.NewGlobalWindows(), col, 
window.Trigger{Kind: window.AfterProcessingTimeTrigger, Delay: 5000}, 
window.Discarding, 6.0)
+}
+
+// TriggerRepeat tests the repeat trigger. As of now it is configured to 
take only one trigger as a subtrigger.
+// In the below test, it is expected to receive three output panes with two 
elements each.
+func TriggerRepeat(s beam.Scope) {
+       // create a teststream pipeline and get the pcollection
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(2000)
+       con.AddElements(6000, 4.0, 5.0, 6.0)
+       con.AdvanceWatermark(10000)
+
+       col := teststream.Create(s, con)
+
+       subTr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 
2}
+       tr := window.Trigger{Kind: window.RepeatTrigger, SubTriggers: 
[]window.Trigger{subTr}}
+       windowed := beam.WindowInto(s, window.NewGlobalWindows(), col, 
beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Count(s, sums, "total collections", 3)
+}
diff --git a/sdks/go/test/integration/primitives/windowinto_test.go 
b/sdks/go/test/integration/primitives/windowinto_test.go
index 4552176..ab5c6746 100644
--- a/sdks/go/test/integration/primitives/windowinto_test.go
+++ b/sdks/go/test/integration/primitives/windowinto_test.go
@@ -36,3 +36,31 @@ func TestWindowSums_GBK(t *testing.T) {
        WindowSums_GBK(s)
        ptest.RunAndValidate(t, p)
 }
+
+func TestTriggerDefault(t *testing.T) {
+       integration.CheckFilters(t)
+       p, s := beam.NewPipelineWithRoot()
+       TriggerDefault(s)
+       ptest.RunAndValidate(t, p)
+}
+
+func TestTriggerAlways(t *testing.T) {
+       integration.CheckFilters(t)
+       p, s := beam.NewPipelineWithRoot()
+       TriggerAlways(s)
+       ptest.RunAndValidate(t, p)
+}
+
+func TestTriggerElementCount(t *testing.T) {
+       integration.CheckFilters(t)
+       p, s := beam.NewPipelineWithRoot()
+       TriggerElementCount(s)
+       ptest.RunAndValidate(t, p)
+}
+
+func TestTriggerRepeat(t *testing.T) {
+       integration.CheckFilters(t)
+       p, s := beam.NewPipelineWithRoot()
+       TriggerRepeat(s)
+       ptest.RunAndValidate(t, p)
+}

Reply via email to