This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a64aa0f57ba [#24789][prism] internal/coders.go and tests (#25476)
a64aa0f57ba is described below
commit a64aa0f57ba3d6fb0e05f1a925e894ef1b3a5f78
Author: Robert Burke <[email protected]>
AuthorDate: Wed Feb 15 13:16:53 2023 -0800
[#24789][prism] internal/coders.go and tests (#25476)
* [prism] coders + initial test file.
* [prism] internal/coders.go
* [prism] finish coder coverage
* [prism] no-alloc doc
* review commit
---------
Co-authored-by: lostluck <[email protected]>
---
sdks/go/pkg/beam/runners/prism/internal/coders.go | 243 +++++++++++++
.../pkg/beam/runners/prism/internal/coders_test.go | 377 +++++++++++++++++++++
2 files changed, 620 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go
b/sdks/go/pkg/beam/runners/prism/internal/coders.go
new file mode 100644
index 00000000000..d88ed763f80
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+ "golang.org/x/exp/slog"
+ "google.golang.org/protobuf/encoding/prototext"
+)
+
+// leafCoders is the set of coder urns the runner knows how to manipulate
+// directly. In general these are the ones that won't be a problem to parse,
+// because their encodings have a known total size.
+var leafCoders = map[string]struct{}{
+ urns.CoderBytes: {},
+ urns.CoderStringUTF8: {},
+ urns.CoderLengthPrefix: {},
+ urns.CoderVarInt: {},
+ urns.CoderDouble: {},
+ urns.CoderBool: {},
+ urns.CoderGlobalWindow: {},
+ urns.CoderIntervalWindow: {},
+}
+
+// isLeafCoder reports whether the urn of c is present in leafCoders.
+func isLeafCoder(c *pipepb.Coder) bool {
+	if _, known := leafCoders[c.GetSpec().GetUrn()]; known {
+		return true
+	}
+	return false
+}
+
+// makeWindowedValueCoder builds a windowed value coder for the PCollection
+// pID, stores it in the coders map under the id "cwv_"+pID, and returns
+// that id.
+//
+// PCollection coders are not inherently wrapped in a windowed value coder;
+// the runner adds the wrapper for crossing the FnAPI boundary at data
+// sources and data sinks.
+func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string {
+	pcol := comps.GetPcollections()[pID]
+
+	// The runner needs to be defensive, and tell the SDK to length prefix
+	// any coders that it doesn't understand. lpUnknownCoders looks at the
+	// element coder and its components, and hands back the id of a coder
+	// the runner knows how to deal with.
+	elemID := lpUnknownCoders(pcol.GetCoderId(), coders, comps.GetCoders())
+
+	// The window coder is taken from the PCollection's windowing strategy.
+	strategy := comps.GetWindowingStrategies()[pcol.GetWindowingStrategyId()]
+	windowID := strategy.GetWindowCoderId()
+
+	// Populate the coders to send with the new windowed value coder.
+	id := "cwv_" + pID
+	coders[id] = &pipepb.Coder{
+		Spec:              &pipepb.FunctionSpec{Urn: urns.CoderWindowedValue},
+		ComponentCoderIds: []string{elemID, windowID},
+	}
+	return id
+}
+
+// makeWindowCoders returns the window decoder and encoder pair for the urn
+// of wc. Only the global window and interval window coders are handled;
+// any other urn is logged as an error and then causes a panic.
+// Behavior is ultimately determined by the strategy's windowFn.
+func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
+	urn := wc.GetSpec().GetUrn()
+
+	var windowCoder *coder.WindowCoder
+	switch urn {
+	case urns.CoderIntervalWindow:
+		windowCoder = coder.NewIntervalWindow()
+	case urns.CoderGlobalWindow:
+		windowCoder = coder.NewGlobalWindow()
+	default:
+		slog.Log(slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", urn))
+		panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", prototext.Format(wc)))
+	}
+	return exec.MakeWindowDecoder(windowCoder), exec.MakeWindowEncoder(windowCoder)
+}
+
+// lpUnknownCoders takes a coder, and populates coders with any new coders
+// coders that the runner needs to be safe, and speedy.
+// It returns either the passed in coder id, or the new safe coder id.
+func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string
{
+ // First check if we've already added the LP version of this coder to
coders already.
+ lpcID := cID + "_lp"
+ // Check if we've done this one before.
+ if _, ok := bundle[lpcID]; ok {
+ return lpcID
+ }
+ // All coders in the coders map have been processed.
+ if _, ok := bundle[cID]; ok {
+ return cID
+ }
+ // Look up the canonical location.
+ c, ok := base[cID]
+ if !ok {
+ // We messed up somewhere.
+ panic(fmt.Sprint("unknown coder id:", cID))
+ }
+ // Add the original coder to the coders map.
+ bundle[cID] = c
+ // If we don't know this coder, and it has no sub components,
+ // we must LP it, and we return the LP'd version.
+ leaf := isLeafCoder(c)
+ if len(c.GetComponentCoderIds()) == 0 && !leaf {
+ lpc := &pipepb.Coder{
+ Spec: &pipepb.FunctionSpec{
+ Urn: urns.CoderLengthPrefix,
+ },
+ ComponentCoderIds: []string{cID},
+ }
+ bundle[lpcID] = lpc
+ return lpcID
+ }
+ // We know we have a composite, so if we count this as a leaf, move
everything to
+ // the coders map.
+ if leaf {
+ // Copy the components from the base.
+ for _, cc := range c.GetComponentCoderIds() {
+ bundle[cc] = base[cc]
+ }
+ return cID
+ }
+ var needNewComposite bool
+ var comps []string
+ for _, cc := range c.GetComponentCoderIds() {
+ rcc := lpUnknownCoders(cc, bundle, base)
+ if cc != rcc {
+ needNewComposite = true
+ }
+ comps = append(comps, rcc)
+ }
+ if needNewComposite {
+ lpc := &pipepb.Coder{
+ Spec: c.GetSpec(),
+ ComponentCoderIds: comps,
+ }
+ bundle[lpcID] = lpc
+ return lpcID
+ }
+ return cID
+}
+
+// reconcileCoders copies into bundle, from the base pipeline components,
+// every coder that is referenced as a component by a coder in bundle but
+// is not yet present there. It repeats until no referenced coder is
+// missing, and panics if a missing coder id is not found in base.
+func reconcileCoders(bundle, base map[string]*pipepb.Coder) {
+	for {
+		// Gather the component ids the bundle refers to but doesn't have.
+		var missing []string
+		for _, have := range bundle {
+			for _, id := range have.GetComponentCoderIds() {
+				if _, present := bundle[id]; present {
+					continue
+				}
+				// We don't have the coder yet, so in we go.
+				missing = append(missing, id)
+			}
+		}
+		if len(missing) == 0 {
+			return
+		}
+		// Newly added coders may reference further components, so the
+		// outer loop re-scans the bundle after this pass.
+		for _, id := range missing {
+			found, ok := base[id]
+			if !ok {
+				panic(fmt.Sprintf("unknown coder id during reconciliation: %v", id))
+			}
+			bundle[id] = found
+		}
+	}
+}
+
+// pullDecoder return a function that will extract the bytes
+// for the associated coder. Uses a buffer and a TeeReader to extract the
original
+// bytes from when decoding elements.
+func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder)
func(io.Reader) []byte {
+ dec := pullDecoderNoAlloc(c, coders)
+ return func(r io.Reader) []byte {
+ var buf bytes.Buffer
+ tr := io.TeeReader(r, &buf)
+ dec(tr)
+ return buf.Bytes()
+ }
+}
+
+// pullDecoderNoAlloc returns a function that decodes a single element of the
+// given coder, discarding the decoded value.
+// Intended to only be used as an internal function for pullDecoder, which
+// will use an io.TeeReader to extract the bytes.
+//
+// Decode errors are ignored by the returned function. It panics when
+// building a decoder for a row coder or for a urn it doesn't know.
+func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) {
+	switch urn := c.GetSpec().GetUrn(); urn {
+	case urns.CoderBytes, urns.CoderStringUTF8, urns.CoderLengthPrefix:
+		// Anything length prefixed can be treated as opaque.
+		return func(r io.Reader) {
+			size, _ := coder.DecodeVarInt(r)
+			ioutilx.ReadN(r, int(size))
+		}
+	case urns.CoderVarInt:
+		return func(r io.Reader) { coder.DecodeVarInt(r) }
+	case urns.CoderBool:
+		return func(r io.Reader) { coder.DecodeBool(r) }
+	case urns.CoderDouble:
+		return func(r io.Reader) { coder.DecodeDouble(r) }
+	case urns.CoderIterable:
+		// An element count, followed by that many encoded elements.
+		ids := c.GetComponentCoderIds()
+		elem := pullDecoderNoAlloc(coders[ids[0]], coders)
+		return func(r io.Reader) {
+			count, _ := coder.DecodeInt32(r)
+			for remaining := count; remaining > 0; remaining-- {
+				elem(r)
+			}
+		}
+	case urns.CoderKV:
+		// The key immediately followed by the value.
+		ids := c.GetComponentCoderIds()
+		key := pullDecoderNoAlloc(coders[ids[0]], coders)
+		val := pullDecoderNoAlloc(coders[ids[1]], coders)
+		return func(r io.Reader) {
+			key(r)
+			val(r)
+		}
+	case urns.CoderRow:
+		panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c)))
+	default:
+		panic(fmt.Sprintf("unknown coder urn key: %v", urn))
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
new file mode 100644
index 00000000000..ad6e3649628
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "bytes"
+ "encoding/binary"
+ "math"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/protobuf/testing/protocmp"
+)
+
+func Test_isLeafCoder(t *testing.T) {
+ tests := []struct {
+ urn string
+ isLeaf bool
+ }{
+ {urns.CoderBytes, true},
+ {urns.CoderStringUTF8, true},
+ {urns.CoderLengthPrefix, true},
+ {urns.CoderVarInt, true},
+ {urns.CoderDouble, true},
+ {urns.CoderBool, true},
+ {urns.CoderGlobalWindow, true},
+ {urns.CoderIntervalWindow, true},
+ {urns.CoderIterable, false},
+ {urns.CoderRow, false},
+ {urns.CoderKV, false},
+ }
+ for _, test := range tests {
+ undertest := &pipepb.Coder{
+ Spec: &pipepb.FunctionSpec{
+ Urn: test.urn,
+ },
+ }
+ if got, want := isLeafCoder(undertest), test.isLeaf; got !=
want {
+ t.Errorf("isLeafCoder(%v) = %v, want %v", test.urn,
got, want)
+ }
+ }
+}
+
+// Test_makeWindowedValueCoder checks that makeWindowedValueCoder returns a
+// non-empty id, and that the coders map holds a windowed value coder under
+// that id.
+func Test_makeWindowedValueCoder(t *testing.T) {
+	coders := map[string]*pipepb.Coder{}
+
+	gotID := makeWindowedValueCoder("testPID", &pipepb.Components{
+		Pcollections: map[string]*pipepb.PCollection{
+			"testPID": {CoderId: "testCoderID"},
+		},
+		Coders: map[string]*pipepb.Coder{
+			"testCoderID": {
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderBool,
+				},
+			},
+		},
+	}, coders)
+
+	if gotID == "" {
+		t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", gotID)
+	}
+	got := coders[gotID]
+	if got == nil {
+		// Nothing further can be checked without an entry.
+		t.Fatalf("makeWindowedValueCoder(...) = ID %v, had nil entry", gotID)
+	}
+	// Report the actual urn mismatch, rather than a misleading "nil entry".
+	if gotUrn, want := got.GetSpec().GetUrn(), urns.CoderWindowedValue; gotUrn != want {
+		t.Errorf("makeWindowedValueCoder(...) = ID %v, had urn %v, want %v", gotID, gotUrn, want)
+	}
+}
+
+// Test_makeWindowCoders checks that the decoder and encoder returned by
+// makeWindowCoders round trip a window, for each supported window coder urn.
+func Test_makeWindowCoders(t *testing.T) {
+	type testCase struct {
+		urn    string
+		window typex.Window
+	}
+	cases := []testCase{
+		{urn: urns.CoderGlobalWindow, window: window.GlobalWindow{}},
+		{urn: urns.CoderIntervalWindow, window: window.IntervalWindow{
+			Start: mtime.MinTimestamp,
+			End:   mtime.MaxTimestamp,
+		}},
+	}
+	for _, tc := range cases {
+		dec, enc := makeWindowCoders(&pipepb.Coder{
+			Spec: &pipepb.FunctionSpec{Urn: tc.urn},
+		})
+
+		// Validate we're getting a round trip coder.
+		var encoded bytes.Buffer
+		err := enc.EncodeSingle(tc.window, &encoded)
+		if err != nil {
+			t.Errorf("encoder[%v].EncodeSingle(%v) = %v, want nil", tc.urn, tc.window, err)
+		}
+		got, err := dec.DecodeSingle(&encoded)
+		if err != nil {
+			t.Errorf("decoder[%v].DecodeSingle(%v) = %v, want nil", tc.urn, tc.window, err)
+		}
+
+		if got != tc.window {
+			t.Errorf("makeWindowCoders(%v) didn't round trip: got %v, want %v", tc.urn, got, tc.window)
+		}
+	}
+}
+
+// Test_lpUnknownCoders checks the contents of the bundle map after calling
+// lpUnknownCoders on a coder with id "test". Each case supplies the urn and
+// components of that coder, which is added to base before the call, along
+// with the initial bundle and base maps.
+func Test_lpUnknownCoders(t *testing.T) {
+	type coderMap = map[string]*pipepb.Coder
+
+	// mk builds a coder with the given urn and component coder ids.
+	mk := func(urn string, components ...string) *pipepb.Coder {
+		return &pipepb.Coder{
+			Spec:              &pipepb.FunctionSpec{Urn: urn},
+			ComponentCoderIds: components,
+		}
+	}
+
+	tests := []struct {
+		name         string
+		urn          string
+		components   []string
+		bundle, base coderMap
+		want         coderMap
+	}{
+		{
+			name:   "alreadyProcessed",
+			urn:    urns.CoderBool,
+			bundle: coderMap{"test": mk(urns.CoderBool)},
+			base:   coderMap{},
+			want:   coderMap{"test": mk(urns.CoderBool)},
+		},
+		{
+			name: "alreadyProcessedLP",
+			urn:  urns.CoderBool,
+			bundle: coderMap{
+				"test_lp": mk(urns.CoderLengthPrefix, "test"),
+				"test":    mk(urns.CoderBool),
+			},
+			base: coderMap{},
+			want: coderMap{
+				"test_lp": mk(urns.CoderLengthPrefix, "test"),
+				"test":    mk(urns.CoderBool),
+			},
+		},
+		{
+			name:   "noNeedForLP",
+			urn:    urns.CoderBool,
+			bundle: coderMap{},
+			base:   coderMap{},
+			want:   coderMap{"test": mk(urns.CoderBool)},
+		},
+		{
+			name:   "needLP",
+			urn:    urns.CoderRow,
+			bundle: coderMap{},
+			base:   coderMap{},
+			want: coderMap{
+				"test_lp": mk(urns.CoderLengthPrefix, "test"),
+				"test":    mk(urns.CoderRow),
+			},
+		},
+		{
+			name:       "needLP_recurse",
+			urn:        urns.CoderKV,
+			components: []string{"k", "v"},
+			bundle:     coderMap{},
+			base: coderMap{
+				"k": mk(urns.CoderRow),
+				"v": mk(urns.CoderBool),
+			},
+			want: coderMap{
+				"test_lp": mk(urns.CoderKV, "k_lp", "v"),
+				"test":    mk(urns.CoderKV, "k", "v"),
+				"k_lp":    mk(urns.CoderLengthPrefix, "k"),
+				"k":       mk(urns.CoderRow),
+				"v":       mk(urns.CoderBool),
+			},
+		},
+		{
+			name:       "alreadyLP",
+			urn:        urns.CoderLengthPrefix,
+			components: []string{"k"},
+			bundle:     coderMap{},
+			base: coderMap{
+				"k": mk(urns.CoderRow),
+			},
+			want: coderMap{
+				"test": mk(urns.CoderLengthPrefix, "k"),
+				"k":    mk(urns.CoderRow),
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// Add the initial coder to base.
+			test.base["test"] = mk(test.urn, test.components...)
+
+			lpUnknownCoders("test", test.bundle, test.base)
+
+			d := cmp.Diff(test.want, test.bundle, protocmp.Transform())
+			if d != "" {
+				t.Fatalf("lpUnknownCoders(%v); (-want, +got):\n%v", test.urn, d)
+			}
+		})
+	}
+}
+
+// Test_reconcileCoders checks the contents of the bundle map after calling
+// reconcileCoders: referenced component coders are pulled in from base,
+// including nested ones, while unreferenced base coders are left out.
+func Test_reconcileCoders(t *testing.T) {
+	type coderMap = map[string]*pipepb.Coder
+
+	// mk builds a coder with the given urn and component coder ids.
+	mk := func(urn string, components ...string) *pipepb.Coder {
+		return &pipepb.Coder{
+			Spec:              &pipepb.FunctionSpec{Urn: urn},
+			ComponentCoderIds: components,
+		}
+	}
+
+	tests := []struct {
+		name         string
+		bundle, base coderMap
+		want         coderMap
+	}{
+		{
+			name:   "noChanges",
+			bundle: coderMap{"a": mk(urns.CoderBool)},
+			base: coderMap{
+				"a": mk(urns.CoderBool),
+				"b": mk(urns.CoderBytes),
+				"c": mk(urns.CoderStringUTF8),
+			},
+			want: coderMap{"a": mk(urns.CoderBool)},
+		},
+		{
+			name:   "KV",
+			bundle: coderMap{"kv": mk(urns.CoderKV, "k", "v")},
+			base: coderMap{
+				"kv": mk(urns.CoderKV, "k", "v"),
+				"k":  mk(urns.CoderBool),
+				"v":  mk(urns.CoderBool),
+			},
+			want: coderMap{
+				"kv": mk(urns.CoderKV, "k", "v"),
+				"k":  mk(urns.CoderBool),
+				"v":  mk(urns.CoderBool),
+			},
+		},
+		{
+			name:   "KV-nested",
+			bundle: coderMap{"kv": mk(urns.CoderKV, "k", "v")},
+			base: coderMap{
+				"kv": mk(urns.CoderKV, "k", "v"),
+				"k":  mk(urns.CoderKV, "a", "b"),
+				"v":  mk(urns.CoderBool),
+				"a":  mk(urns.CoderBytes),
+				"b":  mk(urns.CoderRow),
+				"c":  mk(urns.CoderStringUTF8),
+			},
+			want: coderMap{
+				"kv": mk(urns.CoderKV, "k", "v"),
+				"k":  mk(urns.CoderKV, "a", "b"),
+				"v":  mk(urns.CoderBool),
+				"a":  mk(urns.CoderBytes),
+				"b":  mk(urns.CoderRow),
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			reconcileCoders(test.bundle, test.base)
+
+			d := cmp.Diff(test.want, test.bundle, protocmp.Transform())
+			if d != "" {
+				t.Fatalf("reconcileCoders(...); (-want, +got):\n%v", d)
+			}
+		})
+	}
+}
+
+// Test_pullDecoder checks that, for each supported coder, the function
+// returned by pullDecoder consumes one encoded element and returns exactly
+// the bytes of that element.
+func Test_pullDecoder(t *testing.T) {
+
+	doubleBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(doubleBytes, math.Float64bits(math.SqrtPi))
+
+	tests := []struct {
+		name   string
+		coder  *pipepb.Coder
+		coders map[string]*pipepb.Coder
+		input  []byte
+	}{
+		{
+			"bytes",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderBytes,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			[]byte{3, 1, 2, 3},
+		}, {
+			"varint",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderVarInt,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			[]byte{255, 3},
+		}, {
+			"bool",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderBool,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			[]byte{1},
+		}, {
+			"double",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderDouble,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			doubleBytes,
+		}, {
+			"iterable",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderIterable,
+				},
+				ComponentCoderIds: []string{"elm"},
+			},
+			map[string]*pipepb.Coder{
+				"elm": {
+					Spec: &pipepb.FunctionSpec{
+						Urn: urns.CoderVarInt,
+					},
+				},
+			},
+			// The element count is read with DecodeInt32, a fixed width
+			// 4 byte big endian value, so a count of 4 is {0, 0, 0, 4}.
+			// A single leading 4 would be read, together with the next
+			// three bytes, as a count in the tens of millions.
+			[]byte{0, 0, 0, 4, 0, 1, 2, 3},
+		}, {
+			"kv",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderKV,
+				},
+				ComponentCoderIds: []string{"key", "value"},
+			},
+			map[string]*pipepb.Coder{
+				"key": {
+					Spec: &pipepb.FunctionSpec{
+						Urn: urns.CoderVarInt,
+					},
+				},
+				"value": {
+					Spec: &pipepb.FunctionSpec{
+						Urn: urns.CoderBool,
+					},
+				},
+			},
+			[]byte{3, 0},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			dec := pullDecoder(test.coder, test.coders)
+			buf := bytes.NewBuffer(test.input)
+			got := dec(buf)
+			// Encoded elements are raw bytes, so compare them exactly;
+			// EqualFold would treat them as case-insensitive text.
+			if !bytes.Equal(test.input, got) {
+				t.Fatalf("pullDecoder(%v)(...) = %v, want %v", test.coder, got, test.input)
+			}
+		})
+	}
+}