lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r687272096



##########
File path: sdks/go/pkg/beam/core/graph/coder/coder.go
##########
@@ -188,6 +188,8 @@ const (
        //
        // TODO(BEAM-490): once this JIRA is done, this coder should become the 
new thing.
        CoGBK Kind = "CoGBK"
+
+       PaneInfo Kind = "PaneInfo"

Review comment:
       Nit: I'd put this just below Timer in the above group, rather than 
isolated by itself down here.  Also feel free to have the short version be "PI" 
rather than the full "PaneInfo" to match the other "parts of encoded window 
infrastructure" convention we have.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+       // Encoding: typex.PaneInfo
+
+       pane := 0
+       if v.IsFirst {
+               pane |= 1
+       }
+       if v.IsLast {
+               pane |= 2
+       }
+       pane |= v.Timing << 2
+
+       switch {
+       case (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == 
typex.PaneUnknown:

Review comment:
       Nit: I don't think it's necessary to pair the first two conditions with 
parentheses, since it's all a disjunction (||) anyway.

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr 
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+       windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, 
beam.AccumulationMode{Mode: m})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end 
of the window
+func TriggerDefault(s beam.Scope) {
+       // create a teststream pipeline and get the pcollection
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       con.AddElements(12000, 4.0, 5.0)
+       con.AdvanceWatermark(13000)
+
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every 
input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in 
the case of this trigger
+func TriggerAlways(s beam.Scope) {
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.AlwaysTrigger}, window.Discarding, 1.0, 2.0, 3.0, 
0.0)
+}
+
+// TriggerElementCount tests the ElementCount Trigger, it waits for atleast N 
elements to be ready
+// to fire an output pane
+func TriggerElementCount(s beam.Scope) {
+       // create a teststream pipeline and get the pcollection
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(2000)
+       con.AddElements(6000, 4.0, 5.0)
+       con.AdvanceWatermark(10000)
+       con.AddElements(52000, 10.0)
+       con.AdvanceWatermark(53000)
+
+       col := teststream.Create(s, con)
+
+       // waits only for two elements to arrive and fires output after that 
and never fires that.
+       // For the trigger to fire every 2 elements, combine it with Repeat 
Trigger
+       tr := window.Trigger{Kind: window.ElementCountTrigger, ElementCount: 2}
+       windowed := beam.WindowInto(s, window.NewGlobalWindows(), col, 
beam.WindowTrigger{Name: tr}, beam.AccumulationMode{Mode: window.Discarding})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Count(s, sums, "total collections", 1)

Review comment:
       I'm guessing we can't assert which two values we get, so that's why we 
only check the Count?

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr 
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+       windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, 
beam.AccumulationMode{Mode: m})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end 
of the window
+func TriggerDefault(s beam.Scope) {
+       // create a teststream pipeline and get the pcollection
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       con.AddElements(12000, 4.0, 5.0)
+       con.AdvanceWatermark(13000)
+
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+
+}
+
+// TriggerAlways tests the Always trigger, it is expected to receive every 
input value as the output.
+// It also return an extra empty pane. Not sure why it is so. It is only in 
the case of this trigger

Review comment:
       Consider asking about this extra empty pane. Is it documented? Is it 
something wrong with how we're specifying things?

##########
File path: sdks/go/pkg/beam/core/graph/window/strategy.go
##########
@@ -16,12 +16,22 @@
 // Package window contains window representation, windowing strategies and 
utilities.
 package window
 
+type AccumulationMode string
+
+const (
+       Unspecified  AccumulationMode = "AccumulationMode_UNSPECIFIED"
+       Discarding   AccumulationMode = "AccumulationMode_DISCARDING"
+       Accumulating AccumulationMode = "AccumulationMode_ACCUMULATING"
+       Retracting   AccumulationMode = "AccumulationMode_RETRACTING"
+)
+
 // WindowingStrategy defines the types of windowing used in a pipeline and 
contains
 // the data to support executing a windowing strategy.
 type WindowingStrategy struct {
-       Fn *Fn
-
-       // TODO(BEAM-3304): trigger support
+       Fn               *Fn
+       Trigger          Trigger
+       AccumulationMode AccumulationMode
+       AllowedLateness  int

Review comment:
       Consider adding a comment here that it's expected to be in milliseconds, 
or use a time.Duration instead and convert to milliseconds later. Explicit 
types are friends!

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger := makeTrigger(w.Trigger)
+       accMode := makeAccumulationMode(w.AccumulationMode)
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
-               AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               AccumulationMode: accMode,
+               OutputTime:       pipepb.OutputTime_EARLIEST_IN_PANE,

Review comment:
       This changed from OutputTime_END_OF_WINDOW to this. We probably don't 
want that right now.
   
   

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +92,60 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+func validate(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr 
window.Trigger, m window.AccumulationMode, expected ...interface{}) {
+       windowed := beam.WindowInto(s, wfn, in, beam.WindowTrigger{Name: tr}, 
beam.AccumulationMode{Mode: m})
+       sums := stats.Sum(s, windowed)
+       sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+       passert.Equals(s, sums, expected...)
+}
+
+// TriggerDefault tests the default trigger which fires the pane after the end 
of the window
+func TriggerDefault(s beam.Scope) {
+       // create a teststream pipeline and get the pcollection
+       con := teststream.NewConfig()
+       con.AddElements(1000, 1.0, 2.0, 3.0)
+       con.AdvanceWatermark(11000)
+       con.AddElements(12000, 4.0, 5.0)
+       con.AdvanceWatermark(13000)
+
+       col := teststream.Create(s, con)
+       windowSize := 10 * time.Second
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, 
window.Trigger{Kind: window.DefaultTrigger}, window.Accumulating, 6.0, 9.0)
+

Review comment:
       rm spare blank line.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger := makeTrigger(w.Trigger)
+       accMode := makeAccumulationMode(w.AccumulationMode)
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
-               AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               AccumulationMode: accMode,
+               OutputTime:       pipepb.OutputTime_EARLIEST_IN_PANE,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  10,

Review comment:
       This should default to 0, unless we're pulling it from the passed in 
strategy.

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,19 @@ type Window interface {
        Equals(o Window) bool
 }
 
+const (
+       PaneEarly   int = 0 // EARLY
+       PaneOnTime  int = 1 // ON_TIME
+       PaneLate    int = 2 // LATE
+       PaneUnknown int = 3 // UNKNOWN
+)
+
+type PaneInfo struct {
+       Timing                     int

Review comment:
       As much as it's probably extra, we don't want this to be an int in type, 
as then users might try to manipulate it as an int.
   Instead add a new type `type PaneTiming byte`  (since we really won't need 
more than that presently, or likely, ever) then the above enum declaration can 
be
   
   ```
   const (
        PaneEarly   PaneTiming = 0 
        PaneOnTime  PaneTiming = 1 
        PaneLate    PaneTiming = 2 
        PaneUnknown PaneTiming = 3 
   )
   ```
   (Since we have specific definitions, we can't lean on an iota, even though 
it comes out the same. We want them to match their defined meaning in the coder 
definition).
   
   The net result is that in order to use it in math/etc, a user would need to 
explicitly type cast to a numeric value. This avoids being able to misuse the 
enum type as a value rather than symbolically which is all it needs to have. 
   
   This is necessary since the PaneInfo will eventually be exposed to user DoFns, 
and it needs to be hard to misuse, which we're doing with the type system.
   
   

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+       // Encoding: typex.PaneInfo
+
+       pane := 0

Review comment:
       It is probably worth converting this to a byte(0), or uint8(0) early, to 
catch errors sooner. It also telegraphs the upcoming bitwise operations.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+// EncodePane encodes a typex.PaneInfo.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+       // Encoding: typex.PaneInfo
+
+       pane := 0
+       if v.IsFirst {
+               pane |= 1
+       }
+       if v.IsLast {
+               pane |= 2

Review comment:
       Minor Nit: Please use the hex notation for the bitwise operations to 
make it much clearer to readers that the bits are significant. In this case, 
0x01 and 0x02 will make it unambiguous what we're doing with the byte layout.
   
   Yes, more verbose, but also unambiguous.

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger := makeTrigger(w.Trigger)
+       accMode := makeAccumulationMode(w.AccumulationMode)
        ws := &pipepb.WindowingStrategy{
                WindowFn:         windowFn,
                MergeStatus:      mergeStat,
-               AccumulationMode: pipepb.AccumulationMode_DISCARDING,
                WindowCoderId:    windowCoderId,
-               Trigger: &pipepb.Trigger{
+               Trigger:          trigger,
+               AccumulationMode: accMode,
+               OutputTime:       pipepb.OutputTime_EARLIEST_IN_PANE,
+               ClosingBehavior:  pipepb.ClosingBehavior_EMIT_IF_NONEMPTY,
+               AllowedLateness:  10,
+               OnTimeBehavior:   pipepb.OnTimeBehavior_FIRE_ALWAYS,
+               EnvironmentId:    "",

Review comment:
       We can also keep this unset.

##########
File path: sdks/go/test/integration/primitives/windowinto_test.go
##########
@@ -36,3 +36,24 @@ func TestWindowSums_GBK(t *testing.T) {
        WindowSums_GBK(s)
        ptest.RunAndValidate(t, p)
 }
+
+func TestTriggerDefault(t *testing.T) {
+       integration.CheckFilters(t)

Review comment:
       As mentioned elsewhere: since we're using TestStream here, we 
need to filter these tests out from everything but the Flink runner. 
Fortunately, we can use a regex and just cover the prefix "TestTrigger".

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+type Trigger struct {
+       Kind         string
+       SubTriggers  []Trigger
+       Delay        int64

Review comment:
       Same comment here about what the int64 type is representing, or use a 
time.Duration

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -981,22 +981,114 @@ func marshalWindowingStrategy(c *CoderMarshaller, w 
*window.WindowingStrategy) (
        } else {
                mergeStat = pipepb.MergeStatus_NON_MERGING
        }
+       trigger := makeTrigger(w.Trigger)
+       accMode := makeAccumulationMode(w.AccumulationMode)

Review comment:
       We can inline these calls to the proto field assignments instead of 
assigning to variables first.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to