This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b9fea35  [Go SDK] Use the known varint64 coder for int64s.
     new 3cbbe51  Merge pull request #8114 from lostluck/int64coder
b9fea35 is described below

commit b9fea3503bfc760626812038a74646b0751449b5
Author: Robert Burke <[email protected]>
AuthorDate: Fri Mar 22 00:21:55 2019 +0000

    [Go SDK] Use the known varint64 coder for int64s.
---
 .../pipeline/src/main/proto/beam_runner_api.proto  |  1 +
 sdks/go/pkg/beam/coder.go                          |  5 ++-
 sdks/go/pkg/beam/core/graph/coder/coder.go         |  2 +-
 sdks/go/pkg/beam/core/graph/coder/varint.go        | 12 +++---
 sdks/go/pkg/beam/core/graph/coder/varint_test.go   | 44 +++++++++++++---------
 sdks/go/pkg/beam/core/runtime/exec/coder.go        | 10 ++---
 6 files changed, 41 insertions(+), 33 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index e081f07..4e819ed 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -550,6 +550,7 @@ message StandardCoders {
     // Components: The key and value coder, in that order.
     KV = 1 [(beam_urn) = "beam:coder:kv:v1"];
 
+    // Variable length Encodes a 64-bit integer.
     // Components: None
     VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];
 
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index 62672e5..58ed115 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -135,7 +135,10 @@ func inferCoder(t FullType) (*coder.Coder, error) {
        switch t.Class() {
        case typex.Concrete, typex.Container:
                switch t.Type() {
-               case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32, reflectx.Int64:
+               case reflectx.Int64:
+                       // use the beam varint coder.
+                       return &coder.Coder{Kind: coder.VarInt, T: t}, nil
+               case reflectx.Int, reflectx.Int8, reflectx.Int16, reflectx.Int32:
                        c, err := coderx.NewVarIntZ(t.Type())
                        if err != nil {
                                return nil, err
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index a28f8ca..89b5bd9 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -246,7 +246,7 @@ func NewBytes() *Coder {
 
 // NewVarInt returns a new int32 coder using the built-in scheme.
 func NewVarInt() *Coder {
-       return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
+       return &Coder{Kind: VarInt, T: typex.New(reflectx.Int64)}
 }
 
 // IsW returns true iff the coder is for a WindowedValue.
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint.go b/sdks/go/pkg/beam/core/graph/coder/varint.go
index d96c245..1c93d1d 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint.go
@@ -85,16 +85,16 @@ func DecodeVarUint64(r io.Reader) (uint64, error) {
        }
 }
 
-// EncodeVarInt encodes an int32.
-func EncodeVarInt(value int32, w io.Writer) error {
-       return EncodeVarUint64((uint64)(value)&0xffffffff, w)
+// EncodeVarInt encodes an int64.
+func EncodeVarInt(value int64, w io.Writer) error {
+       return EncodeVarUint64((uint64)(value), w)
 }
 
-// DecodeVarInt decodes an int32.
-func DecodeVarInt(r io.Reader) (int32, error) {
+// DecodeVarInt decodes an int64.
+func DecodeVarInt(r io.Reader) (int64, error) {
        ret, err := DecodeVarUint64(r)
        if err != nil {
                return 0, err
        }
-       return (int32)(ret), nil
+       return (int64)(ret), nil
 }
diff --git a/sdks/go/pkg/beam/core/graph/coder/varint_test.go b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
index ab9d215..13acf3a 100644
--- a/sdks/go/pkg/beam/core/graph/coder/varint_test.go
+++ b/sdks/go/pkg/beam/core/graph/coder/varint_test.go
@@ -17,6 +17,8 @@ package coder
 
 import (
        "bytes"
+       "fmt"
+       "math"
        "testing"
 )
 
@@ -58,11 +60,15 @@ func TestEncodeDecodeVarUint64(t *testing.T) {
 
 func TestEncodeDecodeVarInt(t *testing.T) {
        tests := []struct {
-               value  int32
+               value  int64
                length int
        }{
-               {-2147483648, 5},
-               {-1, 5},
+               {math.MinInt32, 10},
+               {math.MaxInt32, 5},
+               {math.MinInt64, 10},
+               {math.MinInt64 + 234, 10},
+               {math.MaxInt64, 9},
+               {-1, 10},
                {0, 1},
                {1, 1},
                {127, 1},
@@ -71,24 +77,26 @@ func TestEncodeDecodeVarInt(t *testing.T) {
        }
 
        for _, test := range tests {
-               var buf bytes.Buffer
+               t.Run(fmt.Sprintf("num:%d,len:%d", test.value, test.length), func(t *testing.T) {
+                       var buf bytes.Buffer
 
-               if err := EncodeVarInt(test.value, &buf); err != nil {
-                       t.Fatalf("EncodeVarInt(%v) failed: %v", test.value, err)
-               }
+                       if err := EncodeVarInt(test.value, &buf); err != nil {
+                               t.Fatalf("EncodeVarInt(%v) failed: %v", test.value, err)
+                       }
 
-               t.Logf("Encoded %v to %v", test.value, buf.Bytes())
+                       t.Logf("Encoded %v to %v", test.value, buf.Bytes())
 
-               if len(buf.Bytes()) != test.length {
-                       t.Errorf("EncodeVarInt(%v) = %v, want %v", test.value, len(buf.Bytes()), test.length)
-               }
+                       if len(buf.Bytes()) != test.length {
+                               t.Errorf("len(EncodeVarInt(%v)) = %v, want %v", test.value, len(buf.Bytes()), test.length)
+                       }
 
-               actual, err := DecodeVarInt(&buf)
-               if err != nil {
-                       t.Fatalf("DecodeVarInt(<%v>) failed: %v", test.value, err)
-               }
-               if actual != test.value {
-                       t.Errorf("DecodeVarInt(<%v>) = %v, want %v", test.value, actual, test.value)
-               }
+                       actual, err := DecodeVarInt(&buf)
+                       if err != nil {
+                               t.Fatalf("DecodeVarInt(<%v>) failed: %v", test.value, err)
+                       }
+                       if actual != test.value {
+                               t.Errorf("DecodeVarInt(<%v>) = %v, want %v", test.value, actual, test.value)
+                       }
+               })
        }
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 174e39e..dfe2ac1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -27,7 +27,6 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
-       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
 // NOTE(herohde) 4/30/2017: The main complication is CoGBK results, which have
@@ -124,7 +123,7 @@ func (*bytesEncoder) Encode(val *FullValue, w io.Writer) error {
        }
        size := len(data)
 
-       if err := coder.EncodeVarInt((int32)(size), w); err != nil {
+       if err := coder.EncodeVarInt((int64)(size), w); err != nil {
                return err
        }
        _, err := w.Write(data)
@@ -151,16 +150,13 @@ type varIntEncoder struct{}
 
 func (*varIntEncoder) Encode(val *FullValue, w io.Writer) error {
        // Encoding: beam varint
-
-       n := Convert(val.Elm, reflectx.Int32).(int32) // Convert needed?
-       return coder.EncodeVarInt(n, w)
+       return coder.EncodeVarInt(val.Elm.(int64), w)
 }
 
 type varIntDecoder struct{}
 
 func (*varIntDecoder) Decode(r io.Reader) (*FullValue, error) {
        // Encoding: beam varint
-
        n, err := coder.DecodeVarInt(r)
        if err != nil {
                return nil, err
@@ -184,7 +180,7 @@ func (c *customEncoder) Encode(val *FullValue, w io.Writer) error {
        // (2) Add length prefix
 
        size := len(data)
-       if err := coder.EncodeVarInt((int32)(size), w); err != nil {
+       if err := coder.EncodeVarInt((int64)(size), w); err != nil {
                return err
        }
        _, err = w.Write(data)

Reply via email to