lostluck commented on a change in pull request #15239:
URL: https://github.com/apache/beam/pull/15239#discussion_r682988766



##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+       FIRST     int = 0
+       ONE_INDEX int = 1
+       TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+       if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == 
typex.UNKNOWN {
+               return FIRST
+       } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+               return ONE_INDEX
+       } else {
+               return TWO_INDEX
+       }

Review comment:
       Prefer using a switch statment for things like this instead of relying 
on if-else ladders.
   ```suggestion
        switch {
        case v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == 
typex.UNKNOWN:
                return FIRST
        case v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY:
                return ONE_INDEX
        default:
                return TWO_INDEX
        }
   ```

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+       FIRST     int = 0
+       ONE_INDEX int = 1
+       TWO_INDEX int = 2

Review comment:
       In Go, constants, exported or not should use MixedCaps if they're 
Exported or mixedCaps if they're unexported never ALL_CAPS.

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
        Equals(o Window) bool
 }
 
+type Timing string

Review comment:
       Instead of using a string, consider using an `int` (or even a uint8) as 
the base type. It's more compact in memory. It also lets one avoid doing string 
comparisons to know values for encodings, and avoids some awkward conversions 
that this PR is currently making.
   
   See https://yourbasic.org/golang/iota/ for a good rundown for how to do 
"enums" in Go. As well as in Effective Go. 
https://golang.org/doc/effective_go#constants

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
        Equals(o Window) bool
 }
 
+type Timing string
+
+const (
+       EARLY   Timing = "EARLY"
+       ON_TIME Timing = "ON_TIME"
+       LATE    Timing = "LATE"
+       UNKNOWN Timing = "UNKNOWN"
+)
+
+type PaneInfo struct {
+       Timing              Timing
+       IsFirst             bool
+       IsLast              bool
+       Index               int64
+       NonSpeculativeIndex int64

Review comment:
       Note, there's nothing wrong with the current approach, I'm pointing out 
a thing about Go:
   
   In this case, you can write these fields more compactly as: 
   ```suggestion
        IsFirst, IsLast                         bool
        Index, NonSpeculativeIndex              int64
   ```

##########
File path: sdks/go/pkg/beam/core/typex/special.go
##########
@@ -64,6 +65,23 @@ type Window interface {
        Equals(o Window) bool
 }
 
+type Timing string
+
+const (
+       EARLY   Timing = "EARLY"
+       ON_TIME Timing = "ON_TIME"
+       LATE    Timing = "LATE"
+       UNKNOWN Timing = "UNKNOWN"

Review comment:
       Despite these being constants, it's not idiomatic go to use All caps for 
constants. 
   
   I also recommend prefixing what these are related to eg. PaneEarly, 
PaneOnTime, PaneLate, PaneUnknown 
   
   Mostly so they're adjacent in Go Doc. vs suffixing, which is a bit more 
readable.
   
   We can only get away without prefixing because we don't have a "pane" 
package which would delare what they're for earlier. In this case we shouldn't 
have a pane package, it would be too small for practical uses.
   
   This is because the typex package has a pretty grab bag set of values, so 
the enumerations need to be made clearer for their use elsewhere.
   
   

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+       Default  TriggerType = "Trigger_Default_"
+       Always   TriggerType = "Trigger_Always_"
+       AfterAny TriggerType = "Trigger_AfterAny_"
+       AfterAll TriggerType = "Trigger_AfterAll_"
+)
+
+func (ws *WindowingStrategy) SetAfterAll() {
+       ws.Trigger = AfterAll
+}
+
+func (ws *WindowingStrategy) SetAfterAny() {
+       ws.Trigger = AfterAny
+}
+
+func (ws *WindowingStrategy) SetAlways() {
+       ws.Trigger = Always
+}
+
+func (ws *WindowingStrategy) SetDefault() {
+       ws.Trigger = Default
+}

Review comment:
       We can probably remove these helper methods since they aren't being 
called, in favour of directly setting the fields ourselves.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1088,25 +1121,22 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws 
[]typex.Window, t typex.Eve
        if err := enc.Encode(ws, w); err != nil {
                return err
        }
-       _, err := w.Write(paneNoFiring)
+       err := coder.EncodePane(p, w)
        return err
 }
 
 // DecodeWindowedValueHeader deserializes a windowed value header.
-func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) 
([]typex.Window, typex.EventTime, error) {
+func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) 
([]typex.Window, typex.EventTime, typex.PaneInfo, error) {
        // Encoding: Timestamp, Window, Pane (header) + Element
-
+       pn := typex.PaneInfo{}

Review comment:
       It would be more idiomatic to declare the zero value in place for the 
returns, which makes it explicit that they're the zero value. 
   The reason for this is to limit scope, and make it easier on readers. If 
possible we keep that as small as possible. With a pre-declaration like this, a 
reader has to keep an eye out for it, for when it's used, and what might have 
affected it before it's returned. By writing our intent (return a zero value on 
error) directly where it's happening, we avoid this.
   
   That said, I understand the repetition can be tiresome. You can instead have 
defined in this function scope helper function
   
   `onError := func(err error) { return  nil, mtime.ZeroTimestamp, 
typex.PaneInfo{}, err }`
   
   Which you can then call as `return onError(err)` and it will compile down to 
the same thing.
   
   Non-pointer zeros being inconvenient is a hot topic of discussion for Go and 
eventually someone will propose a satisfying solution for it.

##########
File path: sdks/go/pkg/beam/windowing.go
##########
@@ -21,21 +21,40 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
+type WindowIntoOption interface {
+       windowIntoOption()
+}
+
+type WindowTrigger struct {
+       Name window.TriggerType
+}
+
+func (t WindowTrigger) windowIntoOption() {}
+
 // WindowInto applies the windowing strategy to each element.
-func WindowInto(s Scope, ws *window.Fn, col PCollection) PCollection {
-       return Must(TryWindowInto(s, ws, col))
+func WindowInto(s Scope, ws *window.Fn, col PCollection, opts 
...WindowIntoOption) PCollection {
+       return Must(TryWindowInto(s, ws, col, opts...))
 }
 
 // TryWindowInto attempts to insert a WindowInto transform.
-func TryWindowInto(s Scope, ws *window.Fn, col PCollection) (PCollection, 
error) {
+func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts 
...WindowIntoOption) (PCollection, error) {
        if !s.IsValid() {
                return PCollection{}, errors.New("invalid scope")
        }
        if !col.IsValid() {
                return PCollection{}, errors.New("invalid input pcollection")
        }
+       ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Default}
+       for _, opt := range opts {
+               switch opt.(type) {
+               case WindowTrigger:
+                       ws.Trigger = opt.(WindowTrigger).Name
+               default:
+                       ws.Trigger = window.Default

Review comment:
       Since this switch is only run if we have options, it's valid to return 
an error or even `panic`if we don't know what type of option it is. That way 
users won't accidently override earlier set triggers because an implementation 
was half done somehow.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+       FIRST     int = 0
+       ONE_INDEX int = 1
+       TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+       if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == 
typex.UNKNOWN {
+               return FIRST
+       } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+               return ONE_INDEX
+       } else {
+               return TWO_INDEX
+       }
+}
+
+func timing(v typex.Timing) int {
+       if v == typex.EARLY {
+               return 0
+       } else if v == typex.ON_TIME {
+               return 1
+       } else if v == typex.LATE {
+               return 2
+       } else {
+               return 3
+       }
+}
+
+// EncodePane encodes a single byte.

Review comment:
       ```suggestion
   // EncodePane encodes a typex.PaneInfo.
   ```

##########
File path: sdks/go/pkg/beam/core/graph/window/trigger.go
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package window
+
+// TODO [BEAM-3304](riteshghorse): add configurable parameters to trigger
+type TriggerType string
+
+const (
+       Default  TriggerType = "Trigger_Default_"
+       Always   TriggerType = "Trigger_Always_"
+       AfterAny TriggerType = "Trigger_AfterAny_"
+       AfterAll TriggerType = "Trigger_AfterAll_"

Review comment:
       WRT to the const names here, we probably want to call these 
DefaultTrigger, AlwaysTrigger, etc, since users will see these as  
"window.Default" and "window.Always" and it won't necessarily be clear what 
they mean.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -134,7 +134,7 @@ func (n *DataSource) Process(ctx context.Context) error {
                }
                pe.Timestamp = t
                pe.Windows = ws
-
+               pe.Pane = pn

Review comment:
       Please keep the blank line before the next block. 
   Blank lines used carefully help readability.

##########
File path: sdks/go/test/integration/primitives/windowinto.go
##########
@@ -91,3 +91,32 @@ func WindowSums_GBK(s beam.Scope) {
 func WindowSums_Lifted(s beam.Scope) {
        WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
+
+// TriggerWindowSums, much like WindowSums described above has an addition of 
configuring
+// a trigger here. SetDefault works fine. Other triggers such as SetAlways 
throws
+// pane decoding error.
+func TriggerWindowSums(s beam.Scope, sumPerKey func(beam.Scope, 
beam.PCollection) beam.PCollection) {
+       timestampedData := beam.ParDo(s, &createTimestampedData{Data: []int{4, 
9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
+
+       windowSize := 3 * time.Second
+
+       validate := func(s beam.Scope, wfn *window.Fn, in beam.PCollection, 
expected ...interface{}) {
+               // Window the data.
+               windowed := beam.WindowInto(s, wfn, in, 
beam.WindowTrigger{Name: window.Always})
+               // To get the pane decoding error, change above statement to
+               // windowed := beam.WindowInto(s, wfn, in, 
beam.WindowTrigger{Name: window.Always})
+               // Perform the appropriate sum operation.
+               sums := sumPerKey(s, windowed)
+               // Drop back to Global windows, and drop the key otherwise 
passert.Equals doesn't work.
+               sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
+               sums = beam.DropKey(s, sums)
+               passert.Equals(s, sums, expected...)
+       }
+
+       // Use fixed windows to divide the data into 3 chunks.
+       validate(s.Scope("Fixed"), window.NewFixedWindows(windowSize), 
timestampedData, 15, 15, 15)

Review comment:
       Doesn't the "Always" trigger happen on every element? Should we be 
actually expecting the sums, or the individual elements?  (I don't know what's 
correct here, outside of the default trigger.)

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+       FIRST     int = 0
+       ONE_INDEX int = 1
+       TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+       if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == 
typex.UNKNOWN {
+               return FIRST
+       } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+               return ONE_INDEX
+       } else {
+               return TWO_INDEX
+       }
+}
+
+func timing(v typex.Timing) int {
+       if v == typex.EARLY {
+               return 0
+       } else if v == typex.ON_TIME {
+               return 1
+       } else if v == typex.LATE {
+               return 2
+       } else {
+               return 3
+       }
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+       // Encoding: typex.PaneInfo
+
+       pane := 0
+       if v.IsFirst {
+               pane |= 1
+       }
+       if v.IsLast {
+               pane |= 2
+       }
+       pane |= timing(v.Timing) << 2
+
+       switch chooseEncoding(v) {
+       case FIRST:
+               paneByte := []byte{byte(pane)}
+               w.Write(paneByte)
+       case ONE_INDEX:
+               paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+               w.Write(paneByte)
+               EncodeVarInt(v.Index, w)
+       case TWO_INDEX:
+               paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+               w.Write(paneByte)
+               EncodeVarInt(v.Index, w)
+               EncodeVarInt(v.NonSpeculativeIndex, w)
+       }
+       return nil
+}
+
+func encodingType(b byte) int64 {
+       return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+       pn := typex.PaneInfo{}
+       if b&0x01 == 1 {
+               pn.IsFirst = true
+       }
+       if b&0x02 == 2 {
+               pn.IsLast = true
+       }
+       switch int64((b >> 2) & 0x03) {
+       case 0:
+               pn.Timing = typex.EARLY
+       case 1:
+               pn.Timing = typex.ON_TIME
+       case 2:
+               pn.Timing = typex.LATE
+       case 3:
+               pn.Timing = typex.UNKNOWN
+       }
+
+       return pn
+}
+
+// DecodePane decodes a single byte.
+func DecodePane(r io.Reader) (typex.PaneInfo, error) {
+       // Decoding: typex.PaneInfo
+       var data [1]byte
+       if err := ioutilx.ReadNBufUnsafe(r, data[:]); err != nil { // NO_FIRING 
pane
+               return typex.PaneInfo{}, err
+       }
+       pn := NewPane(data[0] & 0x0f)
+       switch encodingType(data[0]) {

Review comment:
       Cleaning up this switch statement I'll leave as an excercise based on my 
other comments.

##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
##########
@@ -117,7 +118,7 @@ func (x *translator) translateTransform(trunk string, id 
string) ([]*df.Step, er
                // URL Query-escaped windowed _unnested_ value. It is read back 
in
                // a nested context at runtime.
                var buf bytes.Buffer
-               if err := 
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, &buf); err != nil {
+               if err := 
exec.EncodeWindowedValueHeader(exec.MakeWindowEncoder(coder.NewGlobalWindow()), 
window.SingleGlobalWindow, mtime.ZeroTimestamp, typex.PaneInfo{}, &buf); err != 
nil {

Review comment:
       Question: Is the zero value for your pane info type identical to the "No 
firing" pane we've been writing? If so, very nice!
   
   Optionally, also add a function to the typex package: typex.NoFiringPane() 
to document the intended semantic meaning of the type at the call sights like 
this.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+       FIRST     int = 0
+       ONE_INDEX int = 1
+       TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+       if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == 
typex.UNKNOWN {
+               return FIRST
+       } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+               return ONE_INDEX
+       } else {
+               return TWO_INDEX
+       }
+}
+
+func timing(v typex.Timing) int {
+       if v == typex.EARLY {
+               return 0
+       } else if v == typex.ON_TIME {
+               return 1
+       } else if v == typex.LATE {
+               return 2
+       } else {
+               return 3
+       }
+}
+
+// EncodePane encodes a single byte.
+func EncodePane(v typex.PaneInfo, w io.Writer) error {
+       // Encoding: typex.PaneInfo
+
+       pane := 0
+       if v.IsFirst {
+               pane |= 1
+       }
+       if v.IsLast {
+               pane |= 2
+       }
+       pane |= timing(v.Timing) << 2
+
+       switch chooseEncoding(v) {
+       case FIRST:
+               paneByte := []byte{byte(pane)}
+               w.Write(paneByte)
+       case ONE_INDEX:
+               paneByte := []byte{byte(pane | (ONE_INDEX)<<4)}
+               w.Write(paneByte)
+               EncodeVarInt(v.Index, w)
+       case TWO_INDEX:
+               paneByte := []byte{byte(pane | (TWO_INDEX)<<4)}
+               w.Write(paneByte)
+               EncodeVarInt(v.Index, w)
+               EncodeVarInt(v.NonSpeculativeIndex, w)
+       }
+       return nil
+}
+
+func encodingType(b byte) int64 {
+       return int64(b >> 4)
+}
+
+func NewPane(b byte) typex.PaneInfo {
+       pn := typex.PaneInfo{}
+       if b&0x01 == 1 {
+               pn.IsFirst = true
+       }
+       if b&0x02 == 2 {
+               pn.IsLast = true
+       }
+       switch int64((b >> 2) & 0x03) {
+       case 0:
+               pn.Timing = typex.EARLY
+       case 1:
+               pn.Timing = typex.ON_TIME
+       case 2:
+               pn.Timing = typex.LATE
+       case 3:
+               pn.Timing = typex.UNKNOWN
+       }

Review comment:
       If you define those arrival conditions (early late etc)  as based on 
integers or uints with iota, you can replace this whole switch with: `pn.Timing 
= typex.Timing((b >> 2) & 0x03)`

##########
File path: sdks/go/pkg/beam/core/runtime/exec/coder.go
##########
@@ -1079,7 +1112,7 @@ func (*intervalWindowDecoder) DecodeSingle(r io.Reader) 
(typex.Window, error) {
 var paneNoFiring = []byte{0xf}

Review comment:
       If we aren't using this variable anywhere anymore we can remove it.

##########
File path: sdks/go/pkg/beam/core/graph/coder/panes.go
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+)
+
+const (
+       FIRST     int = 0
+       ONE_INDEX int = 1
+       TWO_INDEX int = 2
+)
+
+func chooseEncoding(v typex.PaneInfo) int {
+       if (v.Index == 0 || v.NonSpeculativeIndex == 0) || v.Timing == 
typex.UNKNOWN {
+               return FIRST
+       } else if v.Index == v.NonSpeculativeIndex || v.Timing == typex.EARLY {
+               return ONE_INDEX
+       } else {
+               return TWO_INDEX
+       }

Review comment:
       As a result, we can probably just move these conditions into the 
encoder, rather than relating them through constants, and have a comment that 
explains their differences, which you're using as a the constant name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to