[
https://issues.apache.org/jira/browse/BEAM-3304?focusedWorklogId=633870&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-633870
]
ASF GitHub Bot logged work on BEAM-3304:
----------------------------------------
Author: ASF GitHub Bot
Created on: 04/Aug/21 22:53
Start Date: 04/Aug/21 22:53
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r682988766
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+ FIRST int = 0
+ ONE_INDEX int = 1
+ TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+ if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing ==
typex.UNKNOWN {
+ return FIRST
+ } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+ return ONE_INDEX
+ } else {
+ return TWO_INDEX
+ }
Review comment:
Prefer using a switch statement for things like this instead of relying
on if-else ladders.
```suggestion
switch {
case v.Index == 0 || v.NonSpeculativeIndex == 0 || v.Timing == typex.UNKNOWN:
return FIRST
case v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY:
return ONE_INDEX
default:
return TWO_INDEX
}
```
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+ FIRST int = 0
+ ONE_INDEX int = 1
+ TWO_INDEX int = 2
Review comment:
In Go, constants, exported or not, should never use ALL_CAPS: use MixedCaps
if they're exported or mixedCaps if they're unexported.
##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
Equals(o Window) bool
}
+type Timing string
Review comment:
Instead of using a string, consider using an `int` (or even a `uint8`) as
the base type. It's more compact in memory. It also lets one avoid doing string
comparisons to know values for encodings, and avoids some awkward conversions
that this PR is currently making.
See https://yourbasic.org/golang/iota/ for a good rundown of how to do
"enums" in Go, as well as Effective Go:
https://golang.org/doc/effective_go#constants
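A minimal sketch of the integer-backed approach, assuming the value order used by the `timing` helper in this PR (early 0, on time 1, late 2, unknown 3). The `Pane`-prefixed names and the `String` method are illustrative assumptions, not the code in the PR:

```go
package main

import "fmt"

// Timing is a hypothetical integer-backed replacement for the string type.
type Timing uint8

const (
	PaneEarly   Timing = iota // 0
	PaneOnTime                // 1
	PaneLate                  // 2
	PaneUnknown               // 3
)

// String keeps the values readable in logs and test failures.
func (t Timing) String() string {
	switch t {
	case PaneEarly:
		return "EARLY"
	case PaneOnTime:
		return "ON_TIME"
	case PaneLate:
		return "LATE"
	default:
		return "UNKNOWN"
	}
}

func main() {
	// The numeric value is available with a plain conversion,
	// so no string comparison is needed when encoding.
	fmt.Println(PaneLate, uint8(PaneLate)) // prints "LATE 2"
}
```

With this shape the `timing` helper in panes.go is no longer needed, since `int(v.Timing)` already yields the encoded value.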
##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
Equals(o Window) bool
}
+type Timing string
+
+const (
+ EARLY Timing = "EARLY"
+ ON_TIME Timing = "ON_TIME"
+ LATE Timing = "LATE"
+ UNKNOWN Timing = "UNKNOWN"
+)
+
+type PaneInfo struct {
+ Timing Timing
+ IsFirst bool
+ IsLast bool
+ Index int64
+ NonSpeculativeIndex int64
Review comment:
Note, there's nothing wrong with the current approach, I'm pointing out
a thing about Go:
In this case, you can write these fields more compactly as:
```suggestion
IsFirst, IsLast bool
Index, NonSpeculativeIndex int64
```
##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
Equals(o Window) bool
}
+type Timing string
+
+const (
+ EARLY Timing = "EARLY"
+ ON_TIME Timing = "ON_TIME"
+ LATE Timing = "LATE"
+ UNKNOWN Timing = "UNKNOWN"
Review comment:
Despite these being constants, it's not idiomatic Go to use all caps for
constants.
I also recommend prefixing them with what they relate to, e.g. PaneEarly,
PaneOnTime, PaneLate, PaneUnknown.
Prefixing is mostly so they're adjacent in Go Doc, vs. suffixing, which is a
bit more readable.
We could only get away without prefixing if we had a "pane"
package, which would declare what they're for earlier. In this case we shouldn't
have a pane package; it would be too small for practical uses.
The typex package has a pretty grab bag set of values, so
the enumerations need to be made clearer for their use elsewhere.
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+ Default TriggerType = "Trigger_Default_"
+ Always TriggerType = "Trigger_Always_"
+ AfterAny TriggerType = "Trigger_AfterAny_"
+ AfterAll TriggerType = "Trigger_AfterAll_"
+)
+
+func (ws *WindowingStrategy) SetAfterAll() {
+ ws.Trigger = AfterAll
+}
+
+func (ws *WindowingStrategy) SetAfterAny() {
+ ws.Trigger = AfterAny
+}
+
+func (ws *WindowingStrategy) SetAlways() {
+ ws.Trigger = Always
+}
+
+func (ws *WindowingStrategy) SetDefault() {
+ ws.Trigger = Default
+}
Review comment:
We can probably remove these helper methods since they aren't being
called, in favour of directly setting the fields ourselves.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1088,25 +1121,22 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws
[]typex.Window, t typex.Eve
if err := enc.Encode(ws, w); err != nil {
return err
}
- _, err := w.Write(paneNoFiring)
+ err := coder.EncodePane(p, w)
return err
}
// DecodeWindowedValueHeader deserializes a windowed value header.
-func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader)
([]typex.Window, typex.EventTime, error) {
+func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader)
([]typex.Window, typex.EventTime, typex.PaneInfo, error) {
// Encoding: Timestamp, Window, Pane (header) + Element
-
+ pn := typex.PaneInfo{}
Review comment:
It would be more idiomatic to declare the zero value in place for the
returns, which makes it explicit that they're the zero value.
The reason for this is to limit scope and make it easier on readers: we keep
scope as small as possible. With a pre-declaration like this, a
reader has to keep an eye out for it, for when it's used, and what might have
affected it before it's returned. By writing our intent (return a zero value on
error) directly where it's happening, we avoid this.
That said, I understand the repetition can be tiresome. You can instead define
a helper function in this function's scope:
`onError := func(err error) ([]typex.Window, typex.EventTime, typex.PaneInfo,
error) { return nil, mtime.ZeroTimestamp, typex.PaneInfo{}, err }`
Which you can then call as `return onError(err)` and it will compile down to
the same thing.
Non-pointer zeros being inconvenient is a hot topic of discussion for Go, and
eventually someone will propose a satisfying solution for it.
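A self-contained sketch of the helper-closure pattern. The types below are stand-ins for `typex.Window`, `typex.EventTime` and `typex.PaneInfo`, and `decodeHeader` with its `failAt` parameter is a hypothetical function that only mimics the return shape of `DecodeWindowedValueHeader`:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the typex types, so the example compiles on its own.
type Window interface{}
type EventTime int64
type PaneInfo struct {
	IsFirst, IsLast bool
}

const zeroTimestamp EventTime = 0

// decodeHeader fails at the requested step (1 or 2) or succeeds (any other value).
func decodeHeader(failAt int) ([]Window, EventTime, PaneInfo, error) {
	// onError spells out the zero values once, right next to the returns
	// that use them, instead of pre-declaring variables at the top.
	onError := func(err error) ([]Window, EventTime, PaneInfo, error) {
		return nil, zeroTimestamp, PaneInfo{}, err
	}

	if failAt == 1 {
		return onError(errors.New("timestamp decode failed"))
	}
	t := EventTime(42)

	if failAt == 2 {
		return onError(errors.New("pane decode failed"))
	}
	return []Window{"global"}, t, PaneInfo{IsFirst: true}, nil
}

func main() {
	_, _, _, err := decodeHeader(2)
	fmt.Println(err) // prints "pane decode failed"
}
```

Note that the closure needs an explicit result list; Go does not infer return types for function literals.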
##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,40 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
+type WindowIntoOption interface {
+ windowIntoOption()
+}
+
+type WindowTrigger struct {
+ Name window.TriggerType
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
// WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
- return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts
...WindowIntoOption) PCollection {
+ return Must(TryWindowInto(s, ws, col, opts...))
}
// TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection,
error) {
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts
...WindowIntoOption) (PCollection, error) {
if !s.IsValid() {
return PCollection{}, errors.New("invalid scope")
}
if !col.IsValid() {
return PCollection{}, errors.New("invalid input pcollection")
}
+ ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Default}
+ for _, opt := range opts {
+ switch opt.(type) {
+ case WindowTrigger:
+ ws.Trigger = opt.(WindowTrigger).Name
+ default:
+ ws.Trigger = window.Default
Review comment:
Since this switch is only run if we have options, it's valid to return
an error or even `panic` if we don't know what type of option it is. That way
users won't accidentally override earlier-set triggers because an implementation
was half done somehow.
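One way this could look, as a sketch only. `resolveTrigger` and `unfinishedOption` are hypothetical names introduced for illustration, and the trigger constants use the `...Trigger` naming suggested elsewhere in this review rather than the names in the PR:

```go
package main

import "fmt"

type TriggerType string

const (
	DefaultTrigger TriggerType = "Trigger_Default_"
	AlwaysTrigger  TriggerType = "Trigger_Always_"
)

// WindowIntoOption mirrors the marker interface from the PR.
type WindowIntoOption interface {
	windowIntoOption()
}

type WindowTrigger struct {
	Name TriggerType
}

func (WindowTrigger) windowIntoOption() {}

// unfinishedOption is a hypothetical option that the switch does not handle yet.
type unfinishedOption struct{}

func (unfinishedOption) windowIntoOption() {}

// resolveTrigger applies the options in order and rejects unknown ones
// instead of silently resetting the trigger to the default.
func resolveTrigger(opts ...WindowIntoOption) (TriggerType, error) {
	trigger := DefaultTrigger
	for _, opt := range opts {
		switch o := opt.(type) {
		case WindowTrigger:
			trigger = o.Name
		default:
			return "", fmt.Errorf("unknown WindowInto option type: %T", opt)
		}
	}
	return trigger, nil
}

func main() {
	_, err := resolveTrigger(WindowTrigger{Name: AlwaysTrigger}, unfinishedOption{})
	fmt.Println(err)
}
```

Binding the switch variable with `o := opt.(type)` also removes the second type assertion, `opt.(WindowTrigger).Name`, that the PR performs inside the case.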
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+ FIRST int = 0
+ ONE_INDEX int = 1
+ TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+ if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing ==
typex.UNKNOWN {
+ return FIRST
+ } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+ return ONE_INDEX
+ } else {
+ return TWO_INDEX
+ }
+}
+
+func timing(v typex.Timing) int {
+ if v == typex.EARLY {
+ return 0
+ } else if v == typex.ON_TIME {
+ return 1
+ } else if v == typex.LATE {
+ return 2
+ } else {
+ return 3
+ }
+}
+
+// EncodePane encodes a single byte.
Review comment:
```suggestion
// EncodePane encodes a typex.PaneInfo.
```
##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+ Default TriggerType = "Trigger_Default_"
+ Always TriggerType = "Trigger_Always_"
+ AfterAny TriggerType = "Trigger_AfterAny_"
+ AfterAll TriggerType = "Trigger_AfterAll_"
Review comment:
WRT the const names here, we probably want to call these
DefaultTrigger, AlwaysTrigger, etc., since users will see these as
"window.Default" and "window.Always" and it won't necessarily be clear what
they mean.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -134,7 +134,7 @@ func (n *DataSource) Process(ctx context.Context) error {
}
pe.Timestamp = t
pe.Windows = ws
-
+ pe.Pane = pn
Review comment:
Please keep the blank line before the next block.
Blank lines used carefully help readability.
##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,32 @@ func WindowSums_GBK(s beam.Scope) {
func WindowSums_Lifted(s beam.Scope) {
WindowSums(s.Scope("Lifted"), stats.SumPerKey)
}
+
+// TriggerWindowSums, much like WindowSums described above has an addition of
configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways
throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope,
beam.PCollection) beam.PCollection) {
+ timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4,
9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+ windowSize := 3 * time.Second
+
+ validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection,
expected ...interface{}) {
+ // Window the data.
+ windowed := beam.WindowInto(s, wfn, in,
beam.WindowTrigger{Name: window.Always})
+ // To get the pane decoding error, change above statement to
+ // windowed := beam.WindowInto(s, wfn, in,
beam.WindowTrigger{Name: window.Always})
+ // Perform the appropriate sum operation.
+ sums := sumPerKey(s, windowed)
+ // Drop back to Global windows, and drop the key otherwise
passert.Equals doesn't work.
+ sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+ sums = beam.DropKey(s, sums)
+ passert.Equals(s, sums, expected...)
+ }
+
+ // Use fixed windows to divide the data into 3 chunks.
+ validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize),
timestampedData, 15, 15, 15)
Review comment:
Doesn't the "Always" trigger happen on every element? Should we be
actually expecting the sums, or the individual elements? (I don't know what's
correct here, outside of the default trigger.)
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+ FIRST int = 0
+ ONE_INDEX int = 1
+ TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+ if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing ==
typex.UNKNOWN {
+ return FIRST
+ } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+ return ONE_INDEX
+ } else {
+ return TWO_INDEX
+ }
+}
+
+func timing(v typex.Timing) int {
+ if v == typex.EARLY {
+ return 0
+ } else if v == typex.ON_TIME {
+ return 1
+ } else if v == typex.LATE {
+ return 2
+ } else {
+ return 3
+ }
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+ // Encoding: typex.PaneInfo
+
+ pane := 0
+ if v.IsFirst {
+ pane |= 1
+ }
+ if v.IsLast {
+ pane |= 2
+ }
+ pane |= timing(v.Timing) << 2
+
+ switch chooseEncoding(v) {
+ case FIRST:
+ paneByte := []byte{byte(pane)}
+ w.Write(paneByte)
+ case ONE_INDEX:
+ paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+ w.Write(paneByte)
+ EncodeVarInt(v.Index, w)
+ case TWO_INDEX:
+ paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+ w.Write(paneByte)
+ EncodeVarInt(v.Index, w)
+ EncodeVarInt(v.NonSpeculativeIndex, w)
+ }
+ return nil
+}
+
+func encodingType(b byte) int64 {
+ return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+ pn := typex.PaneInfo{}
+ if b&0x01 == 1 {
+ pn.IsFirst = true
+ }
+ if b&0x02 == 2 {
+ pn.IsLast = true
+ }
+ switch int64((b >> 2) & 0x03) {
+ case 0:
+ pn.Timing = typex.EARLY
+ case 1:
+ pn.Timing = typex.ON_TIME
+ case 2:
+ pn.Timing = typex.LATE
+ case 3:
+ pn.Timing = typex.UNKNOWN
+ }
+
+ return pn
+}
+
+// DecodePane decodes a single byte.
+func DecodePane(r io.Reader) (typex.PaneInfo, error) {
+ // Decoding: typex.PaneInfo
+ var data [1]byte
+ if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { // NO_FIRING
pane
+ return typex.PaneInfo{}, err
+ }
+ pn := NewPane(data[0] & 0x0f)
+ switch encodingType(data[0]) {
Review comment:
I'll leave cleaning up this switch statement as an exercise, based on my
other comments.
##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
##########
@@ -117,7 +118,7 @@ func (x *translator) translateTransform(trunk string, id
string) ([]*df.Step, er
// URL Query-escaped windowed _unnested_ value. It is read back
in
// a nested context at runtime.
var buf bytes.Buffer
- if err :=
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()),
window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
+ if err :=
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()),
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.PaneInfo{}, &buf); err !=
nil {
Review comment:
Question: Is the zero value for your pane info type identical to the "No
firing" pane we've been writing? If so, very nice!
Optionally, also add a function to the typex package: typex.NoFiringPane()
to document the intended semantic meaning of the type at call sites like
this.
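A quick way to check the question is to compare header bytes. The sketch below assumes the bit layout from the quoted `EncodePane` (bit 0 is IsFirst, bit 1 is IsLast, bits 2-3 hold the timing) and an integer-backed `Timing`; `NoFiringPane` and `headerByte` are hypothetical names. Under those assumptions a pane with both flags set and unknown timing packs to 0x0f, matching `paneNoFiring`, while the zero value does not:

```go
package main

import "fmt"

type Timing uint8

const (
	PaneEarly Timing = iota
	PaneOnTime
	PaneLate
	PaneUnknown
)

type PaneInfo struct {
	Timing                     Timing
	IsFirst, IsLast            bool
	Index, NonSpeculativeIndex int64
}

// NoFiringPane is a hypothetical helper naming the pane that the
// existing code writes as the single byte 0x0f.
func NoFiringPane() PaneInfo {
	return PaneInfo{Timing: PaneUnknown, IsFirst: true, IsLast: true}
}

// headerByte packs the flags and timing into the low four bits.
func headerByte(p PaneInfo) byte {
	var b byte
	if p.IsFirst {
		b |= 0x01
	}
	if p.IsLast {
		b |= 0x02
	}
	b |= byte(p.Timing) << 2
	return b
}

func main() {
	fmt.Printf("%#x\n", headerByte(NoFiringPane())) // prints "0xf"
	fmt.Printf("%#x\n", headerByte(PaneInfo{}))     // zero value, differs from 0xf
}
```

With the string-backed `Timing` in the PR as quoted, the zero value appears to fall through to the `else` branch of `timing` and leave both flags unset, so it would not produce 0x0f either.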
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+ FIRST int = 0
+ ONE_INDEX int = 1
+ TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+ if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing ==
typex.UNKNOWN {
+ return FIRST
+ } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+ return ONE_INDEX
+ } else {
+ return TWO_INDEX
+ }
+}
+
+func timing(v typex.Timing) int {
+ if v == typex.EARLY {
+ return 0
+ } else if v == typex.ON_TIME {
+ return 1
+ } else if v == typex.LATE {
+ return 2
+ } else {
+ return 3
+ }
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+ // Encoding: typex.PaneInfo
+
+ pane := 0
+ if v.IsFirst {
+ pane |= 1
+ }
+ if v.IsLast {
+ pane |= 2
+ }
+ pane |= timing(v.Timing) << 2
+
+ switch chooseEncoding(v) {
+ case FIRST:
+ paneByte := []byte{byte(pane)}
+ w.Write(paneByte)
+ case ONE_INDEX:
+ paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+ w.Write(paneByte)
+ EncodeVarInt(v.Index, w)
+ case TWO_INDEX:
+ paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+ w.Write(paneByte)
+ EncodeVarInt(v.Index, w)
+ EncodeVarInt(v.NonSpeculativeIndex, w)
+ }
+ return nil
+}
+
+func encodingType(b byte) int64 {
+ return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+ pn := typex.PaneInfo{}
+ if b&0x01 == 1 {
+ pn.IsFirst = true
+ }
+ if b&0x02 == 2 {
+ pn.IsLast = true
+ }
+ switch int64((b >> 2) & 0x03) {
+ case 0:
+ pn.Timing = typex.EARLY
+ case 1:
+ pn.Timing = typex.ON_TIME
+ case 2:
+ pn.Timing = typex.LATE
+ case 3:
+ pn.Timing = typex.UNKNOWN
+ }
Review comment:
If you define those arrival conditions (early, late, etc.) as
integers or uints with iota, you can replace this whole switch with:
`pn.Timing = typex.Timing((b >> 2) & 0x03)`
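As a sketch of that simplification, assuming a `uint8`-backed `Timing` whose constants follow the encoded order (the names below are illustrative, and `newPane` is a stand-in for the PR's `NewPane`):

```go
package main

import "fmt"

type Timing uint8

const (
	PaneEarly Timing = iota
	PaneOnTime
	PaneLate
	PaneUnknown
)

type PaneInfo struct {
	Timing          Timing
	IsFirst, IsLast bool
}

// newPane unpacks the low four bits of the header byte.
// The flag comparisons collapse to boolean expressions and the
// timing switch collapses to a single conversion.
func newPane(b byte) PaneInfo {
	return PaneInfo{
		IsFirst: b&0x01 != 0,
		IsLast:  b&0x02 != 0,
		Timing:  Timing((b >> 2) & 0x03),
	}
}

func main() {
	pn := newPane(0x0f)
	fmt.Println(pn.IsFirst, pn.IsLast, pn.Timing == PaneUnknown) // prints "true true true"
}
```

The mask `& 0x03` keeps the result within the four declared constants, so the conversion cannot produce an undeclared `Timing` value.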
##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1079,7 +1112,7 @@ func (*intervalWindowDecoder) DecodeSingle(r io.Reader)
(typex.Window, error) {
var paneNoFiring = []byte{0xf}
Review comment:
If we aren't using this variable anywhere anymore we can remove it.
##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+package coder
+
+import (
+ "io"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+ FIRST int = 0
+ ONE_INDEX int = 1
+ TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+ if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing ==
typex.UNKNOWN {
+ return FIRST
+ } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+ return ONE_INDEX
+ } else {
+ return TWO_INDEX
+ }
Review comment:
As a result, we can probably just move these conditions into the
encoder, rather than relating them through constants, and have a comment that
explains their differences, which is what you're using the constant names for.
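A sketch of what folding the conditions into the encoder might look like. It copies the conditions from `chooseEncoding` as quoted, assumes an integer-backed `Timing`, and uses `encoding/binary` as a stand-in for `coder.EncodeVarInt`, so the index bytes are not guaranteed to match Beam's wire format. `encodePane`, `writeVarInt` and `encodeToBytes` are hypothetical names:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

type Timing uint8

const (
	PaneEarly Timing = iota
	PaneOnTime
	PaneLate
	PaneUnknown
)

type PaneInfo struct {
	Timing                     Timing
	IsFirst, IsLast            bool
	Index, NonSpeculativeIndex int64
}

// writeVarInt is a stand-in for coder.EncodeVarInt (assumption).
func writeVarInt(v int64, w io.Writer) error {
	var buf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(buf[:], uint64(v))
	_, err := w.Write(buf[:n])
	return err
}

func encodePane(p PaneInfo, w io.Writer) error {
	var header byte
	if p.IsFirst {
		header |= 0x01
	}
	if p.IsLast {
		header |= 0x02
	}
	header |= byte(p.Timing) << 2

	switch {
	case p.Index == 0 || p.NonSpeculativeIndex == 0 || p.Timing == PaneUnknown:
		// Header byte only: no indices follow.
		_, err := w.Write([]byte{header})
		return err
	case p.Index == p.NonSpeculativeIndex || p.Timing == PaneEarly:
		// Header byte plus one index.
		if _, err := w.Write([]byte{header | 1<<4}); err != nil {
			return err
		}
		return writeVarInt(p.Index, w)
	default:
		// Header byte plus both indices.
		if _, err := w.Write([]byte{header | 2<<4}); err != nil {
			return err
		}
		if err := writeVarInt(p.Index, w); err != nil {
			return err
		}
		return writeVarInt(p.NonSpeculativeIndex, w)
	}
}

// encodeToBytes is a small convenience wrapper for trying the encoder out.
func encodeToBytes(p PaneInfo) ([]byte, error) {
	var buf bytes.Buffer
	err := encodePane(p, &buf)
	return buf.Bytes(), err
}

func main() {
	b, err := encodeToBytes(PaneInfo{Timing: PaneLate, Index: 3, NonSpeculativeIndex: 1})
	fmt.Println(b, err)
}
```

Unlike the quoted `EncodePane`, which discards the results of `w.Write` and `EncodeVarInt` and always returns nil, this version propagates write errors to the caller.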
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 633870)
Time Spent: 2h 10m (was: 2h)
> Go triggering support
> ---------------------
>
> Key: BEAM-3304
> URL: https://issues.apache.org/jira/browse/BEAM-3304
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Ritesh Ghorse
> Priority: P3
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Add support for triggers.
> [https://beam.apache.org/documentation/programming-guide/#triggers]
> Triggers are special runner side behavior indicating how to handle data WRT
> the watermark and window. Commonly configuring the processing for “late data”
> and similar.
> These are not currently implemented for user use in the Go SDK. Reshuffle
> configures triggers, but it’s not accessible. A correct trigger
> implementation can at least re-implement Reshuffle in a user pipeline, rather
> than handled specially within the framework.
> * Requires extending the window package to be able to configure the various
> triggers.
> * Specifically being able to compose triggers as also permitted by the proto.
> **
> [https://github.com/apache/beam/blob/6e7b1c44bc7275ee047afc059fd610cd3f4e5bee/model/pipeline/src/main/proto/beam_runner_api.proto#L1111]
>
> * Requires updating the graphx package translate.go to marshal (and
> unmarshal?) the triggers to and from Beam PipelineProto Windowing strategies.
> * Requires supporting triggers with the beam.WindowInto transform for user
> pipeline use as well as complete documentation on its use from the user side.
> **
> [https://github.com/apache/beam/blob/6e7b1c44bc7275ee047afc059fd610cd3f4e5bee/sdks/go/pkg/beam/windowing.go]
>
> * Panes need to be decoded, otherwise triggering will cause runtime errors:
> [https://lists.apache.org/thread.html/r94c42d2d116f6464cd6b689543e5e578edf8310bf7c6e48a0958a56c%40%3Cdev.beam.apache.org%3E]
>
> * Handle pane propagation and observation in the exec package, and in user
> dofns.
> ** Panes indicate whether data was on time or not, and similar facets which
> may be relevant for processing.
> ** Might simply extend the existing window interface.
>
> Similar to windowing, many of the same places as
> https://issues.apache.org/jira/browse/BEAM-11100 need to be modified.
> At simplest though, it's mostly a runner side construction, with less concern
> on the exec side, and generally much simpler.
> Appropriate integration tests against portable runners must be implemented:
> [https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives]
>
> And optionally add support for the configurable triggers to the Go Direct
> Runner. However, the results must be compared and validated against a
> semantically correct runner like the python portable runner first. At
> minimum, the Go Direct Runner should be made aware of triggers and produce a
> coherent error whenever there's a trigger it can't deal with.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)