lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687272096
##########
File path: sdks/go/pkg/beam/core/graph/coder/coder.go
##########
@@ -188,6 +188,8 @@ const (
//
// TODO(BEAM-490): once this JIRA is done, this coder should become the
new thing.
CoGBK Kind = "CoGBK"
+
+ PaneInfo Kind = "PaneInfo"
Review comment:
Nit: I'd put this just below Timer in the above group, rather than
isolated by itself down here. Also feel free to have the short version be "PI"
rather than the full "PaneInfo" to match the other "parts of encoded window
infrastructure" convention we have.
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+ // Encoding: typex.PaneInfo
+
+ pane := 0
+ if v.IsFirst {
+ pane |= 1
+ }
+ if v.IsLast {
+ pane |= 2
+ }
+ pane |= v.Timing << 2
+
+ switch {
+ case (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing ==
typex.PaneUnknown:
Review comment:
Nit: I don't think it's necessary to pair the first two conditions with
parentheses, since it's all a disjunction (||) anyway.
##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+ windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr},
beam.AccumulationMode{Mode: m})
+ sums := stats.Sum(s, windowed)
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end
of the window
+func TriggerDefault(s beam.Scope) {
+ // create a teststream pipeline and get the pcollection
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(11000)
+ con.AddElements(12000, 4.0, 5.0)
+ con.AdvanceWatermark(13000)
+
+ col := teststream.Create(s, con)
+ windowSize := 10 * time.Second
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every
input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in
the case of this trigger
+func TriggerAlways(s beam.Scope) {
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(11000)
+ col := teststream.Create(s, con)
+ windowSize := 10 * time.Second
+
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
window.Trigger{Kind: window.AlwaysTrigger}, window.Discarding, 1.0, 2.0, 3.0,
0.0)
+}
+
+// TriggerElementCount tests the ElementCount Trigger, it waits for atleast N
elements to be ready
+// to fire an output pane
+func TriggerElementCount(s beam.Scope) {
+ // create a teststream pipeline and get the pcollection
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(2000)
+ con.AddElements(6000, 4.0, 5.0)
+ con.AdvanceWatermark(10000)
+ con.AddElements(52000, 10.0)
+ con.AdvanceWatermark(53000)
+
+ col := teststream.Create(s, con)
+
+ // waits only for two elements to arrive and fires output after that
and never fires that.
+ // For the trigger to fire every 2 elements, combine it with Repeat
Trigger
+ tr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 2}
+ windowed := beam.WindowInto(s, window.NewGlobalWindows(), col,
beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+ sums := stats.Sum(s, windowed)
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Count(s, sums, "total collections", 1)
Review comment:
I'm guessing we can't assert which two values we get, so that's why we
only check the Count?
##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+ windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr},
beam.AccumulationMode{Mode: m})
+ sums := stats.Sum(s, windowed)
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end
of the window
+func TriggerDefault(s beam.Scope) {
+ // create a teststream pipeline and get the pcollection
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(11000)
+ con.AddElements(12000, 4.0, 5.0)
+ con.AdvanceWatermark(13000)
+
+ col := teststream.Create(s, con)
+ windowSize := 10 * time.Second
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every
input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in
the case of this trigger
Review comment:
Consider asking about this extra empty pane. Is it documented? Is it
something wrong with how we're specifying things?
##########
File path: sdks/go/pkg/beam/core/graph/window/strategy.go
##########
@@ -16,12 +16,22 @@
// Package window contains window representation, windowing strategies and
utilities.
package window
+type AccumulationMode string
+
+const (
+ Unspecified AccumulationMode = "AccumulationMode_UNSPECIFIED"
+ Discarding AccumulationMode = "AccumulationMode_DISCARDING"
+ Accumulating AccumulationMode = "AccumulationMode_ACCUMULATING"
+ Retracting AccumulationMode = "AccumulationMode_RETRACTING"
+)
+
// WindowingStrategy defines the types of windowing used in a pipeline and
contains
// the data to support executing a windowing strategy.
type WindowingStrategy struct {
- Fn *Fn
-
- // TODO(BEAM-3304): trigger support
+ Fn *Fn
+ Trigger Trigger
+ AccumulationMode AccumulationMode
+ AllowedLateness int
Review comment:
Consider adding a comment here that it's expected to be in milliseconds
or use a time.Duration instead, and convert to milliseconds later. Explicit
types are friends!
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w
*window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+ trigger := makeTrigger(w.Trigger)
+ accMode := makeAccumulationMode(w.AccumulationMode)
ws := &pipepb.WindowingStrategy{
WindowFn: windowFn,
MergeStatus: mergeStat,
- AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: windowCoderId,
- Trigger: &pipepb.Trigger{
+ Trigger: trigger,
+ AccumulationMode: accMode,
+ OutputTime: pipepb.OutputTime_EARLIEST_IN_PANE,
Review comment:
This changed from OutputTime_END_OF_WINDOW to this. We probably don't
want that right now.
##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+ windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr},
beam.AccumulationMode{Mode: m})
+ sums := stats.Sum(s, windowed)
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end
of the window
+func TriggerDefault(s beam.Scope) {
+ // create a teststream pipeline and get the pcollection
+ con := teststream.NewConfig()
+ con.AddElements(1000, 1.0, 2.0, 3.0)
+ con.AdvanceWatermark(11000)
+ con.AddElements(12000, 4.0, 5.0)
+ con.AdvanceWatermark(13000)
+
+ col := teststream.Create(s, con)
+ windowSize := 10 * time.Second
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
Review comment:
rm spare blank line.
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w
*window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+ trigger := makeTrigger(w.Trigger)
+ accMode := makeAccumulationMode(w.AccumulationMode)
ws := &pipepb.WindowingStrategy{
WindowFn: windowFn,
MergeStatus: mergeStat,
- AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: windowCoderId,
- Trigger: &pipepb.Trigger{
+ Trigger: trigger,
+ AccumulationMode: accMode,
+ OutputTime: pipepb.OutputTime_EARLIEST_IN_PANE,
+ ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ AllowedLateness: 10,
Review comment:
This should default to 0, unless we're pulling it from the passed in
strategy.
##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,19 @@ type Window interface {
Equals(o Window) bool
}
+const (
+ PaneEarly int = 0 // EARLY
+ PaneOnTime int = 1 // ON_TIME
+ PaneLate int = 2 // LATE
+ PaneUnknown int = 3 // UNKNOWN
+)
+
+type PaneInfo struct {
+ Timing int
Review comment:
As much as it's probably extra, we don't want this to be an int in type,
as then users might try to manipulate it as an int.
Instead add a new type `type PaneTiming byte` (since we really won't need
more than that presently, or likely, ever) then the above enum declaration can
be
```
const (
PaneEarly PaneTiming = 0
PaneOnTime PaneTiming = 1
PaneLate PaneTiming = 2
PaneUnknown PaneTiming = 3
)
```
(Since we have specific definitions, we can't lean on an iota, even though
it comes out the same. We want them to match their defined meaning in the coder
definition).
The net result is that in order to use it in math/etc, a user would need to
explicitly type cast to a numeric value. This avoids being able to misuse the
enum type as a value rather than symbolically which is all it needs to have.
This is necessary since the PaneInfo will eventually be exposed to user DoFns,
and it needs to be hard to misuse, which we're doing with the type system.
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+ // Encoding: typex.PaneInfo
+
+ pane := 0
Review comment:
It is probably worth converting this to a byte(0), or uint8(0) early, to
avoid errors sooner. It also telegraphs the upcoming bitwise operations.
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+ // Encoding: typex.PaneInfo
+
+ pane := 0
+ if v.IsFirst {
+ pane |= 1
+ }
+ if v.IsLast {
+ pane |= 2
Review comment:
Minor Nit: Please use the hex notation for the bitwise operations to
make it much clearer to readers that the bits are significant. In this case,
0x01 and 0x02 make it unambiguous what we're doing with the byte layout.
Yes, more verbose, but also unambiguous.
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w
*window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+ trigger := makeTrigger(w.Trigger)
+ accMode := makeAccumulationMode(w.AccumulationMode)
ws := &pipepb.WindowingStrategy{
WindowFn: windowFn,
MergeStatus: mergeStat,
- AccumulationMode: pipepb.AccumulationMode_DISCARDING,
WindowCoderId: windowCoderId,
- Trigger: &pipepb.Trigger{
+ Trigger: trigger,
+ AccumulationMode: accMode,
+ OutputTime: pipepb.OutputTime_EARLIEST_IN_PANE,
+ ClosingBehavior: pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+ AllowedLateness: 10,
+ OnTimeBehavior: pipepb.OnTimeBehavior_FIRE_ALWAYS,
+ EnvironmentId: "",
Review comment:
We can also keep this unset.
##########
File path: sdks/go/test/integration/primitives/windowinto_test.go
##########
@@ -36,3 +36,24 @@ func TestWindowSums_GBK(t *testing.T) {
WindowSums_GBK(s)
ptest.RunAndValidate(t, p)
}
+
+func TestTriggerDefault(t *testing.T) {
+ integration.CheckFilters(t)
Review comment:
This has been mentioned elsewhere: since we're using TestStream here, we
need to filter these tests out from everything but the Flink runner.
Fortunately, we can use regex and just cover the prefix "TestTrigger".
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+type Trigger struct {
+ Kind string
+ SubTriggers []Trigger
+ Delay int64
Review comment:
Same comment here about what the int64 type is representing, or use a
time.Duration
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w
*window.WindowingStrategy) (
} else {
mergeStat = pipepb.MergeStatus_NON_MERGING
}
+ trigger := makeTrigger(w.Trigger)
+ accMode := makeAccumulationMode(w.AccumulationMode)
Review comment:
We can inline these calls to the proto field assignments instead of
assigning to variables first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]