This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch go-sdk
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/go-sdk by this push:
     new e6f3af2  Add builtin varint coder
e6f3af2 is described below

commit e6f3af22f74bc37ee2e587f0399512ddf237d05a
Author: Henning Rohde <[email protected]>
AuthorDate: Mon Dec 18 10:51:18 2017 -0800

    Add builtin varint coder
---
 sdks/go/pkg/beam/core/graph/coder/coder.go         |  8 +++++++-
 sdks/go/pkg/beam/core/runtime/exec/coder.go        | 15 +++++++++++++++
 sdks/go/pkg/beam/core/runtime/graphx/coder.go      |  6 ++++++
 sdks/go/pkg/beam/core/runtime/graphx/coder_test.go |  4 ++++
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go  |  7 +++++++
 5 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index 7286aed..1c46d7a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -122,7 +122,8 @@ type Kind string
 // documents the usage of coders in the Beam environment.
 const (
        Custom        Kind = "Custom" // Implicitly length-prefixed
-       Bytes         Kind = "bytes"  // Implicitly length-prefixed
+       Bytes         Kind = "bytes"  // Implicitly length-prefixed as part of the encoding
+       VarInt        Kind = "varint"
        WindowedValue Kind = "W"
        KV            Kind = "KV"
        GBK           Kind = "GBK"
@@ -199,6 +200,11 @@ func NewBytes() *Coder {
        return &Coder{Kind: Bytes, T: typex.New(reflectx.ByteSlice)}
 }
 
+// NewVarInt returns a new int32 coder using the built-in scheme.
+func NewVarInt() *Coder {
+       return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
+}
+
 // Convenience methods to operate through the top-level WindowedValue.
 
 // IsW returns true iff the coder is for a WindowedValue.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index b162082..2253c36 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -78,6 +78,12 @@ func EncodeElement(c *coder.Coder, val FullValue, w io.Writer) error {
                _, err := w.Write(data)
                return err
 
+       case coder.VarInt:
+               // Encoding: beam varint
+
+               n := reflectx.UnderlyingType(val.Elm).Convert(reflectx.Int32).Interface().(int32)
+               return coder.EncodeVarInt(n, w)
+
        case coder.Custom:
                enc := c.Custom.Enc
 
@@ -135,6 +141,15 @@ func DecodeElement(c *coder.Coder, r io.Reader) (FullValue, error) {
                }
                return FullValue{Elm: reflect.ValueOf(data)}, nil
 
+       case coder.VarInt:
+               // Encoding: beam varint
+
+               n, err := coder.DecodeVarInt(r)
+               if err != nil {
+                       return FullValue{}, err
+               }
+               return FullValue{Elm: reflect.ValueOf(n)}, nil
+
        case coder.Custom:
                dec := c.Custom.Dec
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index bb4b9b7..35301fe 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -136,6 +136,9 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
        case urnBytesCoder:
                return coder.NewBytes(), nil
 
+       case urnVarIntCoder:
+               return coder.NewVarInt(), nil
+
        case urnKVCoder:
                if len(components) != 2 {
                        return nil, fmt.Errorf("bad pair: %v", c)
@@ -298,6 +301,9 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
                // TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
                return b.internBuiltInCoder(urnBytesCoder)
 
+       case coder.VarInt:
+               return b.internBuiltInCoder(urnVarIntCoder)
+
        default:
                panic(fmt.Sprintf("Unexpected coder kind: %v", c.Kind))
        }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index 8669814..9ca52e7 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -47,6 +47,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
                        coder.NewBytes(),
                },
                {
+                       "varint",
+                       coder.NewVarInt(),
+               },
+               {
                        "foo",
                        foo,
                },
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 8d629b1..b732535 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -720,6 +720,7 @@ type CoderRef struct {
 const (
        WindowedValueType = "kind:windowed_value"
        BytesType         = "kind:bytes"
+       VarIntType        = "kind:varint"
        GlobalWindowType  = "kind:global_window"
        streamType        = "kind:stream"
        pairType          = "kind:pair"
@@ -796,6 +797,9 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
                // TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
                return &CoderRef{Type: BytesType}, nil
 
+       case coder.VarInt:
+               return &CoderRef{Type: VarIntType}, nil
+
        default:
                return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
        }
@@ -807,6 +811,9 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
        case BytesType:
                return coder.NewBytes(), nil
 
+       case VarIntType:
+               return coder.NewVarInt(), nil
+
        case pairType:
                if len(c.Components) != 2 {
                        return nil, fmt.Errorf("bad pair: %+v", c)

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to