This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 657caa88c01 [BEAM-14306] Add unit testing to pane coder (#17370)
657caa88c01 is described below

commit 657caa88c011cae818377f64f02afe9d18614da5
Author: Jack McCluskey <[email protected]>
AuthorDate: Thu Apr 21 20:06:07 2022 -0400

    [BEAM-14306] Add unit testing to pane coder (#17370)
---
 sdks/go/pkg/beam/core/graph/coder/panes.go      |  18 ++-
 sdks/go/pkg/beam/core/graph/coder/panes_test.go | 179 ++++++++++++++++++++++++
 2 files changed, 190 insertions(+), 7 deletions(-)

diff --git a/sdks/go/pkg/beam/core/graph/coder/panes.go b/sdks/go/pkg/beam/core/graph/coder/panes.go
index 3ccd987e765..bb193c0fe5b 100644
--- a/sdks/go/pkg/beam/core/graph/coder/panes.go
+++ b/sdks/go/pkg/beam/core/graph/coder/panes.go
@@ -16,6 +16,7 @@
 package coder
 
 import (
+       "fmt"
        "io"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
@@ -28,20 +29,23 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
 
        pane := byte(0)
        if v.IsFirst {
-               pane |= 0x01
+               pane |= 0x02
        }
        if v.IsLast {
-               pane |= 0x02
+               pane |= 0x01
        }
        pane |= byte(v.Timing << 2)
 
        switch {
-       case v.Index == 0 || v.NonSpeculativeIndex == 0 || v.Timing == typex.PaneUnknown:
+       case (v.Index == 0 && v.NonSpeculativeIndex == 0) || v.Timing == typex.PaneUnknown:
                // The entire pane info is encoded as a single byte
                paneByte := []byte{pane}
                w.Write(paneByte)
        case v.Index == v.NonSpeculativeIndex || v.Timing == typex.PaneEarly:
                 // The pane info is encoded as this byte plus a single VarInt encoded integer
+               if v.Timing == typex.PaneEarly && v.NonSpeculativeIndex != -1 {
+                       return fmt.Errorf("error encoding pane %v: non-speculative index value must be equal to -1 if the pane timing is early", v)
+               }
                paneByte := []byte{pane | 1<<4}
                w.Write(paneByte)
                EncodeVarInt(v.Index, w)
@@ -60,11 +64,11 @@ func EncodePane(v typex.PaneInfo, w io.Writer) error {
 func NewPane(b byte) typex.PaneInfo {
        pn := typex.NoFiringPane()
 
-       if b&0x01 == 1 {
-               pn.IsFirst = true
+       if !(b&0x02 == 2) {
+               pn.IsFirst = false
        }
-       if b&0x02 == 2 {
-               pn.IsLast = true
+       if !(b&0x01 == 1) {
+               pn.IsLast = false
        }
 
        pn.Timing = typex.PaneTiming((b >> 2) & 0x03)
diff --git a/sdks/go/pkg/beam/core/graph/coder/panes_test.go b/sdks/go/pkg/beam/core/graph/coder/panes_test.go
new file mode 100644
index 00000000000..49088e7f60c
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/panes_test.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "bytes"
+       "math"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+func makePaneInfo(timing typex.PaneTiming, first, last bool, index, nsIndex int64) typex.PaneInfo {
+       return typex.PaneInfo{Timing: timing, IsFirst: first, IsLast: last, Index: index, NonSpeculativeIndex: nsIndex}
+}
+
+func equalPanes(left, right typex.PaneInfo) bool {
+       return (left.Timing == right.Timing) && (left.IsFirst == right.IsFirst) && (left.IsLast == right.IsLast) && (left.Index == right.Index) && (left.NonSpeculativeIndex == right.NonSpeculativeIndex)
+}
+
+func TestPaneCoder(t *testing.T) {
+       tests := []struct {
+               name    string
+               timing  typex.PaneTiming
+               first   bool
+               last    bool
+               index   int64
+               nsIndex int64
+       }{
+               {
+                       "false bools",
+                       typex.PaneUnknown,
+                       false,
+                       false,
+                       0,
+                       0,
+               },
+               {
+                       "true bools",
+                       typex.PaneUnknown,
+                       true,
+                       true,
+                       0,
+                       0,
+               },
+               {
+                       "first pane",
+                       typex.PaneUnknown,
+                       true,
+                       false,
+                       0,
+                       0,
+               },
+               {
+                       "last pane",
+                       typex.PaneUnknown,
+                       false,
+                       true,
+                       0,
+                       0,
+               },
+               {
+                       "on time, different index and non-speculative",
+                       typex.PaneOnTime,
+                       false,
+                       false,
+                       1,
+                       2,
+               },
+               {
+                       "valid early pane",
+                       typex.PaneEarly,
+                       true,
+                       false,
+                       math.MaxInt64,
+                       -1,
+               },
+               {
+                       "on time, max non-speculative index",
+                       typex.PaneOnTime,
+                       false,
+                       true,
+                       0,
+                       math.MaxInt64,
+               },
+               {
+                       "late pane, max index",
+                       typex.PaneLate,
+                       false,
+                       false,
+                       math.MaxInt64,
+                       0,
+               },
+               {
+                       "on time, min non-speculative index",
+                       typex.PaneOnTime,
+                       false,
+                       true,
+                       0,
+                       math.MinInt64,
+               },
+               {
+                       "late, min index",
+                       typex.PaneLate,
+                       false,
+                       false,
+                       math.MinInt64,
+                       0,
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       input := makePaneInfo(test.timing, test.first, test.last, test.index, test.nsIndex)
+                       var buf bytes.Buffer
+                       err := EncodePane(input, &buf)
+                       if err != nil {
+                               t.Fatalf("failed to encode pane %v, got %v", input, err)
+                       }
+                       got, err := DecodePane(&buf)
+                       if err != nil {
+                               t.Fatalf("failed to decode pane from buffer %v, got %v", buf, err)
+                       }
+                       if want := input; !equalPanes(got, want) {
+                               t.Errorf("got pane %v, want %v", got, want)
+                       }
+               })
+       }
+}
+
+func TestEncodePane_bad(t *testing.T) {
+       tests := []struct {
+               name    string
+               timing  typex.PaneTiming
+               first   bool
+               last    bool
+               index   int64
+               nsIndex int64
+       }{
+               {
+                       "invalid early pane, max ints",
+                       typex.PaneEarly,
+                       true,
+                       false,
+                       math.MaxInt64,
+                       math.MaxInt64,
+               },
+               {
+                       "invalid early pane, min ints",
+                       typex.PaneEarly,
+                       true,
+                       false,
+                       math.MinInt64,
+                       math.MinInt64,
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       input := makePaneInfo(test.timing, test.first, test.last, test.index, test.nsIndex)
+                       var buf bytes.Buffer
+                       err := EncodePane(input, &buf)
+                       if err == nil {
+                               t.Errorf("successfully encoded pane when it should have failed, got %v", buf)
+                       }
+               })
+       }
+}

Reply via email to