This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 95e4e5b [BEAM-7068] Improving error messages when binding functions
in Go SDK
new 21f1b0d Merge pull request #8298 from youngoli/beam7068
95e4e5b is described below
commit 95e4e5b03439f4b0f1742ceb05c5f41dcf901533
Author: Daniel Oliveira <[email protected]>
AuthorDate: Fri Apr 12 17:10:30 2019 -0700
[BEAM-7068] Improving error messages when binding functions in Go SDK
This improves the error messages by adding useful context wherever it
helps and by making them more human-readable, for example by printing
a function's name instead of its address.
---
sdks/go/pkg/beam/core/funcx/fn.go | 2 +-
sdks/go/pkg/beam/core/graph/bind.go | 58 +++++++++++++++++----------------
sdks/go/pkg/beam/core/graph/edge.go | 29 +++++++++--------
sdks/go/pkg/beam/core/typex/fulltype.go | 8 ++---
4 files changed, 50 insertions(+), 47 deletions(-)
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go
b/sdks/go/pkg/beam/core/funcx/fn.go
index 5cf6ba6..48129a5 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -220,7 +220,7 @@ func (u *Fn) Returns(mask ReturnKind) []int {
}
func (u *Fn) String() string {
- return fmt.Sprintf("%+v", *u)
+ return fmt.Sprintf("{Fn:{Name:%v Kind:%v} Param:%+v Ret:%+v}",
u.Fn.Name(), u.Fn.Type(), u.Param, u.Ret)
}
// New returns a Fn from a user function, if valid. Closures and dynamically
diff --git a/sdks/go/pkg/beam/core/graph/bind.go
b/sdks/go/pkg/beam/core/graph/bind.go
index 093c738..3aeb3d2 100644
--- a/sdks/go/pkg/beam/core/graph/bind.go
+++ b/sdks/go/pkg/beam/core/graph/bind.go
@@ -62,27 +62,27 @@ import (
func Bind(fn *funcx.Fn, typedefs map[string]reflect.Type, in
...typex.FullType) ([]typex.FullType, []InputKind, []typex.FullType,
[]typex.FullType, error) {
inbound, kinds, err := findInbound(fn, in...)
if err != nil {
- return nil, nil, nil, nil, err
+ return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v",
fn.Fn.Name(), err)
}
outbound, err := findOutbound(fn)
if err != nil {
- return nil, nil, nil, nil, err
+ return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v",
fn.Fn.Name(), err)
}
subst, err := typex.Bind(inbound, in)
if err != nil {
- return nil, nil, nil, nil, err
+ return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v",
fn.Fn.Name(), err)
}
for k, v := range typedefs {
- if _, exists := subst[k]; exists {
- return nil, nil, nil, nil, fmt.Errorf("type %v already
defined by fn", k)
+ if substK, exists := subst[k]; exists {
+ return nil, nil, nil, nil, fmt.Errorf("binding fn %v:
cannot substitute type %v with %v, already defined as %v", fn.Fn.Name(), k, v,
substK)
}
subst[k] = v
}
out, err := typex.Substitute(outbound, subst)
if err != nil {
- return nil, nil, nil, nil, err
+ return nil, nil, nil, nil, fmt.Errorf("binding fn %v: %v",
fn.Fn.Name(), err)
}
return inbound, kinds, outbound, out, nil
}
@@ -133,31 +133,33 @@ func findInbound(fn *funcx.Fn, in ...typex.FullType)
([]typex.FullType, []InputK
params := funcx.SubParams(fn.Param,
fn.Params(funcx.FnValue|funcx.FnIter|funcx.FnReIter)...)
index := 0
for _, input := range in {
- elm, kind, err := tryBindInbound(input, params[index:], index
== 0)
+ arity, err := inboundArity(input, index == 0)
if err != nil {
- return nil, nil, fmt.Errorf("failed to bind %v to input
%v: %v", fn, in, err)
+ return nil, nil, fmt.Errorf("binding params %v to input
%v: %v", params, input, err)
+ }
+ if len(params)-index < arity {
+ return nil, nil, fmt.Errorf("binding params %v to input
%v: too few params", params[index:], input)
+ }
+
+ paramsToBind := params[index : index+arity]
+ elm, kind, err := tryBindInbound(input, paramsToBind, index ==
0)
+ if err != nil {
+ return nil, nil, fmt.Errorf("binding params %v to input
%v: %v", paramsToBind, input, err)
}
inbound = append(inbound, elm)
kinds = append(kinds, kind)
- index += inboundArity(input, index == 0)
+ index += arity
}
if index < len(params) {
- return nil, nil, fmt.Errorf("found too few input to bind to %v:
%v. Forgot an input or to annotate options?", params, in)
+ return nil, nil, fmt.Errorf("binding params %v to inputs %v:
too few inputs. Forgot an input or to annotate options?", params, in)
}
if index > len(params) {
- return nil, nil, fmt.Errorf("found too many input to bind to
%v: %v", params, in)
+ return nil, nil, fmt.Errorf("binding params %v to inputs %v:
too many inputs", params, in)
}
return inbound, kinds, nil
}
func tryBindInbound(t typex.FullType, args []funcx.FnParam, isMain bool)
(typex.FullType, InputKind, error) {
- arity := inboundArity(t, isMain)
- if len(args) < arity {
- return nil, Main, fmt.Errorf("too few parameters to bind %v", t)
- }
-
- // log.Printf("Bind inbound %v to %v (main: %v)", candidate, args,
isMain)
-
kind := Main
var other typex.FullType
@@ -203,7 +205,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam,
isMain bool) (typex.
other = typex.New(trimmed[0])
default:
- panic(fmt.Sprintf("Unexpected param kind: %v",
arg))
+ return nil, kind, fmt.Errorf("unexpected param
kind: %v", arg)
}
}
case typex.Composite:
@@ -253,7 +255,7 @@ func tryBindInbound(t typex.FullType, args []funcx.FnParam,
isMain bool) (typex.
components := []typex.FullType{typex.New(args[0].T)}
- for i := 1; i < arity; i++ {
+ for i := 1; i < len(args); i++ {
switch args[i].Kind {
case funcx.FnIter:
values, _ := funcx.UnfoldIter(args[i].T)
@@ -277,11 +279,11 @@ func tryBindInbound(t typex.FullType, args
[]funcx.FnParam, isMain bool) (typex.
other = typex.NewCoGBK(components...)
default:
- panic("Unexpected inbound type")
+ return nil, kind, fmt.Errorf("unexpected inbound type:
%v", t.Type())
}
default:
- return nil, kind, fmt.Errorf("unexpected inbound type %v", t)
+ return nil, kind, fmt.Errorf("unexpected inbound class: %v",
t.Class())
}
if !typex.IsStructurallyAssignable(t, other) {
@@ -290,22 +292,22 @@ func tryBindInbound(t typex.FullType, args
[]funcx.FnParam, isMain bool) (typex.
return other, kind, nil
}
-func inboundArity(t typex.FullType, isMain bool) int {
+func inboundArity(t typex.FullType, isMain bool) (int, error) {
if t.Class() == typex.Composite {
switch t.Type() {
case typex.KVType:
if isMain {
- return 2
+ return 2, nil
}
// A KV side input must be a single iterator/map.
- return 1
+ return 1, nil
case typex.CoGBKType:
- return len(t.Components())
+ return len(t.Components()), nil
default:
- panic("Unexpected inbound type")
+ return 0, fmt.Errorf("unexpected composite inbound
type: %v", t.Type())
}
}
- return 1
+ return 1, nil
}
func trimIllegal(list []reflect.Type) []reflect.Type {
diff --git a/sdks/go/pkg/beam/core/graph/edge.go
b/sdks/go/pkg/beam/core/graph/edge.go
index 74eb94c..24b57fe 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -186,10 +186,11 @@ func (e *MultiEdge) String() string {
// NewCoGBK inserts a new CoGBK edge into the graph.
func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error) {
if len(ns) == 0 {
- return nil, fmt.Errorf("cogbk needs at least 1 input")
+ // TODO(BEAM-7086) Reduce the repetition in the context of all
the errors in this file.
+ return nil, fmt.Errorf("creating new CoGBK in scope %v: needs
at least 1 input", s)
}
if !typex.IsKV(ns[0].Type()) {
- return nil, fmt.Errorf("input type must be KV: %v", ns[0])
+ return nil, fmt.Errorf("creating new CoGBK in scope %v: input
type must be KV: %v", s, ns[0])
}
// (1) Create CoGBK result type: KV<T,U>, .., KV<T,Z> ->
CoGBK<T,U,..,Z>.
@@ -202,16 +203,16 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node)
(*MultiEdge, error) {
for i := 1; i < len(ns); i++ {
n := ns[i]
if !typex.IsKV(n.Type()) {
- return nil, fmt.Errorf("input type must be KV: %v", n)
+ return nil, fmt.Errorf("creating new CoGBK in scope %v:
input type must be KV: %v", s, n)
}
if !n.Coder.Components[0].Equals(c) {
- return nil, fmt.Errorf("key coder for %v is %v, want
%v", n, n.Coder.Components[0], c)
+ return nil, fmt.Errorf("creating new CoGBK in scope %v:
key coder for %v is %v, want %v", s, n, n.Coder.Components[0], c)
}
if !w.Equals(n.WindowingStrategy()) {
- return nil, fmt.Errorf("mismatched cogbk windowing
strategies: %v, want %v", n.WindowingStrategy(), w)
+ return nil, fmt.Errorf("creating new CoGBK in scope %v:
mismatched CoGBK windowing strategies: %v, want %v", s, n.WindowingStrategy(),
w)
}
if bounded != n.Bounded() {
- return nil, fmt.Errorf("unmatched cogbk boundedness:
%v, want %v", n.Bounded(), bounded)
+ return nil, fmt.Errorf("creating new CoGBK in scope %v:
unmatched CoGBK boundedness: %v, want %v", s, n.Bounded(), bounded)
}
comp = append(comp, n.Type().Components()[1])
@@ -235,7 +236,7 @@ func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge,
error) {
// the shared input type.
func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error) {
if len(in) < 2 {
- return nil, fmt.Errorf("flatten needs at least 2 input, got
%v", len(in))
+ return nil, fmt.Errorf("creating new Flatten in scope %v:
Flatten needs at least 2 input, got %v", s, len(in))
}
t := in[0].Type()
w := inputWindow(in)
@@ -251,14 +252,14 @@ func NewFlatten(g *Graph, s *Scope, in []*Node)
(*MultiEdge, error) {
}
for _, n := range in {
if !typex.IsEqual(t, n.Type()) {
- return nil, fmt.Errorf("mismatched flatten input types:
%v, want %v", n.Type(), t)
+ return nil, fmt.Errorf("creating new Flatten in scope
%v: mismatched Flatten input types: %v, want %v", s, n.Type(), t)
}
if !w.Equals(n.WindowingStrategy()) {
- return nil, fmt.Errorf("mismatched flatten window
types: %v, want %v", n.WindowingStrategy(), w)
+ return nil, fmt.Errorf("creating new Flatten in scope
%v: mismatched Flatten window types: %v, want %v", s, n.WindowingStrategy(), w)
}
}
if typex.IsCoGBK(t) {
- return nil, fmt.Errorf("flatten input type cannot be CoGBK:
%v", t)
+ return nil, fmt.Errorf("creating new Flatten in scope %v:
Flatten input type cannot be CoGBK: %v", s, t)
}
edge := g.NewEdge(s)
@@ -299,7 +300,7 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in
[]*Node, typedefs ma
inbound, kinds, outbound, out, err := Bind(u.ProcessElementFn(),
typedefs, NodeTypes(in)...)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("creating new DoFn in scope %v: %v", s,
err)
}
edge := g.NewEdge(s)
@@ -328,10 +329,10 @@ const CombinePerKeyScope = "CombinePerKey"
func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder)
(*MultiEdge, error) {
inT := in.Type()
if !typex.IsCoGBK(inT) {
- return nil, fmt.Errorf("combine requires CoGBK type: %v", inT)
+ return nil, fmt.Errorf("creating new Combine in scope %v:
Combine requires CoGBK type: %v", s, inT)
}
if len(inT.Components()) > 2 {
- return nil, fmt.Errorf("combine cannot follow multi-input
CoGBK: %v", inT)
+ return nil, fmt.Errorf("creating new Combine in scope %v:
Combine cannot follow multi-input CoGBK: %v", s, inT)
}
// Create a synthetic function for binding purposes. It takes main input
@@ -380,7 +381,7 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node,
ac *coder.Coder) (*M
inbound, kinds, outbound, out, err := Bind(synth, nil, inT)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("creating new Combine in scope %v: %v",
s, err)
}
edge := g.NewEdge(s)
diff --git a/sdks/go/pkg/beam/core/typex/fulltype.go
b/sdks/go/pkg/beam/core/typex/fulltype.go
index e40aa7a..5066023 100644
--- a/sdks/go/pkg/beam/core/typex/fulltype.go
+++ b/sdks/go/pkg/beam/core/typex/fulltype.go
@@ -298,7 +298,7 @@ func IsBound(t FullType) bool {
// produce {"T" -> string}.
func Bind(types, models []FullType) (map[string]reflect.Type, error) {
if len(types) != len(models) {
- return nil, fmt.Errorf("invalid number of modes: %v, want %v",
len(models), len(types))
+ return nil, fmt.Errorf("typex.Bind: invalid number of models:
%v, want %v", len(models), len(types))
}
m := make(map[string]reflect.Type)
@@ -307,7 +307,7 @@ func Bind(types, models []FullType)
(map[string]reflect.Type, error) {
model := models[i]
if !IsStructurallyAssignable(model, t) {
- return nil, fmt.Errorf("%v is not assignable to %v",
model, t)
+ return nil, fmt.Errorf("typex.Bind: %v is not
assignable to %v", model, t)
}
if err := walk(t, model, m); err != nil {
return nil, err
@@ -363,7 +363,7 @@ func substitute(t FullType, m map[string]reflect.Type)
(FullType, error) {
name := t.Type().Name()
repl, ok := m[name]
if !ok {
- return nil, fmt.Errorf("type variable not bound: %v",
name)
+ return nil, fmt.Errorf("substituting type %v: type not
bound", name)
}
return New(repl), nil
case Container:
@@ -374,7 +374,7 @@ func substitute(t FullType, m map[string]reflect.Type)
(FullType, error) {
if IsList(t.Type()) {
return New(reflect.SliceOf(comp[0].Type()), comp...),
nil
}
- panic(fmt.Sprintf("Unexpected aggregate: %v", t))
+ return nil, fmt.Errorf("unexpected aggregate %v, only slices
allowed", t)
case Composite:
comp, err := substituteList(t.Components(), m)
if err != nil {