[ https://issues.apache.org/jira/browse/BEAM-3301?focusedWorklogId=395344&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-395344 ]
ASF GitHub Bot logged work on BEAM-3301:
----------------------------------------
Author: ASF GitHub Bot
Created on: 29/Feb/20 00:03
Start Date: 29/Feb/20 00:03
Worklog Time Spent: 10m
Work Description: lostluck commented on pull request #10991: [BEAM-3301]
Refactor DoFn validation & allow specifying main inputs.
URL: https://github.com/apache/beam/pull/10991#discussion_r385893620
##########
File path: sdks/go/pkg/beam/core/graph/fn.go
##########
@@ -209,21 +209,58 @@ func (f *DoFn) RestrictionT() *reflect.Type {
// a KV or not based on the other signatures (unless we're more loose about which
// sideinputs are present). Bind should respect that.
+// Constants so we can avoid magic numbers in validation. Represent number of
+// DoFn main inputs based on what kind of input the DoFn has.
+const (
+ unknownInNum = -1 // Used when we don't know the number of main inputs.
+ singleInNum = 1
+ kvInNum = 2
+)
+
// NewDoFn constructs a DoFn from the given value, if possible.
func NewDoFn(fn interface{}) (*DoFn, error) {
ret, err := NewFn(fn)
if err != nil {
return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
}
- return AsDoFn(ret)
+ return AsDoFn(ret, unknownInNum)
}
-// AsDoFn converts a Fn to a DoFn, if possible.
-func AsDoFn(fn *Fn) (*DoFn, error) {
+// NewDoFnKv constructs a DoFn from the given value, if possible, with
+// improved validation from knowing whether the DoFn's main input is a KV or
+// single element.
+func NewDoFnKv(fn interface{}, mainKv bool) (*DoFn, error) {
+ ret, err := NewFn(fn)
+ if err != nil {
+ return nil, errors.WithContext(errors.Wrapf(err, "invalid DoFn"), "constructing DoFn")
+ }
+
+ if mainKv {
+ return AsDoFn(ret, kvInNum)
+ } else {
+ return AsDoFn(ret, singleInNum)
+ }
+}
+
+// AsDoFn converts a Fn to a DoFn, if possible. numMainIn specifies how many
+// main inputs are expected in the DoFn's method signatures. Valid values are
+// -1 (unknown), 1 (single elements), or 2 (KVs). If the value is unknown then
+// validation is done by best effort and may miss some edge cases.
+func AsDoFn(fn *Fn, numMainIn int) (*DoFn, error) {
addContext := func(err error, fn *Fn) error {
return errors.WithContextf(err, "graph.AsDoFn: for Fn named %v", fn.Name())
}
+ // Validate numMainIn. This check should match this method's comment.
+ if numMainIn != unknownInNum &&
+ numMainIn != singleInNum &&
+ numMainIn != kvInNum {
+ err := errors.Errorf("invalid number of main inputs given. "+
+ "Got: %v, Want: One of the following: %v",
+ processElementName, []int{unknownInNum, singleInNum, kvInNum})
+ return nil, addContext(err, fn)
+ }
Review comment:
Consider a switch instead.
```suggestion
switch numMainIn {
case unknownInNum, singleInNum, kvInNum: // Valid
default: // Invalid
err := errors.Errorf("invalid number of main inputs given. "+
"Got: %v, Want: One of the following: %v",
processElementName, []int{unknownInNum, singleInNum, kvInNum})
return nil, addContext(err, fn)
}
```
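For reference, here is a minimal standalone sketch of the switch form suggested above. It makes several assumptions: `validateNumMainIn` is a hypothetical helper, not part of the Beam SDK; the constants are copied from the diff; and `fmt.Errorf` stands in for Beam's internal `errors` package so the snippet compiles on its own. In both the diff and the suggestion, the value passed for `Got: %v` is `processElementName`. The sketch passes `numMainIn` instead, on the assumption that this is the value the message is meant to report.

```go
package main

import "fmt"

// Stand-ins for the constants introduced in the diff.
const (
	unknownInNum = -1 // Used when the number of main inputs is not known.
	singleInNum  = 1
	kvInNum      = 2
)

// validateNumMainIn is a hypothetical helper (not in the Beam SDK) that
// isolates the check performed at the top of AsDoFn.
func validateNumMainIn(numMainIn int) error {
	switch numMainIn {
	case unknownInNum, singleInNum, kvInNum:
		// Valid.
		return nil
	default:
		// Invalid. Report the offending value itself (assumed intent).
		return fmt.Errorf("invalid number of main inputs given. "+
			"Got: %v, Want: One of the following: %v",
			numMainIn, []int{unknownInNum, singleInNum, kvInNum})
	}
}

func main() {
	for _, n := range []int{unknownInNum, singleInNum, kvInNum, 3} {
		fmt.Printf("%d: %v\n", n, validateNumMainIn(n))
	}
}
```

Listing the valid values in a single `case` keeps them next to the `[]int{...}` in the error message, which makes it easier to keep the two in sync if another input kind is added later.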
Issue Time Tracking
-------------------
Worklog Id: (was: 395344)
Time Spent: 2h (was: 1h 50m)
> Go SplittableDoFn support
> -------------------------
>
> Key: BEAM-3301
> URL: https://issues.apache.org/jira/browse/BEAM-3301
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Daniel Oliveira
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> SDFs will be the only way to add streaming and liquid sharded IO for Go.
> Design doc: https://s.apache.org/splittable-do-fn