lostluck commented on a change in pull request #15400:
URL: https://github.com/apache/beam/pull/15400#discussion_r715909229



##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -450,8 +456,8 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder, typ
        }
 
        inT := in.Type()
-       if !typex.IsCoGBK(inT) {
-               return nil, addContext(errors.Errorf("Combine requires CoGBK type: %v", inT), s)
+       if !typex.IsCoGBK(inT) && s.Label == CombinePerKeyScope {

Review comment:
       With the above changes, this change here is unnecessary since the GBK 
type will be valid.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
##########
@@ -143,6 +143,10 @@ func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values .
        }
        first := true
 
+       if len(values) == 0 {
+               return errors.Errorf("did not get values to combine for key %v, ID %v", value.Elm, n.UID)

Review comment:
       The lack of pre-GBK in the fallback case is why we ended up with this 
error at all.

##########
File path: sdks/go/pkg/beam/combine.go
##########
@@ -39,12 +39,8 @@ func CombinePerKey(s Scope, combinefn interface{}, col PCollection, opts ...Opti
 // for multiple reasons, notably that the combinefn is not valid or cannot be bound
 // -- due to type mismatch, say -- to the incoming PCollections.
 func TryCombine(s Scope, combinefn interface{}, col PCollection, opts ...Option) (PCollection, error) {
-       pre := AddFixedKey(s, col)
-       post, err := TryCombinePerKey(s, combinefn, pre, opts...)
-       if err != nil {
-               return PCollection{}, err
-       }
-       return DropKey(s, post), nil
+       s = s.Scope(graph.CombineGloballyScope)

Review comment:
       I think I understand the other bugs that have been happening when a 
runner doesn't understand the combine_globally URN. And now I feel like I've 
wasted your time for a few weeks. Very sorry about that :/.
   
   This change is removing the fallback behaviour from the pipeline graph for 
unknown composites. The intent of that fallback is to build up subtransforms 
that do the same work as the URN they implement.
   
   The existing code for beam.Combine added a fixed key, then deferred to 
CombinePerKey, which meant that downstream in graphx/translate.go it lost any 
sense of the "global" nature of this particular transform.  But also, I don't 
think I've explained how CombinePerKey as a composite transform works. If you 
read the original combine per key code, note that it always adds a 
TryGroupByKey, and follows it by a NewCombine.
   
   So in the case of a GlobalCombine, what we want is to:
   1. Add the "magic scope" (`graph.CombineGloballyScope`).
   2. Inside that scope, add AddFixedKey + TryGroupByKey + NewCombine + DropKey.
   3. In graphx, check for and detect the magic scope and conditions (similarly 
to how we're doing it for CombinePerKey).
   
   It's very important that the composite contains everything listed in step 2, 
since that's what needs to get executed by the runner if it doesn't understand 
the URN at all. "NewCombine" by itself doesn't actually do any grouping.
   
   Basically, in principle, there shouldn't be any need for the Direct runner 
to change (with new nodes or anything), since it's the kind of runner that will 
rely on the fallbacks.
   
   
   

##########
File path: sdks/go/test/integration/primitives/teststream.go
##########
@@ -21,17 +21,18 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
 )
 

Review comment:
       The lack of a built-in / fallback GBK is probably why the Trigger tests 
started failing hard: there was no longer any point where a GBK was happening, 
which is when triggers occur.





