[ 
https://issues.apache.org/jira/browse/BEAM-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295863#comment-16295863
 ] 

ASF GitHub Bot commented on BEAM-3356:
--------------------------------------

robertwb closed pull request #4276: [BEAM-3356] Add Go SDK int and varint 
custom coders
URL: https://github.com/apache/beam/pull/4276
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 9405573e8af..b81152afed7 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -85,18 +85,29 @@ func inferCoder(t FullType) (*coder.Coder, error) {
        switch t.Class() {
        case typex.Concrete, typex.Container:
                switch t.Type() {
-               // The type conversions here are very conservative. We handle 
bytes/strings
-               // equivalently because they are essentially equivalent in the 
language.
-               // Notably, we do not (currently) support equivalences in 
numeric data types
-               // due to risks around inadvertent widening or narrowing of 
data.
+               case reflectx.Int, reflectx.Int8, reflectx.Int16, 
reflectx.Int32, reflectx.Int64:
+                       c, err := coder.NewVarIntZ(t.Type())
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &coder.Coder{Kind: coder.Custom, T: t, Custom: 
c}, nil
+               case reflectx.Uint, reflectx.Uint8, reflectx.Uint16, 
reflectx.Uint32, reflectx.Uint64:
+                       c, err := coder.NewVarUintZ(t.Type())
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &coder.Coder{Kind: coder.Custom, T: t, Custom: 
c}, nil
                case reflectx.String, reflectx.ByteSlice:
+                       // We handle bytes/strings equivalently because they 
are essentially
+                       // equivalent in the language. We generally infer exact 
coders only.
                        return &coder.Coder{Kind: coder.Bytes}, nil
+               default:
+                       c, err := newJSONCoder(t.Type())
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &coder.Coder{Kind: coder.Custom, T: t, Custom: 
c}, nil
                }
-               c, err := newJSONCoder(t.Type())
-               if err != nil {
-                       return nil, err
-               }
-               return &coder.Coder{Kind: coder.Custom, T: t, Custom: c}, nil
 
        case typex.Composite:
                c, err := inferCoders(t.Components())
diff --git a/sdks/go/pkg/beam/core/graph/coder/int.go 
b/sdks/go/pkg/beam/core/graph/coder/int.go
index 336e32bb28f..77f28146361 100644
--- a/sdks/go/pkg/beam/core/graph/coder/int.go
+++ b/sdks/go/pkg/beam/core/graph/coder/int.go
@@ -20,8 +20,74 @@ import (
        "io"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
+var (
+       // Fixed-sized custom coders for integers.
+
+       Uint32 *CustomCoder
+       Int32  *CustomCoder
+       Uint64 *CustomCoder
+       Int64  *CustomCoder
+)
+
+func init() {
+       var err error
+       Uint32, err = NewCustomCoder("uint32", reflectx.Uint32, encUint32, 
decUint32)
+       if err != nil {
+               panic(err)
+       }
+       Int32, err = NewCustomCoder("int32", reflectx.Int32, encInt32, decInt32)
+       if err != nil {
+               panic(err)
+       }
+       Uint64, err = NewCustomCoder("uint64", reflectx.Uint64, encUint64, 
decUint64)
+       if err != nil {
+               panic(err)
+       }
+       Int64, err = NewCustomCoder("int64", reflectx.Int64, encInt64, decInt64)
+       if err != nil {
+               panic(err)
+       }
+}
+
+func encUint32(v uint32) []byte {
+       ret := make([]byte, 4)
+       binary.BigEndian.PutUint32(ret, v)
+       return ret
+}
+
+func decUint32(data []byte) uint32 {
+       return binary.BigEndian.Uint32(data)
+}
+
+func encInt32(v int32) []byte {
+       return encUint32(uint32(v))
+}
+
+func decInt32(data []byte) int32 {
+       return int32(decUint32(data))
+}
+
+func encUint64(v uint64) []byte {
+       ret := make([]byte, 8)
+       binary.BigEndian.PutUint64(ret, v)
+       return ret
+}
+
+func decUint64(data []byte) uint64 {
+       return binary.BigEndian.Uint64(data)
+}
+
+func encInt64(v int64) []byte {
+       return encUint64(uint64(v))
+}
+
+func decInt64(data []byte) int64 {
+       return int64(decUint64(data))
+}
+
 // EncodeUint64 encodes an uint64 in big endian format.
 func EncodeUint64(value uint64, w io.Writer) error {
        ret := make([]byte, 8)
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go 
b/sdks/go/pkg/beam/core/graph/coder/varint.go
index 511d1760236..1eed24fa2e1 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -16,8 +16,14 @@
 package coder
 
 import (
+       "encoding/binary"
        "errors"
+       "fmt"
        "io"
+       "reflect"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 // ErrVarIntTooLong indicates a data corruption issue that needs special
@@ -25,11 +31,55 @@ import (
 // this special handling.
 var ErrVarIntTooLong = errors.New("varint too long")
 
-// Variable-length encoding for integers.
-//
-// Takes between 1 and 10 bytes. Less efficient for negative or large numbers.
-// All negative ints are encoded using 5 bytes, longs take 10 bytes. We use
-// uint64 (over int64) as the primitive form to get logical bit shifts.
+// NewVarIntZ returns a varint coder for the given integer type. It uses a 
zig-zag scheme,
+// which is _different_ from the Beam standard coding scheme.
+func NewVarIntZ(t reflect.Type) (*CustomCoder, error) {
+       switch t.Kind() {
+       case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, 
reflect.Int64:
+               return NewCustomCoder("varintz", t, encVarIntZ, decVarIntZ)
+       default:
+               return nil, fmt.Errorf("not a signed integer type: %v", t)
+       }
+}
+
+// NewVarUintZ returns a uvarint coder for the given integer type. It uses a 
zig-zag scheme,
+// which is _different_ from the Beam standard coding scheme.
+func NewVarUintZ(t reflect.Type) (*CustomCoder, error) {
+       switch t.Kind() {
+       case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, 
reflect.Uint64:
+               return NewCustomCoder("varuintz", t, encVarUintZ, decVarUintZ)
+       default:
+               return nil, fmt.Errorf("not a unsigned integer type: %v", t)
+       }
+}
+
+func encVarIntZ(v typex.T) []byte {
+       ret := make([]byte, binary.MaxVarintLen64)
+       size := binary.PutVarint(ret, 
reflect.ValueOf(v).Convert(reflectx.Int64).Interface().(int64))
+       return ret[:size]
+}
+
+func decVarIntZ(t reflect.Type, data []byte) (typex.T, error) {
+       n, size := binary.Varint(data)
+       if size <= 0 {
+               return nil, fmt.Errorf("invalid varintz encoding for: %v", data)
+       }
+       return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
+
+func encVarUintZ(v typex.T) []byte {
+       ret := make([]byte, binary.MaxVarintLen64)
+       size := binary.PutUvarint(ret, 
reflect.ValueOf(v).Convert(reflectx.Uint64).Interface().(uint64))
+       return ret[:size]
+}
+
+func decVarUintZ(t reflect.Type, data []byte) (typex.T, error) {
+       n, size := binary.Uvarint(data)
+       if size <= 0 {
+               return nil, fmt.Errorf("invalid varuintz encoding for: %v", 
data)
+       }
+       return reflect.ValueOf(n).Convert(t).Interface(), nil
+}
 
 // EncodeVarUint64 encodes an uint64.
 func EncodeVarUint64(value uint64, w io.Writer) error {
@@ -51,6 +101,12 @@ func EncodeVarUint64(value uint64, w io.Writer) error {
        }
 }
 
+// Variable-length encoding for integers.
+//
+// Takes between 1 and 10 bytes. Less efficient for negative or large numbers.
+// All negative ints are encoded using 5 bytes, longs take 10 bytes. We use
+// uint64 (over int64) as the primitive form to get logical bit shifts.
+
 // TODO(herohde) 5/16/2017: figure out whether it's too slow to read one byte
 // at a time here. If not, we may need a more sophisticated reader than
 // io.Reader with lookahead, say.
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint_test.go 
b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
index ab9d215122e..0460880655f 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
@@ -17,7 +17,10 @@ package coder
 
 import (
        "bytes"
+       "reflect"
        "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 func TestEncodeDecodeVarUint64(t *testing.T) {
@@ -92,3 +95,64 @@ func TestEncodeDecodeVarInt(t *testing.T) {
                }
        }
 }
+
+func TestVarIntZ(t *testing.T) {
+       tests := []interface{}{
+               int(1),
+               int(-1),
+               int8(8),
+               int8(-8),
+               int16(16),
+               int16(-16),
+               int32(32),
+               int32(-32),
+               int64(64),
+               int64(-64),
+       }
+
+       for _, v := range tests {
+               typ := reflect.ValueOf(v).Type()
+
+               data := encVarIntZ(v)
+               result, err := decVarIntZ(typ, data)
+               if err != nil {
+                       t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+               }
+
+               if v != result {
+                       t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+               }
+               resultT := 
reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+               if resultT != typ {
+                       t.Errorf("type(dec(enc(%v))) = %v, want id", typ, 
resultT)
+               }
+       }
+}
+
+func TestVarUintZ(t *testing.T) {
+       tests := []interface{}{
+               uint(1),
+               uint8(8),
+               uint16(16),
+               uint32(32),
+               uint64(64),
+       }
+
+       for _, v := range tests {
+               typ := reflect.ValueOf(v).Type()
+
+               data := encVarUintZ(v)
+               result, err := decVarUintZ(typ, data)
+               if err != nil {
+                       t.Fatalf("dec(enc(%v)) failed: %v", v, err)
+               }
+
+               if v != result {
+                       t.Errorf("dec(enc(%v)) = %v, want id", v, result)
+               }
+               resultT := 
reflectx.UnderlyingType(reflect.ValueOf(result)).Type()
+               if resultT != typ {
+                       t.Errorf("type(dec(enc(%v))) = %v, want id", typ, 
resultT)
+               }
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add varint coder
> ----------------
>
>                 Key: BEAM-3356
>                 URL: https://issues.apache.org/jira/browse/BEAM-3356
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Minor
>
> We should add a built-in coder for varint. The underlying functionality is 
> present, but not exposed as a coder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to