This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b0e9f26 [BEAM-3304] Go triggering support (#15239)
b0e9f26 is described below
commit b0e9f2638cbaca822ee6a58bd8ec0e61db8e799e
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Thu Aug 19 16:55:34 2021 -0400
[BEAM-3304] Go triggering support (#15239)
---
sdks/go/pkg/beam/core/graph/coder/coder.go | 6 +
sdks/go/pkg/beam/core/graph/coder/panes.go | 116 ++++++++++++++++++
sdks/go/pkg/beam/core/graph/edge.go | 6 +-
sdks/go/pkg/beam/core/graph/window/strategy.go | 18 ++-
sdks/go/pkg/beam/core/graph/window/trigger.go | 37 ++++++
sdks/go/pkg/beam/core/runtime/exec/coder.go | 62 +++++++---
sdks/go/pkg/beam/core/runtime/exec/datasink.go | 2 +-
sdks/go/pkg/beam/core/runtime/exec/datasource.go | 5 +-
.../pkg/beam/core/runtime/exec/datasource_test.go | 11 +-
.../go/pkg/beam/core/runtime/exec/dynsplit_test.go | 5 +-
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go | 5 +-
sdks/go/pkg/beam/core/runtime/exec/reshuffle.go | 5 +-
sdks/go/pkg/beam/core/runtime/exec/window.go | 1 +
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 129 +++++++++++++++++++--
sdks/go/pkg/beam/core/typex/class.go | 1 +
sdks/go/pkg/beam/core/typex/fulltype.go | 6 +
sdks/go/pkg/beam/core/typex/special.go | 16 +++
sdks/go/pkg/beam/pcollection.go | 1 -
.../beam/runners/dataflow/dataflowlib/translate.go | 3 +-
sdks/go/pkg/beam/windowing.go | 37 +++++-
sdks/go/test/integration/integration.go | 12 ++
sdks/go/test/integration/primitives/windowinto.go | 91 ++++++++++++++-
.../test/integration/primitives/windowinto_test.go | 28 +++++
23 files changed, 554 insertions(+), 49 deletions(-)
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go
b/sdks/go/pkg/beam/core/graph/coder/coder.go
index ebc26a2..6eea66b 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -170,6 +170,7 @@ const (
Double Kind = "double"
Row Kind = "R"
Timer Kind = "T"
+ PaneInfo Kind = "PI"
WindowedValue Kind = "W"
ParamWindowedValue Kind = "PW"
Iterable Kind = "I"
@@ -297,6 +298,11 @@ func IsW(c *Coder) bool {
return c.Kind == WindowedValue
}
+// NewPI returns a Coder of kind PaneInfo whose full type is typex.PaneInfoType.
+func NewPI() *Coder {
+	paneType := typex.New(typex.PaneInfoType)
+	return &Coder{T: paneType, Kind: PaneInfo}
+}
+
// NewW returns a WindowedValue coder for the window of elements.
func NewW(c *Coder, w *WindowCoder) *Coder {
if c == nil {
diff --git a/sdks/go/pkg/beam/core/graph/coder/panes.go
b/sdks/go/pkg/beam/core/graph/coder/panes.go
new file mode 100644
index 0000000..3ccd987
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/panes.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo. The first byte carries IsFirst (0x01),
+// IsLast (0x02) and the PaneTiming (bits 2-3) in its low nibble, and the
+// encoding variant in its high nibble:
+//   - 0: nothing follows; Index and NonSpeculativeIndex are both zero
+//     (or the timing is unknown).
+//   - 1: Index follows as a VarInt; NonSpeculativeIndex is implied by it.
+//   - 2: Index and NonSpeculativeIndex follow as two VarInts.
+//
+// Any error from the underlying writer is returned.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+	pane := byte(0)
+	if v.IsFirst {
+		pane |= 0x01
+	}
+	if v.IsLast {
+		pane |= 0x02
+	}
+	pane |= byte(v.Timing << 2)
+
+	switch {
+	case (v.Index == 0 && v.NonSpeculativeIndex == 0) || v.Timing == typex.PaneUnknown:
+		// Both indices must be zero for the single-byte form: the decoder
+		// reconstructs them as zero, so any non-zero index would be lost.
+		_, err := w.Write([]byte{pane})
+		return err
+	case v.Index == v.NonSpeculativeIndex || v.Timing == typex.PaneEarly:
+		// The decoder derives NonSpeculativeIndex from Index and the timing,
+		// so only Index needs to be written.
+		if _, err := w.Write([]byte{pane | 1<<4}); err != nil {
+			return err
+		}
+		return EncodeVarInt(v.Index, w)
+	default:
+		// Both indices are independent and must be written explicitly.
+		if _, err := w.Write([]byte{pane | 2<<4}); err != nil {
+			return err
+		}
+		if err := EncodeVarInt(v.Index, w); err != nil {
+			return err
+		}
+		return EncodeVarInt(v.NonSpeculativeIndex, w)
+	}
+}
+
+// NewPane decodes the low nibble of an encoded pane header byte: bit 0x01 is
+// IsFirst, bit 0x02 is IsLast and bits 2-3 hold the PaneTiming. Index and
+// NonSpeculativeIndex are left at zero; DecodePane fills them in when the
+// header says they were encoded. NewPane(0x0f) equals typex.NoFiringPane().
+func NewPane(b byte) typex.PaneInfo {
+	// Derive every flag from the byte so that cleared bits decode as false.
+	// Starting from NoFiringPane (both flags set) and only ever setting flags
+	// would make it impossible to decode a non-first or non-last pane.
+	return typex.PaneInfo{
+		IsFirst: b&0x01 != 0,
+		IsLast:  b&0x02 != 0,
+		Timing:  typex.PaneTiming((b >> 2) & 0x03),
+	}
+}
+
+// DecodePane decodes a typex.PaneInfo written by EncodePane. It reads one
+// header byte, followed by zero, one or two VarInt encoded indices as selected
+// by the header's high nibble. It returns an error if reading fails or if the
+// header names an unknown encoding.
+func DecodePane(r io.Reader) (typex.PaneInfo, error) {
+	var header [1]byte
+	if err := ioutilx.ReadNBufUnsafe(r, header[:]); err != nil {
+		return typex.PaneInfo{}, err
+	}
+
+	pn := NewPane(header[0] & 0x0f)
+
+	switch encoding := header[0] >> 4; encoding {
+	case 0:
+		// Only the header byte was written; both indices are zero.
+		return pn, nil
+	case 1:
+		// Header byte plus Index. NonSpeculativeIndex is implied: -1 for
+		// early panes, otherwise equal to Index.
+		idx, err := DecodeVarInt(r)
+		if err != nil {
+			return typex.PaneInfo{}, err
+		}
+		pn.Index = idx
+		if pn.Timing == typex.PaneEarly {
+			pn.NonSpeculativeIndex = -1
+		} else {
+			pn.NonSpeculativeIndex = idx
+		}
+		return pn, nil
+	case 2:
+		// Header byte plus Index and NonSpeculativeIndex.
+		idx, err := DecodeVarInt(r)
+		if err != nil {
+			return typex.PaneInfo{}, err
+		}
+		nonSpec, err := DecodeVarInt(r)
+		if err != nil {
+			return typex.PaneInfo{}, err
+		}
+		pn.Index = idx
+		pn.NonSpeculativeIndex = nonSpec
+		return pn, nil
+	default:
+		// Accepting an unknown encoding silently could leave trailing bytes
+		// unread and misalign whatever is decoded next.
+		return typex.PaneInfo{}, fmt.Errorf("invalid pane encoding %d in header byte %#x", encoding, header[0])
+	}
+}
diff --git a/sdks/go/pkg/beam/core/graph/edge.go
b/sdks/go/pkg/beam/core/graph/edge.go
index 3f1933f..e1f0d0b 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -533,13 +533,13 @@ func NewImpulse(g *Graph, s *Scope, value []byte)
*MultiEdge {
}
// NewWindowInto inserts a new WindowInto edge into the graph.
-func NewWindowInto(g *Graph, s *Scope, wfn *window.Fn, in *Node) *MultiEdge {
- n := g.NewNode(in.Type(), &window.WindowingStrategy{Fn: wfn},
in.Bounded())
+func NewWindowInto(g *Graph, s *Scope, ws *window.WindowingStrategy, in *Node)
*MultiEdge {
+ n := g.NewNode(in.Type(), ws, in.Bounded())
n.Coder = in.Coder
edge := g.NewEdge(s)
edge.Op = WindowInto
- edge.WindowFn = wfn
+ edge.WindowFn = ws.Fn
edge.Input = []*Inbound{{Kind: Main, From: in, Type: in.Type()}}
edge.Output = []*Outbound{{To: n, Type: in.Type()}}
return edge
diff --git a/sdks/go/pkg/beam/core/graph/window/strategy.go
b/sdks/go/pkg/beam/core/graph/window/strategy.go
index 09609d6..0dca6ab 100644
--- a/sdks/go/pkg/beam/core/graph/window/strategy.go
+++ b/sdks/go/pkg/beam/core/graph/window/strategy.go
@@ -16,12 +16,22 @@
// Package window contains window representation, windowing strategies and
utilities.
package window
+// AccumulationMode is the SDK representation of a windowing strategy's
+// accumulation mode. Each constant's value is the name of the matching
+// pipeline proto enum value. makeAccumulationMode (graphx) translates these
+// constants to that enum; any other value, including the zero value "", is
+// translated as DISCARDING.
+type AccumulationMode string
+
+// Supported accumulation modes.
+const (
+ Unspecified AccumulationMode = "AccumulationMode_UNSPECIFIED"
+ Discarding AccumulationMode = "AccumulationMode_DISCARDING"
+ Accumulating AccumulationMode = "AccumulationMode_ACCUMULATING"
+ Retracting AccumulationMode = "AccumulationMode_RETRACTING"
+)
+
// WindowingStrategy defines the types of windowing used in a pipeline and
contains
// the data to support executing a windowing strategy.
type WindowingStrategy struct {
- Fn *Fn
-
- // TODO(BEAM-3304): trigger support
+	// Fn is the window function; NewWindowInto also records it as the
+	// WindowInto edge's WindowFn.
+ Fn *Fn
+	// Trigger is marshalled by makeTrigger; a Trigger whose Kind is empty or
+	// unrecognized is marshalled as the default trigger.
+ Trigger Trigger
+	// AccumulationMode is marshalled to the proto accumulation mode; an empty
+	// value is marshalled as DISCARDING.
+ AccumulationMode AccumulationMode
+	// NOTE(review): marshalWindowingStrategy currently always emits an
+	// allowed lateness of 0, so this field is not yet propagated to the
+	// runner; confirm before relying on it.
+ AllowedLateness int // in milliseconds
}
func (ws *WindowingStrategy) Equals(o *WindowingStrategy) bool {
@@ -34,5 +44,5 @@ func (ws *WindowingStrategy) String() string {
// DefaultWindowingStrategy returns the default windowing strategy.
+// That strategy uses global windows, the default trigger, discarding
+// accumulation and zero allowed lateness.
func DefaultWindowingStrategy() *WindowingStrategy {
- return &WindowingStrategy{Fn: NewGlobalWindows()}
+	ws := &WindowingStrategy{Fn: NewGlobalWindows()}
+	ws.Trigger = Trigger{Kind: DefaultTrigger}
+	ws.AccumulationMode = Discarding
+	ws.AllowedLateness = 0
+	return ws
}
diff --git a/sdks/go/pkg/beam/core/graph/window/trigger.go
b/sdks/go/pkg/beam/core/graph/window/trigger.go
new file mode 100644
index 0000000..8368310
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/window/trigger.go
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// Trigger is the SDK-side description of a windowing trigger. It is
+// translated to the pipeline proto Trigger by makeTrigger in package graphx.
+type Trigger struct {
+	// Kind selects the trigger type and is expected to be one of the *Trigger
+	// constants declared in this package. An empty or unrecognized Kind is
+	// translated as the default trigger.
+ Kind string
+	// SubTriggers holds the nested triggers of composite kinds such as
+	// AfterAny, AfterAll and Repeat (which requires exactly one).
+ SubTriggers []Trigger
+	// Delay is used by AfterProcessingTimeTrigger.
+ Delay int64 // in milliseconds
+	// ElementCount is used by ElementCountTrigger.
+ ElementCount int32
+}
+
+// Trigger kinds for Trigger.Kind.
+//
+// NOTE(review): AfterEndOfWindowTrigger is currently translated with a fixed
+// early firing of ElementCount 1 and no late firings (see the TODO in
+// makeTrigger); it does not yet read user-provided firings.
+const (
+ DefaultTrigger string = "Trigger_Default_"
+ AlwaysTrigger string = "Trigger_Always_"
+ AfterAnyTrigger string = "Trigger_AfterAny_"
+ AfterAllTrigger string = "Trigger_AfterAll_"
+ AfterProcessingTimeTrigger string =
"Trigger_AfterProcessing_Time_"
+ ElementCountTrigger string = "Trigger_ElementCount_"
+ AfterEndOfWindowTrigger string =
"Trigger_AfterEndOfWindow_"
+ RepeatTrigger string = "Trigger_Repeat_"
+ OrFinallyTrigger string = "Trigger_OrFinally_"
+ NeverTrigger string = "Trigger_Never_"
+ AfterSynchronizedProcessingTimeTrigger string =
"Trigger_AfterSynchronizedProcessingTime_"
+)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go
b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 6156f23..c7a19ea 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -122,6 +122,9 @@ func MakeElementEncoder(c *coder.Coder) ElementEncoder {
enc: MakeWindowEncoder(c.Window),
}
+ case coder.PaneInfo:
+ return &paneEncoder{}
+
case coder.Iterable:
return &iterableEncoder{
enc: MakeElementEncoder(c.Components[0]),
@@ -228,6 +231,9 @@ func MakeElementDecoder(c *coder.Coder) ElementDecoder {
dec: MakeWindowDecoder(c.Window),
}
+ case coder.PaneInfo:
+ return &paneDecoder{}
+
// Note: Iterables in CoGBK are handled in datasource.go instead.
case coder.Iterable:
return &iterableDecoder{
@@ -749,10 +755,11 @@ func (c *arrayDecoder) Decode(r io.Reader) (*FullValue,
error) {
+// windowedValueEncoder writes a FullValue as a windowed value header
+// (timestamp, windows and pane) followed by the element itself.
type windowedValueEncoder struct {
elm ElementEncoder
win WindowEncoder
+	// The pane is written by EncodeWindowedValueHeader from FullValue.Pane,
+	// so no separate pane encoder field is needed.
}
func (e *windowedValueEncoder) Encode(val *FullValue, w io.Writer) error {
- if err := EncodeWindowedValueHeader(e.win, val.Windows, val.Timestamp,
w); err != nil {
+ if err := EncodeWindowedValueHeader(e.win, val.Windows, val.Timestamp,
val.Pane, w); err != nil {
return err
}
return e.elm.Encode(val, w)
@@ -765,7 +772,7 @@ type windowedValueDecoder struct {
func (d *windowedValueDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
// Encoding: beam utf8 string (length prefix + run of bytes)
- w, et, err := DecodeWindowedValueHeader(d.win, r)
+ w, et, pn, err := DecodeWindowedValueHeader(d.win, r)
if err != nil {
return err
}
@@ -774,6 +781,7 @@ func (d *windowedValueDecoder) DecodeTo(r io.Reader, fv
*FullValue) error {
}
fv.Windows = w
fv.Timestamp = et
+ fv.Pane = pn
return nil
}
@@ -842,6 +850,31 @@ func (d *timerDecoder) Decode(r io.Reader) (*FullValue,
error) {
return fv, nil
}
+// paneEncoder is an ElementEncoder that writes only the Pane of a FullValue.
+type paneEncoder struct{}
+
+func (e *paneEncoder) Encode(val *FullValue, w io.Writer) error {
+	pane := val.Pane
+	return coder.EncodePane(pane, w)
+}
+
+// paneDecoder is an ElementDecoder that reads a PaneInfo into an otherwise
+// empty FullValue.
+type paneDecoder struct{}
+
+func (d *paneDecoder) DecodeTo(r io.Reader, fv *FullValue) error {
+	pane, err := coder.DecodePane(r)
+	if err == nil {
+		*fv = FullValue{Pane: pane}
+	}
+	return err
+}
+
+func (d *paneDecoder) Decode(r io.Reader) (*FullValue, error) {
+	var fv FullValue
+	if err := d.DecodeTo(r, &fv); err != nil {
+		return nil, err
+	}
+	return &fv, nil
+}
+
type rowEncoder struct {
enc func(interface{}, io.Writer) error
}
@@ -1076,10 +1109,8 @@ func (*intervalWindowDecoder) DecodeSingle(r io.Reader)
(typex.Window, error) {
return window.IntervalWindow{Start:
mtime.FromMilliseconds(end.Milliseconds() - int64(duration)), End: end}, nil
}
-var paneNoFiring = []byte{0xf}
-
// EncodeWindowedValueHeader serializes a windowed value header.
-func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t
typex.EventTime, w io.Writer) error {
+func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t
typex.EventTime, p typex.PaneInfo, w io.Writer) error {
// Encoding: Timestamp, Window, Pane (header) + Element
if err := coder.EncodeEventTime(t, w); err != nil {
@@ -1088,25 +1119,30 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws
[]typex.Window, t typex.Eve
if err := enc.Encode(ws, w); err != nil {
return err
}
- _, err := w.Write(paneNoFiring)
+ err := coder.EncodePane(p, w)
return err
}
// DecodeWindowedValueHeader deserializes a windowed value header.
-func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader)
([]typex.Window, typex.EventTime, error) {
+// It reads the event timestamp, then the windows (using dec), then the pane,
+// and returns them in that order. On any read error it returns a nil window
+// slice, mtime.ZeroTimestamp, typex.NoFiringPane() and the error.
+func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader)
([]typex.Window, typex.EventTime, typex.PaneInfo, error) {
// Encoding: Timestamp, Window, Pane (header) + Element
+	// onError builds the uniform return values shared by every failure path.
+ onError := func(err error) ([]typex.Window, typex.EventTime,
typex.PaneInfo, error) {
+ return nil, mtime.ZeroTimestamp, typex.NoFiringPane(), err
+ }
+
t, err := coder.DecodeEventTime(r)
if err != nil {
- return nil, mtime.ZeroTimestamp, err
+ return onError(err)
}
ws, err := dec.Decode(r)
if err != nil {
- return nil, mtime.ZeroTimestamp, err
+ return onError(err)
}
- var data [1]byte
- if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { // NO_FIRING
pane
- return nil, mtime.ZeroTimestamp, err
+	// The pane is decoded in full (header byte plus any indices) rather than
+	// skipped, so it can be carried on the decoded value.
+ pn, err := coder.DecodePane(r)
+ if err != nil {
+ return onError(err)
}
- return ws, t, nil
+
+ return ws, t, pn, nil
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
index 5eea030..473ac38 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
@@ -60,7 +60,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value
*FullValue, values
// unit.
var b bytes.Buffer
- if err := EncodeWindowedValueHeader(n.wEnc, value.Windows,
value.Timestamp, &b); err != nil {
+ if err := EncodeWindowedValueHeader(n.wEnc, value.Windows,
value.Timestamp, value.Pane, &b); err != nil {
return err
}
if err := n.enc.Encode(value, &b); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index c75ca5b..2e8f25d 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -145,7 +145,7 @@ func (n *DataSource) Process(ctx context.Context) error {
return nil
}
// TODO(lostluck) 2020/02/22: Should we include window headers
or just count the element sizes?
- ws, t, err := DecodeWindowedValueHeader(wc, r)
+ ws, t, pn, err := DecodeWindowedValueHeader(wc, r)
if err != nil {
if err == io.EOF {
return nil
@@ -160,6 +160,7 @@ func (n *DataSource) Process(ctx context.Context) error {
}
pe.Timestamp = t
pe.Windows = ws
+ pe.Pane = pn
var valReStreams []ReStream
for _, cv := range cvs {
@@ -539,7 +540,7 @@ func splitHelper(
func encodeElm(elm *FullValue, wc WindowEncoder, ec ElementEncoder) ([]byte,
error) {
var b bytes.Buffer
- if err := EncodeWindowedValueHeader(wc, elm.Windows, elm.Timestamp,
&b); err != nil {
+ if err := EncodeWindowedValueHeader(wc, elm.Windows, elm.Timestamp,
elm.Pane, &b); err != nil {
return nil, err
}
if err := ec.Encode(elm, &b); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
index 2de6e30..8591933 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)
@@ -43,7 +44,7 @@ func TestDataSource_PerElement(t *testing.T) {
wc := MakeWindowEncoder(c.Window)
ec := MakeElementEncoder(coder.SkipW(c))
for _, v := range expected {
- EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, pw)
+ EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), pw)
ec.Encode(&FullValue{Elm: v}, pw)
}
pw.Close()
@@ -98,7 +99,7 @@ func TestDataSource_Iterators(t *testing.T) {
driver: func(c *coder.Coder, dmw io.WriteCloser, _
func() io.WriteCloser, ks, vs []interface{}) {
wc, kc, vc := extractCoders(c)
for _, k := range ks {
- EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, dmw)
+ EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), dmw)
kc.Encode(&FullValue{Elm: k}, dmw)
coder.EncodeInt32(int32(len(vs)), dmw)
// Number of elements.
for _, v := range vs {
@@ -116,7 +117,7 @@ func TestDataSource_Iterators(t *testing.T) {
driver: func(c *coder.Coder, dmw io.WriteCloser, _
func() io.WriteCloser, ks, vs []interface{}) {
wc, kc, vc := extractCoders(c)
for _, k := range ks {
- EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, dmw)
+ EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), dmw)
kc.Encode(&FullValue{Elm: k}, dmw)
coder.EncodeInt32(-1, dmw) // Mark this
as a multi-Chunk (though beam runner proto says to use 0)
@@ -137,7 +138,7 @@ func TestDataSource_Iterators(t *testing.T) {
driver: func(c *coder.Coder, dmw io.WriteCloser, swFn
func() io.WriteCloser, ks, vs []interface{}) {
wc, kc, vc := extractCoders(c)
for _, k := range ks {
- EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, dmw)
+ EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), dmw)
kc.Encode(&FullValue{Elm: k}, dmw)
coder.EncodeInt32(-1, dmw) // Mark as
multi-chunk (though beam, runner says to use 0)
coder.EncodeVarInt(-1, dmw) // Mark
subsequent chunks as "state backed"
@@ -235,7 +236,7 @@ func TestDataSource_Split(t *testing.T) {
wc := MakeWindowEncoder(c.Window)
ec := MakeElementEncoder(coder.SkipW(c))
for _, v := range elements {
- EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, pw)
+ EncodeWindowedValueHeader(wc,
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), pw)
ec.Encode(&FullValue{Elm: v}, pw)
}
pw.Close()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
index b057579..2c87a3b 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
@@ -279,7 +279,7 @@ func createSdfPlan(t *testing.T, name string, fn
*graph.DoFn, cdr *coder.Coder)
func writeElm(elm *FullValue, cdr *coder.Coder, pw *io.PipeWriter) {
wc := MakeWindowEncoder(cdr.Window)
ec := MakeElementEncoder(coder.SkipW(cdr))
- if err := EncodeWindowedValueHeader(wc, window.SingleGlobalWindow,
mtime.ZeroTimestamp, pw); err != nil {
+ if err := EncodeWindowedValueHeader(wc, window.SingleGlobalWindow,
mtime.ZeroTimestamp, typex.NoFiringPane(), pw); err != nil {
panic("err")
}
if err := ec.Encode(elm, pw); err != nil {
@@ -294,7 +294,7 @@ func decodeDynSplitElm(elm []byte, cdr *coder.Coder)
(*FullValue, error) {
wd := MakeWindowDecoder(cdr.Window)
ed := MakeElementDecoder(coder.SkipW(cdr))
b := bytes.NewBuffer(elm)
- w, t, err := DecodeWindowedValueHeader(wd, b)
+ w, t, pn, err := DecodeWindowedValueHeader(wd, b)
if err != nil {
return nil, err
}
@@ -304,6 +304,7 @@ func decodeDynSplitElm(elm []byte, cdr *coder.Coder)
(*FullValue, error) {
}
e.Windows = w
e.Timestamp = t
+ e.Pane = pn
return e, nil
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
index 90e4763..c46d655 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
@@ -37,13 +37,14 @@ type FullValue struct {
Timestamp typex.EventTime
Windows []typex.Window
+ Pane typex.PaneInfo
}
+// String renders the element followed by "[@timestamp:windows:pane]". When
+// Elm2 is set, the element is rendered as a KV<Elm,Elm2> pair.
func (v *FullValue) String() string {
if v.Elm2 == nil {
- return fmt.Sprintf("%v [@%v:%v]", v.Elm, v.Timestamp, v.Windows)
+ return fmt.Sprintf("%v [@%v:%v:%v]", v.Elm, v.Timestamp,
v.Windows, v.Pane)
}
- return fmt.Sprintf("KV<%v,%v> [@%v:%v]", v.Elm, v.Elm2, v.Timestamp,
v.Windows)
+ return fmt.Sprintf("KV<%v,%v> [@%v:%v:%v]", v.Elm, v.Elm2, v.Timestamp,
v.Windows, v.Pane)
}
// Stream is a FullValue reader. It returns io.EOF when complete, but can be
diff --git a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
index 83a0813..3a1900f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/reshuffle.go
@@ -62,7 +62,7 @@ func (n *ReshuffleInput) StartBundle(ctx context.Context, id
string, data DataCo
func (n *ReshuffleInput) ProcessElement(ctx context.Context, value *FullValue,
values ...ReStream) error {
n.b.Reset()
- if err := EncodeWindowedValueHeader(n.wEnc, value.Windows,
value.Timestamp, &n.b); err != nil {
+ if err := EncodeWindowedValueHeader(n.wEnc, value.Windows,
value.Timestamp, value.Pane, &n.b); err != nil {
return err
}
if err := n.enc.Encode(value, &n.b); err != nil {
@@ -135,7 +135,7 @@ func (n *ReshuffleOutput) ProcessElement(ctx
context.Context, value *FullValue,
return errors.WithContextf(err, "reading values for
%v", n)
}
n.b = *bytes.NewBuffer(v.Elm.([]byte))
- ws, ts, err := DecodeWindowedValueHeader(n.wDec, &n.b)
+ ws, ts, pn, err := DecodeWindowedValueHeader(n.wDec, &n.b)
if err != nil {
return errors.WithContextf(err, "decoding windows for
%v", n)
}
@@ -144,6 +144,7 @@ func (n *ReshuffleOutput) ProcessElement(ctx
context.Context, value *FullValue,
}
n.ret.Windows = ws
n.ret.Timestamp = ts
+ n.ret.Pane = pn
if err := n.Out.ProcessElement(ctx, &n.ret); err != nil {
return err
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/window.go
b/sdks/go/pkg/beam/core/runtime/exec/window.go
index c75046c..c6a6546 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/window.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/window.go
@@ -50,6 +50,7 @@ func (w *WindowInto) ProcessElement(ctx context.Context, elm
*FullValue, values
Timestamp: elm.Timestamp,
Elm: elm.Elm,
Elm2: elm.Elm2,
+ Pane: elm.Pane,
}
return w.Out.ProcessElement(ctx, windowed, values...)
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 2f6b8a7..f3f7b9e 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -981,22 +981,135 @@ func marshalWindowingStrategy(c *CoderMarshaller, w
*window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+
ws := &pipepb.WindowingStrategy{
WindowFn: windowFn,
MergeStatus: mergeStat,
- AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: windowCoderId,
- Trigger: &pipepb.Trigger{
+ Trigger: makeTrigger(w.Trigger),
+ AccumulationMode: makeAccumulationMode(w.AccumulationMode),
+ OutputTime: pipepb.OutputTime_END_OF_WINDOW,
+ ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ AllowedLateness: 0,
+ OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY,
+ }
+ return ws, nil
+}
+
+// makeAccumulationMode translates an SDK AccumulationMode into the pipeline
+// proto enum. Discarding, and any value not listed below (including the empty
+// string), becomes DISCARDING.
+func makeAccumulationMode(m window.AccumulationMode) pipepb.AccumulationMode_Enum {
+	switch m {
+	case window.Accumulating:
+		return pipepb.AccumulationMode_ACCUMULATING
+	case window.Retracting:
+		return pipepb.AccumulationMode_RETRACTING
+	case window.Unspecified:
+		return pipepb.AccumulationMode_UNSPECIFIED
+	}
+	return pipepb.AccumulationMode_DISCARDING
+}
+
+// makeTrigger translates an SDK window.Trigger into its pipeline proto form,
+// recursively translating any sub-triggers. Composite kinds panic when given
+// the wrong number of sub-triggers: AfterAny/AfterAll need at least one,
+// Repeat exactly one, and OrFinally exactly two (main, then finally). An empty
+// or unrecognized Kind is translated as the default trigger.
+func makeTrigger(t window.Trigger) *pipepb.Trigger {
+	switch t.Kind {
+	case window.DefaultTrigger:
+		return &pipepb.Trigger{
Trigger: &pipepb.Trigger_Default_{
Default: &pipepb.Trigger_Default{},
},
- },
- OutputTime: pipepb.OutputTime_END_OF_WINDOW,
- ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
- AllowedLateness: 0,
- OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
+		}
+	case window.AlwaysTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_Always_{
+				Always: &pipepb.Trigger_Always{},
+			},
+		}
+	case window.AfterAnyTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_AfterAny_{
+				AfterAny: &pipepb.Trigger_AfterAny{
+					Subtriggers: extractSubtriggers(t.SubTriggers),
+				},
+			},
+		}
+	case window.AfterAllTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_AfterAll_{
+				AfterAll: &pipepb.Trigger_AfterAll{
+					Subtriggers: extractSubtriggers(t.SubTriggers),
+				},
+			},
+		}
+	case window.AfterProcessingTimeTrigger:
+		// TODO(BEAM-3304) Right now would work only for single delay value.
+		// could be configured to take more than one delay values later.
+		ttd := &pipepb.TimestampTransform{
+			TimestampTransform: &pipepb.TimestampTransform_Delay_{
+				Delay: &pipepb.TimestampTransform_Delay{DelayMillis: t.Delay},
+			}}
+		tt := []*pipepb.TimestampTransform{ttd}
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_AfterProcessingTime_{
+				AfterProcessingTime: &pipepb.Trigger_AfterProcessingTime{TimestampTransforms: tt},
+			},
+		}
+	case window.ElementCountTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_ElementCount_{
+				ElementCount: &pipepb.Trigger_ElementCount{ElementCount: t.ElementCount},
+			},
+		}
+	case window.AfterEndOfWindowTrigger:
+		// TODO: change it to take user config triggers for early and late firings
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_AfterEndOfWindow_{
+				AfterEndOfWindow: &pipepb.Trigger_AfterEndOfWindow{
+					EarlyFirings: makeTrigger(window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 1}),
+					LateFirings:  nil,
+				},
+			},
+		}
+	case window.RepeatTrigger:
+		if len(t.SubTriggers) != 1 {
+			panic("Only 1 Subtrigger should be passed to Repeat Trigger")
+		}
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_Repeat_{
+				Repeat: &pipepb.Trigger_Repeat{Subtrigger: makeTrigger(t.SubTriggers[0])},
+			},
+		}
+	case window.OrFinallyTrigger:
+		// OrFinallyTrigger is a declared kind; without this case it silently
+		// fell through to the default trigger. SubTriggers[0] is the main
+		// trigger and SubTriggers[1] the finally trigger.
+		if len(t.SubTriggers) != 2 {
+			panic("Exactly 2 Subtriggers (main, finally) should be passed to OrFinally Trigger")
+		}
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_OrFinally_{
+				OrFinally: &pipepb.Trigger_OrFinally{
+					Main:    makeTrigger(t.SubTriggers[0]),
+					Finally: makeTrigger(t.SubTriggers[1]),
+				},
+			},
+		}
+	case window.NeverTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_Never_{
+				Never: &pipepb.Trigger_Never{},
+			},
+		}
+	case window.AfterSynchronizedProcessingTimeTrigger:
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_AfterSynchronizedProcessingTime_{
+				AfterSynchronizedProcessingTime: &pipepb.Trigger_AfterSynchronizedProcessingTime{},
+			},
+		}
+	default:
+		// Covers the zero-value Trigger (empty Kind) built by TryWindowInto.
+		return &pipepb.Trigger{
+			Trigger: &pipepb.Trigger_Default_{
+				Default: &pipepb.Trigger_Default{},
+			},
+		}
}
- return ws, nil
+}
+
+// extractSubtriggers translates each SDK sub-trigger with makeTrigger,
+// preserving order. It panics when t is empty, because composite triggers
+// need at least one sub-trigger.
+func extractSubtriggers(t []window.Trigger) []*pipepb.Trigger {
+	if len(t) == 0 {
+		panic("At least one subtrigger required for composite triggers.")
+	}
+
+	subs := make([]*pipepb.Trigger, len(t))
+	for i := range t {
+		subs[i] = makeTrigger(t[i])
+	}
+	return subs
}
func makeWindowFn(w *window.Fn) (*pipepb.FunctionSpec, error) {
diff --git a/sdks/go/pkg/beam/core/typex/class.go
b/sdks/go/pkg/beam/core/typex/class.go
index 8c3fc53..a14197f 100644
--- a/sdks/go/pkg/beam/core/typex/class.go
+++ b/sdks/go/pkg/beam/core/typex/class.go
@@ -108,6 +108,7 @@ func isConcrete(t reflect.Type, visited map[uintptr]bool)
bool {
if t == nil ||
t == EventTimeType ||
t.Implements(WindowType) ||
+ t == PaneInfoType ||
t == reflectx.Error ||
t == reflectx.Context ||
IsUniversal(t) {
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go
b/sdks/go/pkg/beam/core/typex/fulltype.go
index abdaf64..cbfb443 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -416,3 +416,9 @@ func checkTypesNotNil(list []FullType) {
}
}
}
+
+// NoFiringPane returns the PaneInfo encoded by the single header byte 0x0f:
+// first and last pane, unknown timing, and zero indices.
+func NoFiringPane() PaneInfo {
+	return PaneInfo{Timing: PaneUnknown, IsFirst: true, IsLast: true}
+}
diff --git a/sdks/go/pkg/beam/core/typex/special.go
b/sdks/go/pkg/beam/core/typex/special.go
index 3fc57f8..dd4199c 100644
--- a/sdks/go/pkg/beam/core/typex/special.go
+++ b/sdks/go/pkg/beam/core/typex/special.go
@@ -35,6 +35,7 @@ var (
EventTimeType = reflect.TypeOf((*EventTime)(nil)).Elem()
WindowType = reflect.TypeOf((*Window)(nil)).Elem()
+ PaneInfoType = reflect.TypeOf((*PaneInfo)(nil)).Elem()
KVType = reflect.TypeOf((*KV)(nil)).Elem()
CoGBKType = reflect.TypeOf((*CoGBK)(nil)).Elem()
@@ -64,6 +65,21 @@ type Window interface {
Equals(o Window) bool
}
+// PaneTiming records the timing of a pane's firing. It occupies bits 2-3 of an
+// encoded pane header byte, so only the values 0-3 below are representable.
+type PaneTiming byte
+
+// Pane timings. PaneUnknown is the timing used by NoFiringPane.
+const (
+ PaneEarly PaneTiming = 0
+ PaneOnTime PaneTiming = 1
+ PaneLate PaneTiming = 2
+ PaneUnknown PaneTiming = 3
+)
+
+// PaneInfo describes the pane of a windowed value: its timing, its IsFirst and
+// IsLast flags, and its Index and NonSpeculativeIndex. The indices are written
+// to the wire only when they cannot be implied from the header byte. When
+// decoding the single-index form, NonSpeculativeIndex is set to -1 for
+// PaneEarly and to Index otherwise.
+type PaneInfo struct {
+ Timing PaneTiming
+ IsFirst, IsLast bool
+ Index, NonSpeculativeIndex int64
+}
+
// KV, CoGBK, WindowedValue represent composite generic types. They are not
used
// directly in user code signatures, but only in FullTypes.
diff --git a/sdks/go/pkg/beam/pcollection.go b/sdks/go/pkg/beam/pcollection.go
index eb54211..e5dc632 100644
--- a/sdks/go/pkg/beam/pcollection.go
+++ b/sdks/go/pkg/beam/pcollection.go
@@ -49,7 +49,6 @@ func (p PCollection) IsValid() bool {
}
// TODO(herohde) 5/30/2017: add name for PCollections? Java supports it.
-// TODO(herohde) 5/30/2017: add windowing strategy and documentation.
// Type returns the full type 'A' of the elements. 'A' must be a concrete
// type, such as int or KV<int,string>.
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index caff970..e18ff6c 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/stringx"
@@ -117,7 +118,7 @@ func (x *translator) translateTransform(trunk string, id
string) ([]*df.Step, er
// URL Query-escaped windowed _unnested_ value. It is read back
in
// a nested context at runtime.
var buf bytes.Buffer
- if err :=
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()),
window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
+ if err :=
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()),
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.NoFiringPane(), &buf);
err != nil {
return nil, err
}
value := string(append(buf.Bytes(), t.GetSpec().Payload...))
diff --git a/sdks/go/pkg/beam/windowing.go b/sdks/go/pkg/beam/windowing.go
index 581ffa0..8882ed2 100644
--- a/sdks/go/pkg/beam/windowing.go
+++ b/sdks/go/pkg/beam/windowing.go
@@ -16,26 +16,70 @@
 package beam
 
 import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
 
+// WindowIntoOption is an optional argument to WindowInto and TryWindowInto
+// that configures the windowing strategy beyond its window function.
+// Its method is unexported, so it is implemented only by types in this package.
+type WindowIntoOption interface {
+	windowIntoOption()
+}
+
+// WindowTrigger is a WindowIntoOption that sets the trigger of the windowing
+// strategy. Note that the Name field holds the trigger itself, not a name.
+type WindowTrigger struct {
+	Name window.Trigger
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
+// AccumulationMode is a WindowIntoOption that sets the accumulation mode
+// (for example window.Accumulating or window.Discarding) of the windowing
+// strategy.
+type AccumulationMode struct {
+	Mode window.AccumulationMode
+}
+
+func (m AccumulationMode) windowIntoOption() {}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-	return Must(TryWindowInto(s, ws, col))
+// The optional opts configure the trigger and accumulation mode; see
+// TryWindowInto for the conditions under which insertion fails.
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts ...WindowIntoOption) PCollection {
+	return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, error) {
+// It returns an error if the scope or input PCollection is invalid, or if an
+// option is not a WindowTrigger or AccumulationMode value (for example a nil
+// option or a pointer such as *WindowTrigger).
+// When several options of the same kind are given, the last one wins.
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts ...WindowIntoOption) (PCollection, error) {
 	if !s.IsValid() {
 		return PCollection{}, errors.New("invalid scope")
 	}
 	if !col.IsValid() {
 		return PCollection{}, errors.New("invalid input pcollection")
 	}
+	// Start from a zero-valued Trigger; a WindowTrigger option overrides it.
+	ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Trigger{}}
+	for _, opt := range opts {
+		switch opt := opt.(type) {
+		case WindowTrigger:
+			ws.Trigger = opt.Name
+		case AccumulationMode:
+			ws.AccumulationMode = opt.Mode
+		default:
+			// Return an error instead of panicking so that callers of
+			// TryWindowInto can handle a bad option like any other failure.
+			return PCollection{}, errors.Errorf("unknown WindowInto option type: %T: %v", opt, opt)
+		}
+	}
 
-	edge := graph.NewWindowInto(s.real, s.scope, ws, col.n)
+	edge := graph.NewWindowInto(s.real, s.scope, &ws, col.n)
 	ret := PCollection{edge.Output[0].To}
 	return ret, nil
 }
diff --git a/sdks/go/test/integration/integration.go
b/sdks/go/test/integration/integration.go
index d76b09a..beeacdf 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -60,6 +60,8 @@ var sickbay = []string{}
var directFilters = []string{
// The direct runner does not yet support cross-language.
"TestXLang.*",
+ // Triggers are not yet supported
+ "TestTrigger.*",
// The direct runner does not support the TestStream primitive
"TestTestStream.*",
}
@@ -67,6 +69,8 @@ var directFilters = []string{
var portableFilters = []string{
// The portable runner does not support the TestStream primitive
"TestTestStream.*",
+ // The trigger tests use TestStream
+ "TestTrigger.*",
}
var flinkFilters = []string{
@@ -74,6 +78,8 @@ var flinkFilters = []string{
"TestXLang_Combine.*",
// TODO(BEAM-12753): Flink test stream fails for non-string/byte slice
inputs
"TestTestStream.*Sequence.*",
+ // Triggers are not yet supported
+ "TestTrigger.*",
}
var samzaFilters = []string{
@@ -82,6 +88,8 @@ var samzaFilters = []string{
"TestReshuffleKV",
// The Samza runner does not support the TestStream primitive
"TestTestStream.*",
+ // The trigger tests use TestStream
+ "TestTrigger.*",
}
var sparkFilters = []string{
@@ -91,6 +99,8 @@ var sparkFilters = []string{
"TestParDoKVSideInput",
// The Spark runner does not support the TestStream primitive
"TestTestStream.*",
+ // The trigger tests use TestStream
+ "TestTrigger.*",
}
var dataflowFilters = []string{
@@ -98,6 +108,8 @@ var dataflowFilters = []string{
"TestFlattenDup",
// The Dataflow runner does not support the TestStream primitive
"TestTestStream.*",
+ // The trigger tests use TestStream
+ "TestTrigger.*",
}
// CheckFilters checks if an integration test is filtered to be skipped, either
diff --git a/sdks/go/test/integration/primitives/windowinto.go
b/sdks/go/test/integration/primitives/windowinto.go
index 0c2be7f..fbd69b1 100644
--- a/sdks/go/test/integration/primitives/windowinto.go
+++ b/sdks/go/test/integration/primitives/windowinto.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)
@@ -43,7 +44,7 @@ func (f *createTimestampedData) ProcessElement(_ []byte, emit
func(beam.EventTim
}
}
-// WindowsSums produces a pipeline that generates the numbers of a 3x3 magic
square, and
+// WindowSums produces a pipeline that generates the numbers of a 3x3 magic
square, and
// configures the pipeline so that PCollection. Sum is a closure to handle
summing data over the window, in a few conditions.
func WindowSums(s beam.Scope, sumPerKey func(beam.Scope, beam.PCollection)
beam.PCollection) {
timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4,
9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
@@ -91,3 +92,91 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+// validate windows in using wfn with trigger tr and accumulation mode m,
+// sums the windowed values, and asserts that the resulting sums, gathered
+// into the global window, equal expected.
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+ windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr},
beam.AccumulationMode{Mode: m})
+ sums := stats.Sum(s, windowed)
+ // Re-window the sums from every window into the single global window
+ // before asserting on them.
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger, which fires a pane after the end
+// of the window.
+//
+// Timestamps are in milliseconds: 1.0, 2.0 and 3.0 at 1s fall in the 10s
+// fixed window [0s, 10s), and 4.0 and 5.0 at 12s fall in [10s, 20s), so the
+// expected sums are 6.0 and 9.0.
+func TriggerDefault(s beam.Scope) {
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(11000)
+ con.AddElements(12000, 4.0, 5.0)
+ con.AdvanceWatermark(13000)
+
+ col := teststream.Create(s, con)
+ windowSize := 10 * time.Second
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+}
+
+// TriggerAlways tests the Always trigger, which fires a pane for every
+// element. With discarding accumulation, each input value is expected as its
+// own output sum.
+func TriggerAlways(s beam.Scope) {
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(11000)
+ col := teststream.Create(s, con)
+ windowSize := 10 * time.Second
+
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
window.Trigger{Kind: window.AlwaysTrigger}, window.Discarding, 1.0, 2.0, 3.0)
+}
+
+// TriggerElementCount tests the ElementCount trigger, which waits for at
+// least N elements to be ready before firing an output pane.
+func TriggerElementCount(s beam.Scope) {
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(2000)
+ con.AddElements(6000, 4.0, 5.0)
+ con.AdvanceWatermark(10000)
+ con.AddElements(52000, 10.0)
+ con.AdvanceWatermark(53000)
+
+ col := teststream.Create(s, con)
+
+ // The trigger waits for two elements, fires once, and never fires again,
+ // so exactly one output pane is expected. To fire every 2 elements,
+ // wrap it in a Repeat trigger (see TriggerRepeat).
+ tr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 2}
+ windowed := beam.WindowInto(s, window.NewGlobalWindows(), col,
beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+ sums := stats.Sum(s, windowed)
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Count(s, sums, "total collections", 1)
+}
+
+// TriggerAfterProcessingTime tests the AfterProcessingTime trigger, which
+// fires an output pane once the configured delay of processing time has
+// passed (Delay is presumably in milliseconds; confirm).
+// Not yet supported by the Flink runner:
+// java.lang.UnsupportedOperationException: Advancing Processing time is not
+// supported by the Flink Runner.
+// NOTE(review): windowinto_test.go in this change adds no test that runs this
+// pipeline, and the sub-scope is named "Fixed" although it uses global windows.
+func TriggerAfterProcessingTime(s beam.Scope) {
+ con := teststream.NewConfig()
+ con.AdvanceProcessingTime(100)
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceProcessingTime(2000)
+ con.AddElements(22000, 4.0)
+
+ col := teststream.Create(s, con)
+
+ validate(s.Scope("Fixed"), window.NewGlobalWindows(), col,
window.Trigger{Kind: window.AfterProcessingTimeTrigger, Delay: 5000},
window.Discarding, 6.0)
+}
+
+// TriggerRepeat tests the Repeat trigger. Currently it is configured to take
+// only one trigger as a subtrigger. Here it repeats an ElementCount trigger
+// of 2 over six elements, so three output panes of two elements each are
+// expected.
+func TriggerRepeat(s beam.Scope) {
+ // Create a TestStream-backed input and get its PCollection.
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(2000)
+ con.AddElements(6000, 4.0, 5.0, 6.0)
+ con.AdvanceWatermark(10000)
+
+ col := teststream.Create(s, con)
+
+ subTr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount:
2}
+ tr := window.Trigger{Kind: window.RepeatTrigger, SubTriggers:
[]window.Trigger{subTr}}
+ windowed := beam.WindowInto(s, window.NewGlobalWindows(), col,
beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+ sums := stats.Sum(s, windowed)
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Count(s, sums, "total collections", 3)
+}
diff --git a/sdks/go/test/integration/primitives/windowinto_test.go
b/sdks/go/test/integration/primitives/windowinto_test.go
index 4552176..ab5c6746 100644
--- a/sdks/go/test/integration/primitives/windowinto_test.go
+++ b/sdks/go/test/integration/primitives/windowinto_test.go
@@ -36,3 +36,31 @@ func TestWindowSums_GBK(t *testing.T) {
WindowSums_GBK(s)
ptest.RunAndValidate(t, p)
}
+
+// TestTriggerDefault runs the TriggerDefault pipeline, unless the current
+// runner filters it out via integration.CheckFilters.
+func TestTriggerDefault(t *testing.T) {
+ integration.CheckFilters(t)
+ p, s := beam.NewPipelineWithRoot()
+ TriggerDefault(s)
+ ptest.RunAndValidate(t, p)
+}
+
+// TestTriggerAlways runs the TriggerAlways pipeline, unless the current
+// runner filters it out via integration.CheckFilters.
+func TestTriggerAlways(t *testing.T) {
+ integration.CheckFilters(t)
+ p, s := beam.NewPipelineWithRoot()
+ TriggerAlways(s)
+ ptest.RunAndValidate(t, p)
+}
+
+// TestTriggerElementCount runs the TriggerElementCount pipeline, unless the
+// current runner filters it out via integration.CheckFilters.
+func TestTriggerElementCount(t *testing.T) {
+ integration.CheckFilters(t)
+ p, s := beam.NewPipelineWithRoot()
+ TriggerElementCount(s)
+ ptest.RunAndValidate(t, p)
+}
+
+// TestTriggerRepeat runs the TriggerRepeat pipeline, unless the current
+// runner filters it out via integration.CheckFilters.
+func TestTriggerRepeat(t *testing.T) {
+ integration.CheckFilters(t)
+ p, s := beam.NewPipelineWithRoot()
+ TriggerRepeat(s)
+ ptest.RunAndValidate(t, p)
+}