This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8b213c6 [BEAM-11095] Better error handling for iter/reiter/multimap (#16794)
8b213c6 is described below
commit 8b213c617ef8cf3a077bb0002b6b0fec8e85cb05
Author: Danny McCormick <[email protected]>
AuthorDate: Wed Feb 9 21:54:03 2022 -0500
[BEAM-11095] Better error handling for iter/reiter/multimap (#16794)
---
sdks/go/pkg/beam/core/funcx/fn.go | 10 +++-
sdks/go/pkg/beam/core/funcx/fn_test.go | 15 +++++
sdks/go/pkg/beam/core/funcx/sideinput.go | 95 ++++++++++++++++++++++++++------
3 files changed, 101 insertions(+), 19 deletions(-)
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index cfa66a0..c1bbea5 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -350,7 +350,15 @@ func New(fn reflectx.Func) (*Fn, error) {
if ok, err := IsMalformedEmit(t); ok {
return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
}
- // TODO(damccorm) 2022.02.08: Handle IsMalformed[Iter, ReIter, MultiMap] cases (BEAM-11095)
+ if ok, err := IsMalformedIter(t); ok {
+ return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
+ }
+ if ok, err := IsMalformedReIter(t); ok {
+ return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
+ }
+ if ok, err := IsMalformedMultiMap(t); ok {
+ return nil, errors.Wrapf(err, "bad parameter type for %s: %v", fn.Name(), t)
+ }
return nil, errors.Errorf("bad parameter type for %s: %v", fn.Name(), t)
}
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 71b73ff..78fd3a1 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -232,6 +232,21 @@ func TestNew(t *testing.T) {
},
Err: errors.New(errIllegalParametersInEmit),
},
+ {
+ Name: "errIllegalParametersInIter - malformed Iter",
+ Fn: func(int, func(*nonConcreteType) bool, func(*int, *string) bool) {},
+ Err: errors.New(errIllegalParametersInIter),
+ },
+ {
+ Name: "errIllegalParametersInIter - malformed ReIter",
+ Fn: func(int, func() func(*nonConcreteType) bool, func(*int, *string) bool) {},
+ Err: errors.New(errIllegalParametersInReIter),
+ },
+ {
+ Name: "errIllegalParametersInMultiMap - malformed MultiMap",
+ Fn: func(int, func(string) func(*nonConcreteType) bool) {},
+ Err: errors.New(errIllegalParametersInMultiMap),
+ },
}
for _, test := range tests {
diff --git a/sdks/go/pkg/beam/core/funcx/sideinput.go b/sdks/go/pkg/beam/core/funcx/sideinput.go
index e1f328a..168ff3c 100644
--- a/sdks/go/pkg/beam/core/funcx/sideinput.go
+++ b/sdks/go/pkg/beam/core/funcx/sideinput.go
@@ -20,6 +20,13 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+)
+
+var (
+ errIllegalParametersInIter = "All parameters in an iter must be universal type, container type, or concrete type"
+ errIllegalParametersInReIter = "Output of a reiter must be valid iter type"
+ errIllegalParametersInMultiMap = "Output of a multimap must be valid iter type"
)
// IsIter returns true iff the supplied type is a "single sweep functional iterator".
@@ -30,10 +37,20 @@ import (
// will be copied into the supplied pointers. The function returns true if
// data was copied, and false if there is no more data available.
func IsIter(t reflect.Type) bool {
- _, ok := UnfoldIter(t)
+ _, ok, _ := unfoldIter(t)
return ok
}
+// IsMalformedIter returns true iff the supplied type is an illegal "single sweep
+// functional iterator" and an error explaining why it is illegal. For example,
+// an iterator is not legal if one of its parameters is not concrete, universal, or
+// a container type. If the type does not have the structure of an iter or it is a
+// legal iter, IsMalformedIter returns false and no error.
+func IsMalformedIter(t reflect.Type) (bool, error) {
+ _, _, err := unfoldIter(t)
+ return err != nil, err
+}
+
// UnfoldIter returns the parameter types, if a single sweep functional
// iterator. For example:
//
@@ -42,15 +59,20 @@ func IsIter(t reflect.Type) bool {
// func (*typex.EventTime, *int) bool returns {typex.EventTime, int}
//
func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) {
+ types, ok, _ := unfoldIter(t)
+ return types, ok
+}
+
+func unfoldIter(t reflect.Type) ([]reflect.Type, bool, error) {
if t.Kind() != reflect.Func {
- return nil, false
+ return nil, false, nil
}
if t.NumOut() != 1 || t.Out(0) != reflectx.Bool {
- return nil, false
+ return nil, false, nil
}
if t.NumIn() == 0 {
- return nil, false
+ return nil, false, nil
}
var ret []reflect.Type
@@ -60,23 +82,26 @@ func UnfoldIter(t reflect.Type) ([]reflect.Type, bool) {
skip = 1
}
if t.NumIn()-skip > 2 || t.NumIn() == skip {
- return nil, false
+ return nil, false, nil
}
for i := skip; i < t.NumIn(); i++ {
- if !isOutParam(t.In(i)) {
- return nil, false
+ if ok, err := isOutParam(t.In(i)); !ok {
+ return nil, false, errors.Wrap(err, errIllegalParametersInIter)
}
ret = append(ret, t.In(i).Elem())
}
- return ret, true
+ return ret, true, nil
}
-func isOutParam(t reflect.Type) bool {
+func isOutParam(t reflect.Type) (bool, error) {
if t.Kind() != reflect.Ptr {
- return false
+ return false, errors.Errorf("Type %v of kind %v not allowed, must be ptr type", t, t.Kind())
}
- return typex.IsConcrete(t.Elem()) || typex.IsUniversal(t.Elem()) || typex.IsContainer(t.Elem())
+ if typex.IsUniversal(t.Elem()) || typex.IsContainer(t.Elem()) {
+ return true, nil
+ }
+ return typex.CheckConcrete(t.Elem())
}
// IsReIter returns true iff the supplied type is a functional iterator generator.
@@ -88,15 +113,34 @@ func IsReIter(t reflect.Type) bool {
return ok
}
+// IsMalformedReIter returns true iff the supplied type is an illegal functional
+// iterator generator and an error explaining why it is illegal. An iterator generator
+// is not legal if its output is not of type iterator. If the type does not
+// have the structure of an iterator generator or it is a legal iterator generator,
+// IsMalformedReIter returns false and no error.
+func IsMalformedReIter(t reflect.Type) (bool, error) {
+ _, _, err := unfoldReIter(t)
+ return err != nil, err
+}
+
// UnfoldReIter returns the parameter types, if a functional iterator generator.
func UnfoldReIter(t reflect.Type) ([]reflect.Type, bool) {
+ types, ok, _ := unfoldReIter(t)
+ return types, ok
+}
+
+func unfoldReIter(t reflect.Type) ([]reflect.Type, bool, error) {
if t.Kind() != reflect.Func {
- return nil, false
+ return nil, false, nil
}
if t.NumIn() != 0 || t.NumOut() != 1 {
- return nil, false
+ return nil, false, nil
+ }
+ types, ok, err := unfoldIter(t.Out(0))
+ if err != nil {
+ err = errors.Wrap(err, errIllegalParametersInReIter)
}
- return UnfoldIter(t.Out(0))
+ return types, ok, err
}
// IsMultiMap returns true iff the supplied type is a keyed functional iterator
@@ -109,17 +153,32 @@ func IsMultiMap(t reflect.Type) bool {
return ok
}
+// IsMalformedMultiMap returns true iff the supplied type is an illegal keyed functional
+// iterator generator and an error explaining why it is illegal. A keyed iterator generator
+// is not legal if its output is not of type iterator. If the type does not have the
+// structure of a keyed iterator generator or it is a legal iterator generator,
+// IsMalformedMultiMap returns false and no error.
+func IsMalformedMultiMap(t reflect.Type) (bool, error) {
+ _, _, err := unfoldMultiMap(t)
+ return err != nil, err
+}
+
// UnfoldMultiMap returns the parameter types for the input key and the output
// values iff the type is a keyed functional iterator generator.
func UnfoldMultiMap(t reflect.Type) ([]reflect.Type, bool) {
+ types, ok, _ := unfoldMultiMap(t)
+ return types, ok
+}
+
+func unfoldMultiMap(t reflect.Type) ([]reflect.Type, bool, error) {
if t.Kind() != reflect.Func {
- return nil, false
+ return nil, false, nil
}
if t.NumIn() != 1 || t.NumOut() != 1 {
- return nil, false
+ return nil, false, nil
}
types := []reflect.Type{t.In(0)}
- iterTypes, is := UnfoldIter(t.Out(0))
+ iterTypes, is, err := unfoldIter(t.Out(0))
types = append(types, iterTypes...)
- return types, is
+ return types, is, errors.Wrap(err, errIllegalParametersInMultiMap)
}