This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a64aa0f57ba [#24789][prism] internal/coders.go and tests (#25476)
a64aa0f57ba is described below

commit a64aa0f57ba3d6fb0e05f1a925e894ef1b3a5f78
Author: Robert Burke <[email protected]>
AuthorDate: Wed Feb 15 13:16:53 2023 -0800

    [#24789][prism] internal/coders.go and tests (#25476)
    
    * [prism] coders + initial test file.
    
    * [prism] internal/coders.go
    
    * [prism] finish coder coverage
    
    * [prism] no-alloc doc
    
    * review commit
    
    ---------
    
    Co-authored-by: lostluck <[email protected]>
---
 sdks/go/pkg/beam/runners/prism/internal/coders.go  | 243 +++++++++++++
 .../pkg/beam/runners/prism/internal/coders_test.go | 377 +++++++++++++++++++++
 2 files changed, 620 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go 
b/sdks/go/pkg/beam/runners/prism/internal/coders.go
new file mode 100644
index 00000000000..d88ed763f80
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
+       pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+       "golang.org/x/exp/slog"
+       "google.golang.org/protobuf/encoding/prototext"
+)
+
+// leafCoders lists coder urns the runner knows how to manipulate.
+// In particular, ones that won't be a problem to parse, in general
+// because they have a known total size.
+var leafCoders = map[string]struct{}{
+       urns.CoderBytes:          {},
+       urns.CoderStringUTF8:     {},
+       urns.CoderLengthPrefix:   {},
+       urns.CoderVarInt:         {},
+       urns.CoderDouble:         {},
+       urns.CoderBool:           {},
+       urns.CoderGlobalWindow:   {},
+       urns.CoderIntervalWindow: {},
+}
+
+func isLeafCoder(c *pipepb.Coder) bool {
+       _, ok := leafCoders[c.GetSpec().GetUrn()]
+       return ok
+}
+
+// makeWindowedValueCoder gets the coder for the PCollection, renders it safe, 
and adds it to the coders map.
+//
+// PCollection coders are not inherently WindowValueCoder wrapped, and they 
are added by the runner
+// for crossing the FnAPI boundary at data sources and data sinks.
+func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders 
map[string]*pipepb.Coder) string {
+       col := comps.GetPcollections()[pID]
+       cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
+       wcID := 
comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId()
+
+       // The runner needs to be defensive, and tell the SDK to Length Prefix
+       // any coders that it doesn't understand.
+       // So here, we look at the coder and its components, and produce
+       // new coders that we know how to deal with.
+
+       // Produce ID for the Windowed Value Coder
+       wvcID := "cwv_" + pID
+       wInC := &pipepb.Coder{
+               Spec: &pipepb.FunctionSpec{
+                       Urn: urns.CoderWindowedValue,
+               },
+               ComponentCoderIds: []string{cID, wcID},
+       }
+       // Populate the coders to send with the new windowed value coder.
+       coders[wvcID] = wInC
+       return wvcID
+}
+
+// makeWindowCoders makes the coder pair but behavior is ultimately determined 
by the strategy's windowFn.
+func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, 
exec.WindowEncoder) {
+       var cwc *coder.WindowCoder
+       switch wc.GetSpec().GetUrn() {
+       case urns.CoderGlobalWindow:
+               cwc = coder.NewGlobalWindow()
+       case urns.CoderIntervalWindow:
+               cwc = coder.NewIntervalWindow()
+       default:
+               slog.Log(slog.LevelError, "makeWindowCoders: unknown urn", 
slog.String("urn", wc.GetSpec().GetUrn()))
+               panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", 
prototext.Format(wc)))
+       }
+       return exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc)
+}
+
+// lpUnknownCoders takes a coder, and populates coders with any new coders
+// coders that the runner needs to be safe, and speedy.
+// It returns either the passed in coder id, or the new safe coder id.
+func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string 
{
+       // First check if we've already added the LP version of this coder to 
coders already.
+       lpcID := cID + "_lp"
+       // Check if we've done this one before.
+       if _, ok := bundle[lpcID]; ok {
+               return lpcID
+       }
+       // All coders in the coders map have been processed.
+       if _, ok := bundle[cID]; ok {
+               return cID
+       }
+       // Look up the canonical location.
+       c, ok := base[cID]
+       if !ok {
+               // We messed up somewhere.
+               panic(fmt.Sprint("unknown coder id:", cID))
+       }
+       // Add the original coder to the coders map.
+       bundle[cID] = c
+       // If we don't know this coder, and it has no sub components,
+       // we must LP it, and we return the LP'd version.
+       leaf := isLeafCoder(c)
+       if len(c.GetComponentCoderIds()) == 0 && !leaf {
+               lpc := &pipepb.Coder{
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: urns.CoderLengthPrefix,
+                       },
+                       ComponentCoderIds: []string{cID},
+               }
+               bundle[lpcID] = lpc
+               return lpcID
+       }
+       // We know we have a composite, so if we count this as a leaf, move 
everything to
+       // the coders map.
+       if leaf {
+               // Copy the components from the base.
+               for _, cc := range c.GetComponentCoderIds() {
+                       bundle[cc] = base[cc]
+               }
+               return cID
+       }
+       var needNewComposite bool
+       var comps []string
+       for _, cc := range c.GetComponentCoderIds() {
+               rcc := lpUnknownCoders(cc, bundle, base)
+               if cc != rcc {
+                       needNewComposite = true
+               }
+               comps = append(comps, rcc)
+       }
+       if needNewComposite {
+               lpc := &pipepb.Coder{
+                       Spec:              c.GetSpec(),
+                       ComponentCoderIds: comps,
+               }
+               bundle[lpcID] = lpc
+               return lpcID
+       }
+       return cID
+}
+
+// reconcileCoders ensures that the bundle coders are primed with initial 
coders from
+// the base pipeline components.
+func reconcileCoders(bundle, base map[string]*pipepb.Coder) {
+       for {
+               var comps []string
+               for _, c := range bundle {
+                       for _, ccid := range c.GetComponentCoderIds() {
+                               if _, ok := bundle[ccid]; !ok {
+                                       // We don't have the coder yet, so in 
we go.
+                                       comps = append(comps, ccid)
+                               }
+                       }
+               }
+               if len(comps) == 0 {
+                       return
+               }
+               for _, ccid := range comps {
+                       c, ok := base[ccid]
+                       if !ok {
+                               panic(fmt.Sprintf("unknown coder id during 
reconciliation: %v", ccid))
+                       }
+                       bundle[ccid] = c
+               }
+       }
+}
+
+// pullDecoder return a function that will extract the bytes
+// for the associated coder. Uses a buffer and a TeeReader to extract the 
original
+// bytes from when decoding elements.
+func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) 
func(io.Reader) []byte {
+       dec := pullDecoderNoAlloc(c, coders)
+       return func(r io.Reader) []byte {
+               var buf bytes.Buffer
+               tr := io.TeeReader(r, &buf)
+               dec(tr)
+               return buf.Bytes()
+       }
+}
+
+// pullDecoderNoAlloc returns a function that decodes a single eleemnt of the 
given coder.
+// Intended to only be used as an internal function for pullDecoder, which 
will use a io.TeeReader
+// to extract the bytes.
+func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) 
func(io.Reader) {
+       urn := c.GetSpec().GetUrn()
+       switch urn {
+       // Anything length prefixed can be treated as opaque.
+       case urns.CoderBytes, urns.CoderStringUTF8, urns.CoderLengthPrefix:
+               return func(r io.Reader) {
+                       l, _ := coder.DecodeVarInt(r)
+                       ioutilx.ReadN(r, int(l))
+               }
+       case urns.CoderVarInt:
+               return func(r io.Reader) {
+                       coder.DecodeVarInt(r)
+               }
+       case urns.CoderBool:
+               return func(r io.Reader) {
+                       coder.DecodeBool(r)
+               }
+       case urns.CoderDouble:
+               return func(r io.Reader) {
+                       coder.DecodeDouble(r)
+               }
+       case urns.CoderIterable:
+               ccids := c.GetComponentCoderIds()
+               ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
+               return func(r io.Reader) {
+                       l, _ := coder.DecodeInt32(r)
+                       for i := int32(0); i < l; i++ {
+                               ed(r)
+                       }
+               }
+
+       case urns.CoderKV:
+               ccids := c.GetComponentCoderIds()
+               kd := pullDecoderNoAlloc(coders[ccids[0]], coders)
+               vd := pullDecoderNoAlloc(coders[ccids[1]], coders)
+               return func(r io.Reader) {
+                       kd(r)
+                       vd(r)
+               }
+       case urns.CoderRow:
+               panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", 
prototext.Format(c)))
+       default:
+               panic(fmt.Sprintf("unknown coder urn key: %v", urn))
+       }
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
new file mode 100644
index 00000000000..ad6e3649628
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+       "bytes"
+       "encoding/binary"
+       "math"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+       "github.com/google/go-cmp/cmp"
+       "google.golang.org/protobuf/testing/protocmp"
+)
+
+func Test_isLeafCoder(t *testing.T) {
+       tests := []struct {
+               urn    string
+               isLeaf bool
+       }{
+               {urns.CoderBytes, true},
+               {urns.CoderStringUTF8, true},
+               {urns.CoderLengthPrefix, true},
+               {urns.CoderVarInt, true},
+               {urns.CoderDouble, true},
+               {urns.CoderBool, true},
+               {urns.CoderGlobalWindow, true},
+               {urns.CoderIntervalWindow, true},
+               {urns.CoderIterable, false},
+               {urns.CoderRow, false},
+               {urns.CoderKV, false},
+       }
+       for _, test := range tests {
+               undertest := &pipepb.Coder{
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: test.urn,
+                       },
+               }
+               if got, want := isLeafCoder(undertest), test.isLeaf; got != 
want {
+                       t.Errorf("isLeafCoder(%v) = %v, want %v", test.urn, 
got, want)
+               }
+       }
+}
+
+func Test_makeWindowedValueCoder(t *testing.T) {
+       coders := map[string]*pipepb.Coder{}
+
+       gotID := makeWindowedValueCoder("testPID", &pipepb.Components{
+               Pcollections: map[string]*pipepb.PCollection{
+                       "testPID": {CoderId: "testCoderID"},
+               },
+               Coders: map[string]*pipepb.Coder{
+                       "testCoderID": {
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: urns.CoderBool,
+                               },
+                       },
+               },
+       }, coders)
+
+       if gotID == "" {
+               t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", 
gotID)
+       }
+       got := coders[gotID]
+       if got == nil {
+               t.Errorf("makeWindowedValueCoder(...) = ID %v, had nil entry", 
gotID)
+       }
+       if got.GetSpec().GetUrn() != urns.CoderWindowedValue {
+               t.Errorf("makeWindowedValueCoder(...) = ID %v, had nil entry", 
gotID)
+       }
+}
+
+func Test_makeWindowCoders(t *testing.T) {
+       tests := []struct {
+               urn    string
+               window typex.Window
+       }{
+               {urns.CoderGlobalWindow, window.GlobalWindow{}},
+               {urns.CoderIntervalWindow, window.IntervalWindow{
+                       Start: mtime.MinTimestamp,
+                       End:   mtime.MaxTimestamp,
+               }},
+       }
+       for _, test := range tests {
+               undertest := &pipepb.Coder{
+                       Spec: &pipepb.FunctionSpec{
+                               Urn: test.urn,
+                       },
+               }
+               dec, enc := makeWindowCoders(undertest)
+
+               // Validate we're getting a round trip coder.
+               var buf bytes.Buffer
+               if err := enc.EncodeSingle(test.window, &buf); err != nil {
+                       t.Errorf("encoder[%v].EncodeSingle(%v) = %v, want nil", 
test.urn, test.window, err)
+               }
+               got, err := dec.DecodeSingle(&buf)
+               if err != nil {
+                       t.Errorf("decoder[%v].DecodeSingle(%v) = %v, want nil", 
test.urn, test.window, err)
+               }
+
+               if want := test.window; got != want {
+                       t.Errorf("makeWindowCoders(%v) didn't round trip: got 
%v, want %v", test.urn, got, want)
+               }
+       }
+}
+
+func Test_lpUnknownCoders(t *testing.T) {
+       tests := []struct {
+               name         string
+               urn          string
+               components   []string
+               bundle, base map[string]*pipepb.Coder
+               want         map[string]*pipepb.Coder
+       }{
+               {"alreadyProcessed",
+                       urns.CoderBool, nil,
+                       map[string]*pipepb.Coder{
+                               "test": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{
+                               "test": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+               },
+               {"alreadyProcessedLP",
+                       urns.CoderBool, nil,
+                       map[string]*pipepb.Coder{
+                               "test_lp": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}},
+                               "test":    {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{
+                               "test_lp": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}},
+                               "test":    {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+               },
+               {"noNeedForLP",
+                       urns.CoderBool, nil,
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{
+                               "test": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+               },
+               {"needLP",
+                       urns.CoderRow, nil,
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{
+                               "test_lp": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}},
+                               "test":    {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderRow}},
+                       },
+               },
+               {"needLP_recurse",
+                       urns.CoderKV, []string{"k", "v"},
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{
+                               "k": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderRow}},
+                               "v": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+                       map[string]*pipepb.Coder{
+                               "test_lp": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k_lp", "v"}},
+                               "test":    {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+                               "k_lp":    {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderLengthPrefix}, ComponentCoderIds: []string{"k"}},
+                               "k":       {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderRow}},
+                               "v":       {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+               },
+               {"alreadyLP", urns.CoderLengthPrefix, []string{"k"},
+                       map[string]*pipepb.Coder{},
+                       map[string]*pipepb.Coder{
+                               "k": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderRow}},
+                       },
+                       map[string]*pipepb.Coder{
+                               "test": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderLengthPrefix}, ComponentCoderIds: []string{"k"}},
+                               "k":    {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderRow}},
+                       },
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       // Add the initial coder to base.
+                       test.base["test"] = &pipepb.Coder{
+                               Spec:              &pipepb.FunctionSpec{Urn: 
test.urn},
+                               ComponentCoderIds: test.components,
+                       }
+
+                       lpUnknownCoders("test", test.bundle, test.base)
+
+                       if d := cmp.Diff(test.want, test.bundle, 
protocmp.Transform()); d != "" {
+                               t.Fatalf("lpUnknownCoders(%v); (-want, 
+got):\n%v", test.urn, d)
+                       }
+               })
+       }
+}
+
+func Test_reconcileCoders(t *testing.T) {
+       tests := []struct {
+               name         string
+               bundle, base map[string]*pipepb.Coder
+               want         map[string]*pipepb.Coder
+       }{
+               {name: "noChanges",
+                       bundle: map[string]*pipepb.Coder{
+                               "a": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+                       base: map[string]*pipepb.Coder{
+                               "a": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                               "b": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBytes}},
+                               "c": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderStringUTF8}},
+                       },
+                       want: map[string]*pipepb.Coder{
+                               "a": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+               },
+               {name: "KV",
+                       bundle: map[string]*pipepb.Coder{
+                               "kv": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+                       },
+                       base: map[string]*pipepb.Coder{
+                               "kv": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+                               "k":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                               "v":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+                       want: map[string]*pipepb.Coder{
+                               "kv": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+                               "k":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                               "v":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                       },
+               },
+               {name: "KV-nested",
+                       bundle: map[string]*pipepb.Coder{
+                               "kv": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+                       },
+                       base: map[string]*pipepb.Coder{
+                               "kv": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+                               "k":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"a", "b"}},
+                               "v":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                               "a":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBytes}},
+                               "b":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderRow}},
+                               "c":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderStringUTF8}},
+                       },
+                       want: map[string]*pipepb.Coder{
+                               "kv": {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+                               "k":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderKV}, ComponentCoderIds: []string{"a", "b"}},
+                               "v":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBool}},
+                               "a":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderBytes}},
+                               "b":  {Spec: &pipepb.FunctionSpec{Urn: 
urns.CoderRow}},
+                       },
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       reconcileCoders(test.bundle, test.base)
+
+                       if d := cmp.Diff(test.want, test.bundle, 
protocmp.Transform()); d != "" {
+                               t.Fatalf("reconcileCoders(...); (-want, 
+got):\n%v", d)
+                       }
+               })
+       }
+}
+
+func Test_pullDecoder(t *testing.T) {
+
+       doubleBytes := make([]byte, 8)
+       binary.BigEndian.PutUint64(doubleBytes, math.Float64bits(math.SqrtPi))
+
+       tests := []struct {
+               name   string
+               coder  *pipepb.Coder
+               coders map[string]*pipepb.Coder
+               input  []byte
+       }{
+               {
+                       "bytes",
+                       &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: urns.CoderBytes,
+                               },
+                       },
+                       map[string]*pipepb.Coder{},
+                       []byte{3, 1, 2, 3},
+               }, {
+                       "varint",
+                       &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: urns.CoderVarInt,
+                               },
+                       },
+                       map[string]*pipepb.Coder{},
+                       []byte{255, 3},
+               }, {
+                       "bool",
+                       &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: urns.CoderBool,
+                               },
+                       },
+                       map[string]*pipepb.Coder{},
+                       []byte{1},
+               }, {
+                       "double",
+                       &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: urns.CoderDouble,
+                               },
+                       },
+                       map[string]*pipepb.Coder{},
+                       doubleBytes,
+               }, {
+                       "iterable",
+                       &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: urns.CoderIterable,
+                               },
+                               ComponentCoderIds: []string{"elm"},
+                       },
+                       map[string]*pipepb.Coder{
+                               "elm": &pipepb.Coder{
+                                       Spec: &pipepb.FunctionSpec{
+                                               Urn: urns.CoderVarInt,
+                                       },
+                               },
+                       },
+                       []byte{4, 0, 1, 2, 3},
+               }, {
+                       "kv",
+                       &pipepb.Coder{
+                               Spec: &pipepb.FunctionSpec{
+                                       Urn: urns.CoderKV,
+                               },
+                               ComponentCoderIds: []string{"key", "value"},
+                       },
+                       map[string]*pipepb.Coder{
+                               "key": &pipepb.Coder{
+                                       Spec: &pipepb.FunctionSpec{
+                                               Urn: urns.CoderVarInt,
+                                       },
+                               },
+                               "value": &pipepb.Coder{
+                                       Spec: &pipepb.FunctionSpec{
+                                               Urn: urns.CoderBool,
+                                       },
+                               },
+                       },
+                       []byte{3, 0},
+               },
+       }
+       for _, test := range tests {
+               t.Run(test.name, func(t *testing.T) {
+                       dec := pullDecoder(test.coder, test.coders)
+                       buf := bytes.NewBuffer(test.input)
+                       got := dec(buf)
+                       if !bytes.EqualFold(test.input, got) {
+                               t.Fatalf("pullDecoder(%v)(...) = %v, want %v", 
test.coder, got, test.input)
+                       }
+               })
+       }
+}

Reply via email to