This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/go-sdk by this push:
     new 14eb2cd  [BEAM-3356] Add Go SDK int and varint custom coders (#4276)
14eb2cd is described below

commit 14eb2cddcd11c73f79555bc8ddbc121e29b10c4d
Author: Henning Rohde <[email protected]>
AuthorDate: Mon Dec 18 15:37:50 2017 -0800

    [BEAM-3356] Add Go SDK int and varint custom coders (#4276)
    
    * Add Go SDK int and varint custom coders
    
    * CR: [BEAM-3356] Add Go SDK int and varint custom coders
---
 sdks/go/pkg/beam/coder.go                        | 29 +++++++----
 sdks/go/pkg/beam/core/graph/coder/int.go         | 66 ++++++++++++++++++++++++
 sdks/go/pkg/beam/core/graph/coder/varint.go      | 66 ++++++++++++++++++++++--
 sdks/go/pkg/beam/core/graph/coder/varint_test.go | 64 +++++++++++++++++++++++
 4 files changed, 211 insertions(+), 14 deletions(-)

diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 9405573..b81152a 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -85,18 +85,29 @@ func inferCoder(t FullType) (*coder.Coder, error) {
        switch t.Class() {
        case typex.Concrete, typex.Container:
                switch t.Type() {
-               // The type conversions here are very conservative. We handle bytes/strings
-               // equivalently because they are essentially equivalent in the language.
-               // Notably, we do not (currently) support equivalences in numeric data types
-               // due to risks around inadvertent widening or narrowing of data.
+               case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32, reflectx.Int64:
+                       c, err := coder.NewVarIntZ(t.Type())
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
+               case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, reflectx.Uint32, reflectx.Uint64:
+                       c, err := coder.NewVarUintZ(t.Type())
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
                case reflectx.String, reflectx.ByteSlice:
+                       // We handle bytes/strings equivalently because they are essentially
+                       // equivalent in the language. We generally infer exact coders only.
                        return &coder.Coder{Kind: coder.Bytes}, nil
+               default:
+                       c, err := newJSONCoder(t.Type())
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
                }
-               c, err := newJSONCoder(t.Type())
-               if err != nil {
-                       return nil, err
-               }
-               return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
 
        case typex.Composite:
                c, err := inferCoders(t.Components())
diff --git a/sdks/go/pkg/beam/core/graph/coder/int.go b/sdks/go/pkg/beam/core/graph/coder/int.go
index 336e32b..77f2814 100644
--- a/sdks/go/pkg/beam/core/graph/coder/int.go
+++ b/sdks/go/pkg/beam/core/graph/coder/int.go
@@ -20,8 +20,74 @@ import (
        "io"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
+var (
+       // Fixed-sized custom coders for integers.
+
+       Uint32 *CustomCoder
+       Int32  *CustomCoder
+       Uint64 *CustomCoder
+       Int64  *CustomCoder
+)
+
+func init() {
+       var err error
+       Uint32, err = NewCustomCoder("uint32", reflectx.Uint32, encUint32, decUint32)
+       if err != nil {
+               panic(err)
+       }
+       Int32, err = NewCustomCoder("int32", reflectx.Int32, encInt32, decInt32)
+       if err != nil {
+               panic(err)
+       }
+       Uint64, err = NewCustomCoder("uint64", reflectx.Uint64, encUint64, decUint64)
+       if err != nil {
+               panic(err)
+       }
+       Int64, err = NewCustomCoder("int64", reflectx.Int64, encInt64, decInt64)
+       if err != nil {
+               panic(err)
+       }
+}
+
+func encUint32(v uint32) []byte {
+       ret := make([]byte, 4)
+       binary.BigEndian.PutUint32(ret, v)
+       return ret
+}
+
+func decUint32(data []byte) uint32 {
+       return binary.BigEndian.Uint32(data)
+}
+
+func encInt32(v int32) []byte {
+       return encUint32(uint32(v))
+}
+
+func decInt32(data []byte) int32 {
+       return int32(decUint32(data))
+}
+
+func encUint64(v uint64) []byte {
+       ret := make([]byte, 8)
+       binary.BigEndian.PutUint64(ret, v)
+       return ret
+}
+
+func decUint64(data []byte) uint64 {
+       return binary.BigEndian.Uint64(data)
+}
+
+func encInt64(v int64) []byte {
+       return encUint64(uint64(v))
+}
+
+func decInt64(data []byte) int64 {
+       return int64(decUint64(data))
+}
+
 // EncodeUint64 encodes an uint64 in big endian format.
 func EncodeUint64(value uint64, w io.Writer) error {
        ret := make([]byte, 8)
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index 511d176..1eed24f 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -16,8 +16,14 @@
 package coder
 
 import (
+       "encoding/binary"
        "errors"
+       "fmt"
        "io"
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 // ErrVarIntTooLong indicates a data corruption issue that needs special
@@ -25,11 +31,55 @@ import (
 // this special handling.
 var ErrVarIntTooLong = errors.New("varint too long")
 
-// Variable-length encoding for integers.
-//
-// Takes between 1 and 10 bytes. Less efficient for negative or large numbers.
-// All negative ints are encoded using 5 bytes, longs take 10 bytes. We use
-// uint64 (over int64) as the primitive form to get logical bit shifts.
+// NewVarIntZ returns a varint coder for the given integer type. It uses a zig-zag scheme,
+// which is _different_ from the Beam standard coding scheme.
+func NewVarIntZ(t reflect.Type) (*CustomCoder, error) {
+       switch t.Kind() {
+       case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+               return NewCustomCoder("varintz", t, encVarIntZ, decVarIntZ)
+       default:
+               return nil, fmt.Errorf("not a signed integer type: %v", t)
+       }
+}
+
+// NewVarUintZ returns a uvarint coder for the given unsigned integer type. It uses Go's
+// unsigned varint encoding (binary.PutUvarint, no zig-zag) wrapped in a custom coder.
+func NewVarUintZ(t reflect.Type) (*CustomCoder, error) {
+       switch t.Kind() {
+       case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
+               return NewCustomCoder("varuintz", t, encVarUintZ, decVarUintZ)
+       default:
+               return nil, fmt.Errorf("not an unsigned integer type: %v", t)
+       }
+}
+
+func encVarIntZ(v typex.T) []byte {
+       ret := make([]byte, binary.MaxVarintLen64)
+       size := binary.PutVarint(ret, reflect.ValueOf(v).Convert(reflectx.Int64).Interface().(int64))
+       return ret[:size]
+}
+
+func decVarIntZ(t reflect.Type, data []byte) (typex.T, error) {
+       n, size := binary.Varint(data)
+       if size <= 0 || reflect.Zero(t).OverflowInt(n) {
+               return nil, fmt.Errorf("invalid varintz encoding for: %v", data)
+       }
+       return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
+
+func encVarUintZ(v typex.T) []byte {
+       ret := make([]byte, binary.MaxVarintLen64)
+       size := binary.PutUvarint(ret, reflect.ValueOf(v).Convert(reflectx.Uint64).Interface().(uint64))
+       return ret[:size]
+}
+
+func decVarUintZ(t reflect.Type, data []byte) (typex.T, error) {
+       n, size := binary.Uvarint(data)
+       if size <= 0 || reflect.Zero(t).OverflowUint(n) {
+               return nil, fmt.Errorf("invalid varuintz encoding for: %v", data)
+       }
+       return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
 
 // EncodeVarUint64 encodes an uint64.
 func EncodeVarUint64(value uint64, w io.Writer) error {
@@ -51,6 +101,12 @@ func EncodeVarUint64(value uint64, w io.Writer) error {
        }
 }
 
+// Variable-length encoding for integers.
+//
+// Takes between 1 and 10 bytes. Less efficient for negative or large numbers.
+// All negative ints are encoded using 5 bytes, longs take 10 bytes. We use
+// uint64 (over int64) as the primitive form to get logical bit shifts.
+
 // TODO(herohde) 5/16/2017: figure out whether it's too slow to read one byte
 // at a time here. If not, we may need a more sophisticated reader than
 // io.Reader with lookahead, say.
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint_test.go b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
index ab9d215..0460880 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
@@ -17,7 +17,10 @@ package coder
 
 import (
        "bytes"
+       "reflect"
        "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 func TestEncodeDecodeVarUint64(t *testing.T) {
@@ -92,3 +95,64 @@ func TestEncodeDecodeVarInt(t *testing.T) {
                }
        }
 }
+
+func TestVarIntZ(t *testing.T) {
+       tests := []interface{}{
+               int(1),
+               int(-1),
+               int8(8),
+               int8(-8),
+               int16(16),
+               int16(-16),
+               int32(32),
+               int32(-32),
+               int64(64),
+               int64(-64),
+       }
+
+       for _, v := range tests {
+               typ := reflect.ValueOf(v).Type()
+
+               data := encVarIntZ(v)
+               result, err := decVarIntZ(typ, data)
+               if err != nil {
+                       t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+               }
+
+               if v != result {
+                       t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+               }
+               resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+               if resultT != typ {
+                       t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT)
+               }
+       }
+}
+
+func TestVarUintZ(t *testing.T) {
+       tests := []interface{}{
+               uint(1),
+               uint8(8),
+               uint16(16),
+               uint32(32),
+               uint64(64),
+       }
+
+       for _, v := range tests {
+               typ := reflect.ValueOf(v).Type()
+
+               data := encVarUintZ(v)
+               result, err := decVarUintZ(typ, data)
+               if err != nil {
+                       t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+               }
+
+               if v != result {
+                       t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+               }
+               resultT := reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+               if resultT != typ {
+                       t.Errorf("type(dec(enc(%v))) = %v, want id", typ, resultT)
+               }
+       }
+}

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to