lostluck commented on a change in pull request #15400:
URL: https://github.com/apache/beam/pull/15400#discussion_r715909229
##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -450,8 +456,8 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder, typ
}
inT := in.Type()
- if !typex.IsCoGBK(inT) {
- return nil, addContext(errors.Errorf("Combine requires CoGBK type: %v", inT), s)
+ if !typex.IsCoGBK(inT) && s.Label == CombinePerKeyScope {
Review comment:
With the above changes, this change is unnecessary, since the input will be
the GBK's output type, which is valid.
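A toy model of the type check in question, assuming a hypothetical `shape` enum in place of the real `typex` types (none of these names are Beam APIs): once the composite always puts a GBK ahead of the combine, the combine's input is the grouped, CoGBK-shaped output, so the original unconditional check passes without a scope-based exception.

```go
package main

import "fmt"

// shape is a hypothetical stand-in for the element types typex
// distinguishes: an ungrouped KV versus a grouped CoGBK.
type shape int

const (
	shapeKV shape = iota
	shapeCoGBK
)

// String makes error messages readable.
func (s shape) String() string {
	if s == shapeCoGBK {
		return "CoGBK"
	}
	return "KV"
}

// groupByKey models the GBK: it turns KV input into grouped output.
func groupByKey(in shape) (shape, error) {
	if in != shapeKV {
		return in, fmt.Errorf("GroupByKey requires KV input, got %v", in)
	}
	return shapeCoGBK, nil
}

// checkCombineInput mirrors the original, unconditional check in NewCombine.
func checkCombineInput(in shape) error {
	if in != shapeCoGBK {
		return fmt.Errorf("Combine requires CoGBK type: %v", in)
	}
	return nil
}

func main() {
	grouped, err := groupByKey(shapeKV)
	if err != nil {
		panic(err)
	}
	fmt.Println(checkCombineInput(grouped)) // <nil>
	fmt.Println(checkCombineInput(shapeKV)) // Combine requires CoGBK type: KV
}
```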
##########
File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
##########
@@ -143,6 +143,10 @@ func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values .
}
first := true
+ if len(values) == 0 {
+ return errors.Errorf("did not get values to combine for key %v, ID %v", value.Elm, n.UID)
Review comment:
The missing GBK ahead of the combine in the fallback case is why we ended up
with this error at all.
##########
File path: sdks/go/pkg/beam/combine.go
##########
@@ -39,12 +39,8 @@ func CombinePerKey(s Scope, combinefn interface{}, col PCollection, opts ...Opti
// for multiple reasons, notably that the combinefn is not valid or cannot be bound
// -- due to type mismatch, say -- to the incoming PCollections.
func TryCombine(s Scope, combinefn interface{}, col PCollection, opts ...Option) (PCollection, error) {
- pre := AddFixedKey(s, col)
- post, err := TryCombinePerKey(s, combinefn, pre, opts...)
- if err != nil {
- return PCollection{}, err
- }
- return DropKey(s, post), nil
+ s = s.Scope(graph.CombineGloballyScope)
Review comment:
I think I understand the other bugs that have been happening when a
runner doesn't understand the combine_globally URN. And now I feel like I've
wasted your time for a few weeks. Very sorry about that :/.
This change removes the fallback behaviour from the pipeline graph for
unknown composites. The intent of the fallback is to build up subtransforms that
do the same work as the URN they implement.
The existing code for beam.Combine added a fixed key, then deferred to
CombinePerKey, which meant that downstream in graphx/translate.go it lost any
sense of the "global" nature of this particular transform. But I also don't
think I've explained how CombinePerKey works as a composite transform. If you
read the original CombinePerKey code, note that it always adds a
TryGroupByKey and follows it with a NewCombine.
So in the case of a GlobalCombine, what we want is to:
1. Add the "magic scope" (`graph.CombineGloballyScope`).
2. Inside that scope, AddFixedKey + TryGroupByKey + NewCombine + DropKey.
3. In graphx, detect the magic scope and conditions (similar to how we're doing it for CombinePerKey).
It's very important that the composite contains everything in step 2,
since that's what the runner needs to execute if it doesn't understand the
URN at all. "NewCombine" by itself doesn't actually do any grouping.
Basically, in principle, there shouldn't be any need for the Direct runner
to change (with new nodes or anything), since it's the kind of runner that
relies on the fallbacks.
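To make the shape of step 2 concrete, here is a plain-Go model of the fallback expansion with no Beam dependency. The helpers `addFixedKey`, `groupByKey`, `combine` and `dropKey` are hypothetical stand-ins for AddFixedKey, TryGroupByKey, NewCombine and DropKey; they operate on in-memory slices rather than PCollections, and the fixed key value is an arbitrary assumption. The point it illustrates is that only `groupByKey` brings values together, so without it `combine` has nothing grouped to fold.

```go
package main

import (
	"fmt"
	"sort"
)

// KV is a key/value pair standing in for a Beam KV element.
type KV struct {
	Key   string
	Value int
}

// addFixedKey pairs every element with the same key, so that a later
// grouping step collects the whole input under a single key.
func addFixedKey(in []int) []KV {
	out := make([]KV, 0, len(in))
	for _, v := range in {
		out = append(out, KV{Key: "fixed", Value: v})
	}
	return out
}

// groupByKey gathers all values that share a key. This is the only
// step here that actually groups anything.
func groupByKey(in []KV) map[string][]int {
	groups := make(map[string][]int)
	for _, kv := range in {
		groups[kv.Key] = append(groups[kv.Key], kv.Value)
	}
	return groups
}

// combine folds each key's values with fn. Like NewCombine, it assumes
// its input is already grouped; it does no grouping of its own.
func combine(groups map[string][]int, fn func(a, b int) int) []KV {
	keys := make([]string, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic output order
	out := make([]KV, 0, len(keys))
	for _, k := range keys {
		vs := groups[k] // non-empty: groupByKey only creates keys it has seen
		acc := vs[0]
		for _, v := range vs[1:] {
			acc = fn(acc, v)
		}
		out = append(out, KV{Key: k, Value: acc})
	}
	return out
}

// dropKey discards the keys, leaving only the combined values.
func dropKey(in []KV) []int {
	out := make([]int, 0, len(in))
	for _, kv := range in {
		out = append(out, kv.Value)
	}
	return out
}

// combineGlobally models step 2: AddFixedKey + GroupByKey + Combine + DropKey.
func combineGlobally(in []int, fn func(a, b int) int) []int {
	return dropKey(combine(groupByKey(addFixedKey(in)), fn))
}

func main() {
	sum := func(a, b int) int { return a + b }
	fmt.Println(combineGlobally([]int{1, 2, 3, 4}, sum)) // [10]
}
```

Empty input simply yields an empty slice in this sketch; it does not model any default-value behaviour a real global combine might have.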
##########
File path: sdks/go/test/integration/primitives/teststream.go
##########
@@ -21,17 +21,18 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
)
Review comment:
The lack of a built-in / fallback GBK is probably why the Trigger tests
started failing hard: there was no longer any point where a GBK was happening,
and that is where triggers fire.
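For context on why losing the GBK breaks trigger tests, here is a deliberately simplified, single-window model of a GroupByKey under a hypothetical element-count trigger that discards fired panes. The names are made up and none of Beam's real trigger machinery is modeled; the point is only that panes are produced at the grouping step, so a pipeline with no grouping step has nowhere to fire them.

```go
package main

import "fmt"

// KV is a key/value element arriving at the grouping step.
type KV struct {
	Key   string
	Value int
}

// gbkWithCountTrigger models a GroupByKey in a single window with a
// hypothetical trigger that fires once n elements have been buffered
// for a key. Fired panes are cleared from the buffer (discarding mode);
// leftover elements that never reach n are not emitted in this model.
func gbkWithCountTrigger(in []KV, n int) map[string][][]int {
	buffered := make(map[string][]int)
	panes := make(map[string][][]int)
	for _, kv := range in {
		buffered[kv.Key] = append(buffered[kv.Key], kv.Value)
		if len(buffered[kv.Key]) == n {
			panes[kv.Key] = append(panes[kv.Key], buffered[kv.Key])
			buffered[kv.Key] = nil // start a fresh pane
		}
	}
	return panes
}

func main() {
	in := []KV{{"a", 1}, {"a", 2}, {"b", 3}, {"a", 4}, {"a", 5}}
	fmt.Println(gbkWithCountTrigger(in, 2)) // map[a:[[1 2] [4 5]]]
}
```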