This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e7a3392  [BEAM-9615] Add bytes, bool, and iterable coders (#12127)
e7a3392 is described below

commit e7a3392595e14cdb377fe95fb679305ea9a32249
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Tue Jun 30 10:55:31 2020 -0700

    [BEAM-9615] Add bytes, bool, and iterable coders (#12127)
---
 sdks/go/pkg/beam/core/graph/coder/bool.go          |  57 ++++++++++
 sdks/go/pkg/beam/core/graph/coder/bool_test.go     |  79 +++++++++++++
 sdks/go/pkg/beam/core/graph/coder/bytes.go         |  68 ++++++++++++
 sdks/go/pkg/beam/core/graph/coder/bytes_test.go    |  61 +++++++++++
 sdks/go/pkg/beam/core/graph/coder/iterable.go      | 122 +++++++++++++++++++++
 sdks/go/pkg/beam/core/graph/coder/iterable_test.go | 113 +++++++++++++++++++
 6 files changed, 500 insertions(+)

diff --git a/sdks/go/pkg/beam/core/graph/coder/bool.go 
b/sdks/go/pkg/beam/core/graph/coder/bool.go
new file mode 100644
index 0000000..309ae24
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/bool.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// EncodeBool encodes a boolean according to the beam protocol.
+func EncodeBool(v bool, w io.Writer) error {
+       // Encoding: false = 0, true = 1
+       var err error
+       if v {
+               _, err = ioutilx.WriteUnsafe(w, []byte{1})
+       } else {
+               _, err = ioutilx.WriteUnsafe(w, []byte{0})
+       }
+       if err != nil {
+               return errors.Wrap(err, "error encoding bool")
+       }
+       return nil
+}
+
+// DecodeBool decodes a boolean according to the beam protocol.
+func DecodeBool(r io.Reader) (bool, error) {
+       // Encoding: false = 0, true = 1
+       var b [1]byte
+       if err := ioutilx.ReadNBufUnsafe(r, b[:]); err != nil {
+               if err == io.EOF {
+                       return false, err
+               }
+               return false, errors.Wrap(err, "error decoding bool")
+       }
+       switch b[0] {
+       case 0:
+               return false, nil
+       case 1:
+               return true, nil
+       }
+       return false, errors.Errorf("error decoding bool: received invalid 
value %v", b[0])
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/bool_test.go 
b/sdks/go/pkg/beam/core/graph/coder/bool_test.go
new file mode 100644
index 0000000..77aaf8b
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/bool_test.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "bytes"
+       "fmt"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       "github.com/google/go-cmp/cmp"
+)
+
+func TestEncodeBool(t *testing.T) {
+       tests := []struct {
+               v    bool
+               want []byte
+       }{
+               {v: false, want: []byte{0}},
+               {v: true, want: []byte{1}},
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("%v", test.v), func(t *testing.T) {
+                       var buf bytes.Buffer
+                       err := EncodeBool(test.v, &buf)
+                       if err != nil {
+                               t.Fatalf("EncodeBool(%v) = %v", test.v, err)
+                       }
+                       if d := cmp.Diff(test.want, buf.Bytes()); d != "" {
+                               t.Errorf("EncodeBool(%v) = %v, want %v 
diff(-want,+got):\n %v", test.v, buf.Bytes(), test.want, d)
+                       }
+               })
+       }
+}
+
+func TestDecodeBool(t *testing.T) {
+       tests := []struct {
+               b    []byte
+               want bool
+               err  error
+       }{
+               {want: false, b: []byte{0}},
+               {want: true, b: []byte{1}},
+               {b: []byte{42}, err: errors.Errorf("error decoding bool: 
received invalid value %v", 42)},
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("%v", test.want), func(t *testing.T) {
+                       buf := bytes.NewBuffer(test.b)
+                       got, err := DecodeBool(buf)
+                       if test.err != nil && err != nil {
+                               if d := cmp.Diff(test.err.Error(), 
err.Error()); d != "" {
+                                       t.Errorf("DecodeBool(%v) = %v, want %v 
diff(-want,+got):\n %v", test.b, err, test.err, d)
+                               }
+                               return
+                       }
+                       if err != nil {
+                               t.Fatalf("DecodeBool(%v) = %v", test.b, err)
+                       }
+                       if d := cmp.Diff(test.want, got); d != "" {
+                               t.Errorf("DecodeBool(%v) = %v, want %v 
diff(-want,+got):\n %v", test.b, got, test.want, d)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/bytes.go 
b/sdks/go/pkg/beam/core/graph/coder/bytes.go
new file mode 100644
index 0000000..5bb5692
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/bytes.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "fmt"
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// EncodeByte encodes a single byte.
+func EncodeByte(v byte, w io.Writer) error {
+       // Encoding: raw byte.
+       if _, err := ioutilx.WriteUnsafe(w, []byte{v}); err != nil {
+               return fmt.Errorf("error encoding byte: %v", err)
+       }
+       return nil
+}
+
+// DecodeByte decodes a single byte.
+func DecodeByte(r io.Reader) (byte, error) {
+       // Encoding: raw byte
+       var b [1]byte
+       if err := ioutilx.ReadNBufUnsafe(r, b[:]); err != nil {
+               if err == io.EOF {
+                       return 0, err
+               }
+               return 0, errors.Wrap(err, "error decoding byte")
+       }
+       return b[0], nil
+}
+
+// EncodeBytes encodes a []byte with a length prefix per the beam protocol.
+func EncodeBytes(v []byte, w io.Writer) error {
+       // Encoding: size (varint) + raw data
+       size := len(v)
+       if err := EncodeVarInt((int64)(size), w); err != nil {
+               return err
+       }
+       _, err := w.Write(v)
+       return err
+
+}
+
+// DecodeBytes decodes a length prefixed []byte according to the beam protocol.
+func DecodeBytes(r io.Reader) ([]byte, error) {
+       // Encoding: size (varint) + raw data
+       size, err := DecodeVarInt(r)
+       if err != nil {
+               return nil, err
+       }
+       return ioutilx.ReadN(r, (int)(size))
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/bytes_test.go 
b/sdks/go/pkg/beam/core/graph/coder/bytes_test.go
new file mode 100644
index 0000000..0d7d1a7
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/bytes_test.go
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "bytes"
+       "fmt"
+       "strings"
+       "testing"
+
+       "github.com/google/go-cmp/cmp"
+)
+
+func TestEncodeDecodeBytes(t *testing.T) {
+       longString := strings.Repeat(" this sentence is 32 characters.", 8) // 
256 characters to ensure LP works.
+       tests := []struct {
+               v       []byte
+               encoded []byte
+       }{
+               {v: []byte{}, encoded: []byte{0}},
+               {v: []byte{42}, encoded: []byte{1, 42}},
+               {v: []byte{42, 23}, encoded: []byte{2, 42, 23}},
+               {v: []byte(longString), encoded: append([]byte{128, 2}, 
[]byte(longString)...)},
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("encode %q", test.v), func(t *testing.T) {
+                       var buf bytes.Buffer
+                       err := EncodeBytes(test.v, &buf)
+                       if err != nil {
+                               t.Fatalf("EncodeBytes(%q) = %v", test.v, err)
+                       }
+                       if d := cmp.Diff(test.encoded, buf.Bytes()); d != "" {
+                               t.Errorf("EncodeBytes(%q) = %v, want %v 
diff(-want,+got):\n %v", test.v, buf.Bytes(), test.encoded, d)
+                       }
+               })
+               t.Run(fmt.Sprintf("decode %v", test.v), func(t *testing.T) {
+                       buf := bytes.NewBuffer(test.encoded)
+                       got, err := DecodeBytes(buf)
+                       if err != nil {
+                               t.Fatalf("DecodeBytes(%q) = %v", test.encoded, 
err)
+                       }
+                       if d := cmp.Diff(test.v, got); d != "" {
+                               t.Errorf("DecodeBytes(%q) = %q, want %v 
diff(-want,+got):\n %v", test.encoded, got, test.v, d)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/iterable.go 
b/sdks/go/pkg/beam/core/graph/coder/iterable.go
new file mode 100644
index 0000000..c6affc6
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/iterable.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "io"
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+// TODO(lostluck): 2020.06.29 export these for use for others?
+
+// iterableEncoder reflectively encodes a slice or array type using
+// the beam fixed length iterable encoding.
+func iterableEncoder(rt reflect.Type, encode func(reflect.Value, io.Writer) 
error) func(reflect.Value, io.Writer) error {
+       return func(rv reflect.Value, w io.Writer) error {
+               size := rv.Len()
+               if err := EncodeInt32((int32)(size), w); err != nil {
+                       return err
+               }
+               for i := 0; i < size; i++ {
+                       if err := encode(rv.Index(i), w); err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+}
+
+// iterableDecoderForSlice can decode from both the fixed sized and
+// multi-chunk variants of the beam iterable protocol.
+// Returns an error for other protocols (such as state backed).
+func iterableDecoderForSlice(rt reflect.Type, decodeToElem func(reflect.Value, 
io.Reader) error) func(reflect.Value, io.Reader) error {
+       return func(ret reflect.Value, r io.Reader) error {
+               // (1) Read count prefixed encoded data
+               size, err := DecodeInt32(r)
+               if err != nil {
+                       return err
+               }
+               n := int(size)
+               switch {
+               case n >= 0:
+                       rv := reflect.MakeSlice(rt, n, n)
+                       if err := decodeToIterable(rv, r, decodeToElem); err != 
nil {
+                               return err
+                       }
+                       ret.Set(rv)
+                       return nil
+               case n == -1:
+                       chunk, err := DecodeVarInt(r)
+                       if err != nil {
+                               return err
+                       }
+                       rv := reflect.MakeSlice(rt, 0, int(chunk))
+                       for chunk != 0 {
+                               rvi := reflect.MakeSlice(rt, int(chunk), 
int(chunk))
+                               if err := decodeToIterable(rvi, r, 
decodeToElem); err != nil {
+                                       return err
+                               }
+                               rv = reflect.AppendSlice(rv, rvi)
+                               chunk, err = DecodeVarInt(r)
+                               if err != nil {
+                                       return err
+                               }
+                       }
+                       ret.Set(rv)
+                       return nil
+               default:
+                       return errors.Errorf("unable to decode slice iterable 
with size: %d", n)
+               }
+       }
+}
+
+// iterableDecoderForArray can decode from only the fixed sized and
+// multi-chunk variant of the beam iterable protocol.
+// Returns an error for other protocols (such as state backed).
+func iterableDecoderForArray(rt reflect.Type, decodeToElem func(reflect.Value, 
io.Reader) error) func(reflect.Value, io.Reader) error {
+       return func(ret reflect.Value, r io.Reader) error {
+               // (1) Read count prefixed encoded data
+               size, err := DecodeInt32(r)
+               if err != nil {
+                       return err
+               }
+               n := int(size)
+               if rt.Len() != n {
+                       return errors.Errorf("len mismatch decoding a %v: want 
%d got %d", rt, rt.Len(), n)
+               }
+               switch {
+               case n >= 0:
+                       if err := decodeToIterable(ret, r, decodeToElem); err 
!= nil {
+                               return err
+                       }
+                       return nil
+               default:
+                       return errors.Errorf("unable to decode array iterable 
with size: %d", n)
+               }
+       }
+}
+
+func decodeToIterable(rv reflect.Value, r io.Reader, decodeTo 
func(reflect.Value, io.Reader) error) error {
+       for i := 0; i < rv.Len(); i++ {
+               err := decodeTo(rv.Index(i), r)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/sdks/go/pkg/beam/core/graph/coder/iterable_test.go 
b/sdks/go/pkg/beam/core/graph/coder/iterable_test.go
new file mode 100644
index 0000000..977b588
--- /dev/null
+++ b/sdks/go/pkg/beam/core/graph/coder/iterable_test.go
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "reflect"
+       "testing"
+
+       "github.com/google/go-cmp/cmp"
+)
+
+func TestEncodeDecodeIterable(t *testing.T) {
+       stringEnc := func(rv reflect.Value, w io.Writer) error {
+               return EncodeStringUTF8(rv.String(), w)
+       }
+       stringDec := func(rv reflect.Value, r io.Reader) error {
+               v, err := DecodeStringUTF8(r)
+               if err != nil {
+                       return nil
+               }
+               t.Log(v)
+               rv.SetString(v)
+               return nil
+       }
+
+       tests := []struct {
+               v          interface{}
+               encElm     func(reflect.Value, io.Writer) error
+               decElm     func(reflect.Value, io.Reader) error
+               encoded    []byte
+               decodeOnly bool
+       }{
+               {
+                       v: [4]byte{1, 2, 3, 4},
+                       encElm: func(rv reflect.Value, w io.Writer) error {
+                               return EncodeByte(byte(rv.Uint()), w)
+                       },
+                       decElm: func(rv reflect.Value, r io.Reader) error {
+                               b, err := DecodeByte(r)
+                               if err != nil {
+                                       return nil
+                               }
+                               rv.SetUint(uint64(b))
+                               return nil
+                       },
+                       encoded: []byte{0, 0, 0, 4, 1, 2, 3, 4},
+               },
+               {
+                       v:       []string{"my", "gopher"},
+                       encElm:  stringEnc,
+                       decElm:  stringDec,
+                       encoded: []byte{0, 0, 0, 2, 2, 'm', 'y', 6, 'g', 'o', 
'p', 'h', 'e', 'r'},
+               },
+               {
+                       v:          []string{"my", "gopher", "rocks"},
+                       encElm:     stringEnc,
+                       decElm:     stringDec,
+                       encoded:    []byte{255, 255, 255, 255, 1, 2, 'm', 'y', 
2, 6, 'g', 'o', 'p', 'h', 'e', 'r', 5, 'r', 'o', 'c', 'k', 's', 0},
+                       decodeOnly: true,
+               },
+       }
+       for _, test := range tests {
+               test := test
+               if !test.decodeOnly {
+                       t.Run(fmt.Sprintf("encode %q", test.v), func(t 
*testing.T) {
+                               var buf bytes.Buffer
+                               err := iterableEncoder(reflect.TypeOf(test.v), 
test.encElm)(reflect.ValueOf(test.v), &buf)
+                               if err != nil {
+                                       t.Fatalf("EncodeBytes(%q) = %v", 
test.v, err)
+                               }
+                               if d := cmp.Diff(test.encoded, buf.Bytes()); d 
!= "" {
+                                       t.Errorf("EncodeBytes(%q) = %v, want %v 
diff(-want,+got):\n %v", test.v, buf.Bytes(), test.encoded, d)
+                               }
+                       })
+               }
+               t.Run(fmt.Sprintf("decode %v", test.v), func(t *testing.T) {
+                       buf := bytes.NewBuffer(test.encoded)
+                       rt := reflect.TypeOf(test.v)
+                       var dec func(reflect.Value, io.Reader) error
+                       switch rt.Kind() {
+                       case reflect.Slice:
+                               dec = iterableDecoderForSlice(rt, test.decElm)
+                       case reflect.Array:
+                               dec = iterableDecoderForArray(rt, test.decElm)
+                       }
+                       rv := reflect.New(rt).Elem()
+                       err := dec(rv, buf)
+                       if err != nil {
+                               t.Fatalf("DecodeBytes(%q) = %v", test.encoded, 
err)
+                       }
+                       got := rv.Interface()
+                       if d := cmp.Diff(test.v, got); d != "" {
+                               t.Errorf("DecodeBytes(%q) = %q, want %v 
diff(-want,+got):\n %v", test.encoded, got, test.v, d)
+                       }
+               })
+       }
+}

Reply via email to