[
https://issues.apache.org/jira/browse/BEAM-3855?focusedWorklogId=83226&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83226
]
ASF GitHub Bot logged work on BEAM-3855:
----------------------------------------
Author: ASF GitHub Bot
Created on: 22/Mar/18 16:34
Start Date: 22/Mar/18 16:34
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #4908: BEAM-3855: Add
Protocol Buffer support
URL: https://github.com/apache/beam/pull/4908
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/pkg/beam/coder.go b/sdks/go/pkg/beam/coder.go
index db0c06a3991..236347a6cd7 100644
--- a/sdks/go/pkg/beam/coder.go
+++ b/sdks/go/pkg/beam/coder.go
@@ -25,11 +25,14 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/coderx"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+ "github.com/golang/protobuf/proto"
)
func init() {
RegisterFunction(JSONDec)
RegisterFunction(JSONEnc)
+ RegisterFunction(ProtoEnc)
+ RegisterFunction(ProtoDec)
}
// Coder defines how to encode and decode values of type 'A' into byte streams.
@@ -87,6 +90,8 @@ func NewCoder(t FullType) Coder {
return Coder{c}
}
+var protoMessageType = reflect.TypeOf((*proto.Message)(nil)).Elem()
+
func inferCoder(t FullType) (*coder.Coder, error) {
switch t.Class() {
case typex.Concrete, typex.Container:
@@ -108,6 +113,16 @@ func inferCoder(t FullType) (*coder.Coder, error) {
// conversions at runtime in inconvenient places.
return &coder.Coder{Kind: coder.Bytes, T: t}, nil
default:
+ // TODO(BEAM-3306): the coder registry should be
consulted here for user
+ // specified types and their coders.
+ if t.Type().Implements(protoMessageType) {
+ c, err := newProtoCoder(t.Type())
+ if err != nil {
+ return nil, err
+ }
+ return &coder.Coder{Kind: coder.Custom, T: t,
Custom: c}, nil
+ }
+
c, err := newJSONCoder(t.Type())
if err != nil {
return nil, err
@@ -154,6 +169,26 @@ func inferCoders(list []FullType) ([]*coder.Coder, error) {
// form that doesn't require LengthPrefix'ing to cut up the bytestream from
// the FnHarness.
+func ProtoEnc(in typex.T) ([]byte, error) {
+ return proto.Marshal(in.(proto.Message))
+}
+
+func ProtoDec(t reflect.Type, in []byte) (typex.T, error) {
+ val := reflect.New(t.Elem()).Interface().(proto.Message)
+ if err := proto.Unmarshal(in, val); err != nil {
+ return nil, err
+ }
+ return val, nil
+}
+
+func newProtoCoder(t reflect.Type) (*coder.CustomCoder, error) {
+ c, err := coder.NewCustomCoder("proto", t, ProtoEnc, ProtoDec)
+ if err != nil {
+ return nil, fmt.Errorf("invalid coder: %v", err)
+ }
+ return c, nil
+}
+
// Concrete and universal custom coders both have a similar signature.
// Conversion is handled by reflection.
diff --git a/sdks/go/pkg/beam/core/typex/class.go
b/sdks/go/pkg/beam/core/typex/class.go
index 8dfa79fbd54..a477ecf5f1d 100644
--- a/sdks/go/pkg/beam/core/typex/class.go
+++ b/sdks/go/pkg/beam/core/typex/class.go
@@ -20,6 +20,8 @@ import (
"reflect"
"unicode"
"unicode/utf8"
+
+ "github.com/golang/protobuf/proto"
)
// Class is the type "class" of data as distinguished by the runtime. The class
@@ -45,6 +47,8 @@ const (
Composite
)
+var protoMessageType = reflect.TypeOf((*proto.Message)(nil)).Elem()
+
// TODO(herohde) 5/16/2017: maybe we should add more classes, so that every
// reasonable type (such as error) is not Invalid, even though it is not
// valid in FullType. "Special", say? Right now, a valid DoFn signature may
@@ -76,6 +80,12 @@ func IsConcrete(t reflect.Type) bool {
return false
}
+ // TODO(BEAM-3306): the coder registry should be consulted here for user
+ // specified types and their coders.
+ if t.Implements(protoMessageType) {
+ return true
+ }
+
switch t.Kind() {
case reflect.Invalid, reflect.UnsafePointer, reflect.Uintptr,
reflect.Interface:
return false // no unmanageable types
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 83226)
Time Spent: 2h (was: 1h 50m)
> Add Go SDK support for protobuf coder
> -------------------------------------
>
> Key: BEAM-3855
> URL: https://issues.apache.org/jira/browse/BEAM-3855
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Willy Lulciuc
> Assignee: Bill Neubauer
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> This JIRA is for something functional. We might want to use the coder
> registry for a more general solution, when implemented.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)