[
https://issues.apache.org/jira/browse/BEAM-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305729#comment-16305729
]
ASF GitHub Bot commented on BEAM-3356:
--------------------------------------
lukecwik closed pull request #4289: [BEAM-3356] Add builtin varint coder for
the Go SDK
URL: https://github.com/apache/beam/pull/4289
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/pkg/beam/core/graph/coder/coder.go b/sdks/go/pkg/beam/core/graph/coder/coder.go
index 7286aedd86c..1c46d7a132d 100644
--- a/sdks/go/pkg/beam/core/graph/coder/coder.go
+++ b/sdks/go/pkg/beam/core/graph/coder/coder.go
@@ -122,7 +122,8 @@ type Kind string
// documents the usage of coders in the Beam environment.
const (
Custom Kind = "Custom" // Implicitly length-prefixed
- Bytes Kind = "bytes" // Implicitly length-prefixed
+ Bytes Kind = "bytes" // Implicitly length-prefixed as part of the encoding
+ VarInt Kind = "varint"
WindowedValue Kind = "W"
KV Kind = "KV"
GBK Kind = "GBK"
@@ -199,6 +200,11 @@ func NewBytes() *Coder {
return &Coder{Kind: Bytes, T: typex.New(reflectx.ByteSlice)}
}
+// NewVarInt returns a new int32 coder using the built-in scheme.
+func NewVarInt() *Coder {
+ return &Coder{Kind: VarInt, T: typex.New(reflectx.Int32)}
+}
+
// Convenience methods to operate through the top-level WindowedValue.
// IsW returns true iff the coder is for a WindowedValue.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index f40268e0735..948b2e2d62d 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -78,6 +78,12 @@ func EncodeElement(c *coder.Coder, val FullValue, w io.Writer) error {
_, err := w.Write(data)
return err
+ case coder.VarInt:
+ // Encoding: beam varint
+
+ n := reflectx.UnderlyingType(val.Elm).Convert(reflectx.Int32).Interface().(int32)
+ return coder.EncodeVarInt(n, w)
+
case coder.Custom:
enc := c.Custom.Enc
@@ -132,6 +138,15 @@ func DecodeElement(c *coder.Coder, r io.Reader) (FullValue, error) {
}
return FullValue{Elm: reflect.ValueOf(data)}, nil
+ case coder.VarInt:
+ // Encoding: beam varint
+
+ n, err := coder.DecodeVarInt(r)
+ if err != nil {
+ return FullValue{}, err
+ }
+ return FullValue{Elm: reflect.ValueOf(n)}, nil
+
case coder.Custom:
dec := c.Custom.Dec
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder.go b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
index bb4b9b7b25c..35301fe79cf 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder.go
@@ -136,6 +136,9 @@ func (b *CoderUnmarshaller) makeCoder(c *pb.Coder) (*coder.Coder, error) {
case urnBytesCoder:
return coder.NewBytes(), nil
+ case urnVarIntCoder:
+ return coder.NewVarInt(), nil
+
case urnKVCoder:
if len(components) != 2 {
return nil, fmt.Errorf("bad pair: %v", c)
@@ -298,6 +301,9 @@ func (b *CoderMarshaller) Add(c *coder.Coder) string {
// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
return b.internBuiltInCoder(urnBytesCoder)
+ case coder.VarInt:
+ return b.internBuiltInCoder(urnVarIntCoder)
+
default:
panic(fmt.Sprintf("Unexpected coder kind: %v", c.Kind))
}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
index 86698141306..9ca52e77411 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/coder_test.go
@@ -46,6 +46,10 @@ func TestMarshalUnmarshalCoders(t *testing.T) {
"bytes",
coder.NewBytes(),
},
+ {
+ "varint",
+ coder.NewVarInt(),
+ },
{
"foo",
foo,
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 8d629b1154c..b732535d05f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -720,6 +720,7 @@ type CoderRef struct {
const (
WindowedValueType = "kind:windowed_value"
BytesType = "kind:bytes"
+ VarIntType = "kind:varint"
GlobalWindowType = "kind:global_window"
streamType = "kind:stream"
pairType = "kind:pair"
@@ -796,6 +797,9 @@ func EncodeCoderRef(c *coder.Coder) (*CoderRef, error) {
// TODO(herohde) 6/27/2017: add length-prefix and not assume nested by context?
return &CoderRef{Type: BytesType}, nil
+ case coder.VarInt:
+ return &CoderRef{Type: VarIntType}, nil
+
default:
return nil, fmt.Errorf("bad coder kind: %v", c.Kind)
}
@@ -807,6 +811,9 @@ func DecodeCoderRef(c *CoderRef) (*coder.Coder, error) {
case BytesType:
return coder.NewBytes(), nil
+ case VarIntType:
+ return coder.NewVarInt(), nil
+
case pairType:
if len(c.Components) != 2 {
return nil, fmt.Errorf("bad pair: %+v", c)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add varint coder
> ----------------
>
> Key: BEAM-3356
> URL: https://issues.apache.org/jira/browse/BEAM-3356
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Henning Rohde
> Priority: Minor
>
> We should add a built-in coder for varint. The underlying functionality is
> present, but not exposed as a coder.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)