This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 95e4e5b  [BEAM-7068] Improving error messages when binding functions 
in Go SDK
     new 21f1b0d  Merge pull request #8298 from youngoli/beam7068
95e4e5b is described below

commit 95e4e5b03439f4b0f1742ceb05c5f41dcf901533
Author: Daniel Oliveira <[email protected]>
AuthorDate: Fri Apr 12 17:10:30 2019 -0700

    [BEAM-7068] Improving error messages when binding functions in Go SDK
    
    I improve the error messages by adding useful context wherever it comes
    in handy, plus making the error messages more human-readable, for
    example by printing out a function name instead of address.
---
 sdks/go/pkg/beam/core/funcx/fn.go       |  2 +-
 sdks/go/pkg/beam/core/graph/bind.go     | 58 +++++++++++++++++----------------
 sdks/go/pkg/beam/core/graph/edge.go     | 29 +++++++++--------
 sdks/go/pkg/beam/core/typex/fulltype.go |  8 ++---
 4 files changed, 50 insertions(+), 47 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go 
b/sdks/go/pkg/beam/core/funcx/fn.go
index 5cf6ba6..48129a5 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -220,7 +220,7 @@ func (u *Fn) Returns(mask ReturnKind) []int {
 }
 
 func (u *Fn) String() string {
-       return fmt.Sprintf("%+v", *u)
+       return fmt.Sprintf("{Fn:{Name:%v Kind:%v} Param:%+v Ret:%+v}", 
u.Fn.Name(), u.Fn.Type(), u.Param, u.Ret)
 }
 
 // New returns a Fn from a user function, if valid. Closures and dynamically
diff --git a/sdks/go/pkg/beam/core/graph/bind.go 
b/sdks/go/pkg/beam/core/graph/bind.go
index 093c738..3aeb3d2 100644
--- a/sdks/go/pkg/beam/core/graph/bind.go
+++ b/sdks/go/pkg/beam/core/graph/bind.go
@@ -62,27 +62,27 @@ import (
 func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in 
...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType, 
[]typex.FullType, error) {
        inbound, kinds, err := findInbound(fn, in...)
        if err != nil {
-               return nil, nil, nil, nil, err
+               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
        }
        outbound, err := findOutbound(fn)
        if err != nil {
-               return nil, nil, nil, nil, err
+               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
        }
 
        subst, err := typex.Bind(inbound, in)
        if err != nil {
-               return nil, nil, nil, nil, err
+               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
        }
        for k, v := range typedefs {
-               if _, exists := subst[k]; exists {
-                       return nil, nil, nil, nil, fmt.Errorf("type %v already 
defined by fn", k)
+               if substK, exists := subst[k]; exists {
+                       return nil, nil, nil, nil, fmt.Errorf("binding fn %v: 
cannot substitute type %v with %v, already defined as %v", fn.Fn.Name(), k, v, 
substK)
                }
                subst[k] = v
        }
 
        out, err := typex.Substitute(outbound, subst)
        if err != nil {
-               return nil, nil, nil, nil, err
+               return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v", 
fn.Fn.Name(), err)
        }
        return inbound, kinds, outbound, out, nil
 }
@@ -133,31 +133,33 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType) 
([]typex.FullType, []InputK
        params := funcx.SubParams(fn.Param, 
fn.Params(funcx.FnValue|funcx.FnIter|funcx.FnReIter)...)
        index := 0
        for _, input := range in {
-               elm, kind, err := tryBindInbound(input, params[index:], index 
== 0)
+               arity, err := inboundArity(input, index == 0)
                if err != nil {
-                       return nil, nil, fmt.Errorf("failed to bind %v to input 
%v: %v", fn, in, err)
+                       return nil, nil, fmt.Errorf("binding params %v to input 
%v: %v", params, input, err)
+               }
+               if len(params)-index < arity {
+                       return nil, nil, fmt.Errorf("binding params %v to input 
%v: too few params", params[index:], input)
+               }
+
+               paramsToBind := params[index : index+arity]
+               elm, kind, err := tryBindInbound(input, paramsToBind, index == 
0)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("binding params %v to input 
%v: %v", paramsToBind, input, err)
                }
                inbound = append(inbound, elm)
                kinds = append(kinds, kind)
-               index += inboundArity(input, index == 0)
+               index += arity
        }
        if index < len(params) {
-               return nil, nil, fmt.Errorf("found too few input to bind to %v: 
%v. Forgot an input or to annotate options?", params, in)
+               return nil, nil, fmt.Errorf("binding params %v to inputs %v: 
too few inputs. Forgot an input or to annotate options?", params, in)
        }
        if index > len(params) {
-               return nil, nil, fmt.Errorf("found too many input to bind to 
%v: %v", params, in)
+               return nil, nil, fmt.Errorf("binding params %v to inputs %v: 
too many inputs", params, in)
        }
        return inbound, kinds, nil
 }
 
 func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool) 
(typex.FullType, InputKind, error) {
-       arity := inboundArity(t, isMain)
-       if len(args) < arity {
-               return nil, Main, fmt.Errorf("too few parameters to bind %v", t)
-       }
-
-       // log.Printf("Bind inbound %v to %v (main: %v)", candidate, args, 
isMain)
-
        kind := Main
        var other typex.FullType
 
@@ -203,7 +205,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, 
isMain bool) (typex.
                                other = typex.New(trimmed[0])
 
                        default:
-                               panic(fmt.Sprintf("Unexpected param kind: %v", 
arg))
+                               return nil, kind, fmt.Errorf("unexpected param 
kind: %v", arg)
                        }
                }
        case typex.Composite:
@@ -253,7 +255,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam, 
isMain bool) (typex.
 
                        components := []typex.FullType{typex.New(args[0].T)}
 
-                       for i := 1; i < arity; i++ {
+                       for i := 1; i < len(args); i++ {
                                switch args[i].Kind {
                                case funcx.FnIter:
                                        values, _ := funcx.UnfoldIter(args[i].T)
@@ -277,11 +279,11 @@ func tryBindInbound(t typex.FullType, args 
[]funcx.FnParam, isMain bool) (typex.
                        other = typex.NewCoGBK(components...)
 
                default:
-                       panic("Unexpected inbound type")
+                       return nil, kind, fmt.Errorf("unexpected inbound type: 
%v", t.Type())
                }
 
        default:
-               return nil, kind, fmt.Errorf("unexpected inbound type %v", t)
+               return nil, kind, fmt.Errorf("unexpected inbound class: %v", 
t.Class())
        }
 
        if !typex.IsStructurallyAssignable(t, other) {
@@ -290,22 +292,22 @@ func tryBindInbound(t typex.FullType, args 
[]funcx.FnParam, isMain bool) (typex.
        return other, kind, nil
 }
 
-func inboundArity(t typex.FullType, isMain bool) int {
+func inboundArity(t typex.FullType, isMain bool) (int, error) {
        if t.Class() == typex.Composite {
                switch t.Type() {
                case typex.KVType:
                        if isMain {
-                               return 2
+                               return 2, nil
                        }
                        // A KV side input must be a single iterator/map.
-                       return 1
+                       return 1, nil
                case typex.CoGBKType:
-                       return len(t.Components())
+                       return len(t.Components()), nil
                default:
-                       panic("Unexpected inbound type")
+                       return 0, fmt.Errorf("unexpected composite inbound 
type: %v", t.Type())
                }
        }
-       return 1
+       return 1, nil
 }
 
 func trimIllegal(list []reflect.Type) []reflect.Type {
diff --git a/sdks/go/pkg/beam/core/graph/edge.go 
b/sdks/go/pkg/beam/core/graph/edge.go
index 74eb94c..24b57fe 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -186,10 +186,11 @@ func (e *MultiEdge) String() string {
 // NewCoGBK inserts a new CoGBK edge into the graph.
 func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
        if len(ns) == 0 {
-               return nil, fmt.Errorf("cogbk needs at least 1 input")
+               // TODO(BEAM-7086) Reduce the repetition in the context of all 
the errors in this file.
+               return nil, fmt.Errorf("creating new CoGBK in scope %v: needs 
at least 1 input", s)
        }
        if !typex.IsKV(ns[0].Type()) {
-               return nil, fmt.Errorf("input type must be KV: %v", ns[0])
+               return nil, fmt.Errorf("creating new CoGBK in scope %v: input 
type must be KV: %v", s, ns[0])
        }
 
        // (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> -> 
CoGBK<T,U,..,Z>.
@@ -202,16 +203,16 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) 
(*MultiEdge, error) {
        for i := 1; i < len(ns); i++ {
                n := ns[i]
                if !typex.IsKV(n.Type()) {
-                       return nil, fmt.Errorf("input type must be KV: %v", n)
+                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
input type must be KV: %v", s, n)
                }
                if !n.Coder.Components[0].Equals(c) {
-                       return nil, fmt.Errorf("key coder for %v is %v, want 
%v", n, n.Coder.Components[0], c)
+                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
key coder for %v is %v, want %v", s, n, n.Coder.Components[0], c)
                }
                if !w.Equals(n.WindowingStrategy()) {
-                       return nil, fmt.Errorf("mismatched cogbk windowing 
strategies: %v, want %v", n.WindowingStrategy(), w)
+                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
mismatched CoGBK windowing strategies: %v, want %v", s, n.WindowingStrategy(), 
w)
                }
                if bounded != n.Bounded() {
-                       return nil, fmt.Errorf("unmatched cogbk boundedness: 
%v, want %v", n.Bounded(), bounded)
+                       return nil, fmt.Errorf("creating new CoGBK in scope %v: 
unmatched CoGBK boundedness: %v, want %v", s, n.Bounded(), bounded)
                }
 
                comp = append(comp, n.Type().Components()[1])
@@ -235,7 +236,7 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, 
error) {
 // the shared input type.
 func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
        if len(in) < 2 {
-               return nil, fmt.Errorf("flatten needs at least 2 input, got 
%v", len(in))
+               return nil, fmt.Errorf("creating new Flatten in scope %v: 
Flatten needs at least 2 input, got %v", s, len(in))
        }
        t := in[0].Type()
        w := inputWindow(in)
@@ -251,14 +252,14 @@ func NewFlatten(g *Graph, s *Scope, in []*Node) 
(*MultiEdge, error) {
        }
        for _, n := range in {
                if !typex.IsEqual(t, n.Type()) {
-                       return nil, fmt.Errorf("mismatched flatten input types: 
%v, want %v", n.Type(), t)
+                       return nil, fmt.Errorf("creating new Flatten in scope 
%v: mismatched Flatten input types: %v, want %v", s, n.Type(), t)
                }
                if !w.Equals(n.WindowingStrategy()) {
-                       return nil, fmt.Errorf("mismatched flatten window 
types: %v, want %v", n.WindowingStrategy(), w)
+                       return nil, fmt.Errorf("creating new Flatten in scope 
%v: mismatched Flatten window types: %v, want %v", s, n.WindowingStrategy(), w)
                }
        }
        if typex.IsCoGBK(t) {
-               return nil, fmt.Errorf("flatten input type cannot be CoGBK: 
%v", t)
+               return nil, fmt.Errorf("creating new Flatten in scope %v: 
Flatten input type cannot be CoGBK: %v", s, t)
        }
 
        edge := g.NewEdge(s)
@@ -299,7 +300,7 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in 
[]*Node, typedefs ma
 
        inbound, kinds, outbound, out, err := Bind(u.ProcessElementFn(), 
typedefs, NodeTypes(in)...)
        if err != nil {
-               return nil, err
+               return nil, fmt.Errorf("creating new DoFn in scope %v: %v", s, 
err)
        }
 
        edge := g.NewEdge(s)
@@ -328,10 +329,10 @@ const CombinePerKeyScope = "CombinePerKey"
 func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder) 
(*MultiEdge, error) {
        inT := in.Type()
        if !typex.IsCoGBK(inT) {
-               return nil, fmt.Errorf("combine requires CoGBK type: %v", inT)
+               return nil, fmt.Errorf("creating new Combine in scope %v: 
Combine requires CoGBK type: %v", s, inT)
        }
        if len(inT.Components()) > 2 {
-               return nil, fmt.Errorf("combine cannot follow multi-input 
CoGBK: %v", inT)
+               return nil, fmt.Errorf("creating new Combine in scope %v: 
Combine cannot follow multi-input CoGBK: %v", s, inT)
        }
 
        // Create a synthetic function for binding purposes. It takes main input
@@ -380,7 +381,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, 
ac *coder.Coder) (*M
 
        inbound, kinds, outbound, out, err := Bind(synth, nil, inT)
        if err != nil {
-               return nil, err
+               return nil, fmt.Errorf("creating new Combine in scope %v: %v", 
s, err)
        }
 
        edge := g.NewEdge(s)
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go 
b/sdks/go/pkg/beam/core/typex/fulltype.go
index e40aa7a..5066023 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -298,7 +298,7 @@ func IsBound(t FullType) bool {
 // produce {"T" -> string}.
 func Bind(types, models []FullType) (map[string]reflect.Type, error) {
        if len(types) != len(models) {
-               return nil, fmt.Errorf("invalid number of modes: %v, want %v", 
len(models), len(types))
+               return nil, fmt.Errorf("typex.Bind: invalid number of models: 
%v, want %v", len(models), len(types))
        }
 
        m := make(map[string]reflect.Type)
@@ -307,7 +307,7 @@ func Bind(types, models []FullType) 
(map[string]reflect.Type, error) {
                model := models[i]
 
                if !IsStructurallyAssignable(model, t) {
-                       return nil, fmt.Errorf("%v is not assignable to %v", 
model, t)
+                       return nil, fmt.Errorf("typex.Bind: %v is not 
assignable to %v", model, t)
                }
                if err := walk(t, model, m); err != nil {
                        return nil, err
@@ -363,7 +363,7 @@ func substitute(t FullType, m map[string]reflect.Type) 
(FullType, error) {
                name := t.Type().Name()
                repl, ok := m[name]
                if !ok {
-                       return nil, fmt.Errorf("type variable not bound: %v", 
name)
+                       return nil, fmt.Errorf("substituting type %v: type not 
bound", name)
                }
                return New(repl), nil
        case Container:
@@ -374,7 +374,7 @@ func substitute(t FullType, m map[string]reflect.Type) 
(FullType, error) {
                if IsList(t.Type()) {
                        return New(reflect.SliceOf(comp[0].Type()), comp...), 
nil
                }
-               panic(fmt.Sprintf("Unexpected aggregate: %v", t))
+               return nil, fmt.Errorf("unexpected aggregate %v, only slices 
allowed", t)
        case Composite:
                comp, err := substituteList(t.Components(), m)
                if err != nil {

Reply via email to