[ 
https://issues.apache.org/jira/browse/BEAM-11928?focusedWorklogId=655111&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-655111
 ]

ASF GitHub Bot logged work on BEAM-11928:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Sep/21 22:17
            Start Date: 24/Sep/21 22:17
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #15400:
URL: https://github.com/apache/beam/pull/15400#discussion_r715909229



##########
File path: sdks/go/pkg/beam/core/graph/edge.go
##########
@@ -450,8 +456,8 @@ func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder, typ
        }
 
        inT := in.Type()
-       if !typex.IsCoGBK(inT) {
-               return nil, addContext(errors.Errorf("Combine requires CoGBK type: %v", inT), s)
+       if !typex.IsCoGBK(inT) && s.Label == CombinePerKeyScope {

Review comment:
       With the above changes, this change here is unnecessary since the GBK 
type will be valid.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/combine.go
##########
@@ -143,6 +143,10 @@ func (n *Combine) ProcessElement(ctx context.Context, value *FullValue, values .
        }
        first := true
 
+       if len(values) == 0 {
+               return errors.Errorf("did not get values to combine for key %v, ID %v", value.Elm, n.UID)

Review comment:
       The lack of pre-GBK in the fallback case is why we ended up with this 
error at all.

##########
File path: sdks/go/pkg/beam/combine.go
##########
@@ -39,12 +39,8 @@ func CombinePerKey(s Scope, combinefn interface{}, col PCollection, opts ...Opti
 // for multiple reasons, notably that the combinefn is not valid or cannot be bound
 // -- due to type mismatch, say -- to the incoming PCollections.
 func TryCombine(s Scope, combinefn interface{}, col PCollection, opts ...Option) (PCollection, error) {
-       pre := AddFixedKey(s, col)
-       post, err := TryCombinePerKey(s, combinefn, pre, opts...)
-       if err != nil {
-               return PCollection{}, err
-       }
-       return DropKey(s, post), nil
+       s = s.Scope(graph.CombineGloballyScope)

Review comment:
       I think I understand the other bugs that have been happening when a 
runner doesn't understand the combine_globally urn. And now I feel like I've 
wasted your time for a few weeks. Very sorry about that :/.
   
   This change is removing the fallback behaviour from the pipeline graph for 
unknown composites. The intent of that is to build up sub transforms that do 
the same work as the URN that is implementing them.
   
   The existing code for beam.Combine added a fixed key, then deferred to 
CombinePerKey, which meant that downstream in graphx/translate.go it lost any 
sense of the "global" nature of this particular transform.  But also, I don't 
think I've explained how CombinePerKey as a composite transform works. If you 
read the original combine per key code, note that it always adds a 
TryGroupByKey, and follows it by a NewCombine.
   
   So in the case of a GlobalCombine, what we want is to 
   1. Add the "magic scope" (`graph.CombineGloballyScope`).
   2. AddFixedKey + TryGroupByKey + NewCombine + DropKey
   3. In graphx, we check and detect the magic scope and conditions (similarly 
to how we're doing it for CombinePerKey).
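
As a rough sketch of the detection in step 3, in plain Go rather than the actual graphx code: the translation layer looks at the scope label and picks the matching URN. The two URN strings are the ones named in BEAM-11928; the scope label values and the `urnForScope` helper are assumptions made up for illustration, and the real check also looks at other conditions beyond the label.

```go
package main

import "fmt"

const (
	// Scope labels: assumed values, for illustration only.
	combinePerKeyScope   = "CombinePerKey"
	combineGloballyScope = "CombineGlobally"

	// URNs as named in BEAM-11928.
	urnCombinePerKey   = "beam:transform:combine_per_key:v1"
	urnCombineGlobally = "beam:transform:combine_globally:v1"
)

// urnForScope maps a composite's scope label to its combine URN.
// The bool is false for ordinary composites that carry no special URN.
// (Hypothetical helper; not part of the Beam Go SDK.)
func urnForScope(label string) (string, bool) {
	switch label {
	case combinePerKeyScope:
		return urnCombinePerKey, true
	case combineGloballyScope:
		return urnCombineGlobally, true
	default:
		return "", false
	}
}

func main() {
	for _, label := range []string{combineGloballyScope, combinePerKeyScope, "MyComposite"} {
		urn, ok := urnForScope(label)
		fmt.Printf("%-16s -> %q (special=%v)\n", label, urn, ok)
	}
}
```

The point is only that the "global" nature has to survive as a distinct label until translation; if everything is routed through the per-key scope, there is nothing left to detect.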
   
   It's very important that we have a composite holding everything from step 2, 
since that's what needs to get executed by the runner if it doesn't understand 
the URN at all. "NewCombine" by itself doesn't actually do any grouping.
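
To make the shape of that fallback concrete, here is a minimal sketch in plain Go over slices, not the Beam API. Every name in it (`kv`, `addFixedKey`, `groupByKey`, `combinePerKey`, `dropKey`, `combineGlobally`) is a stand-in invented for illustration; it only mirrors the AddFixedKey + GroupByKey + Combine + DropKey sequence.

```go
package main

import "fmt"

// kv is a stand-in for a keyed element; it is not the SDK's KV type.
type kv struct {
	key   int
	value int
}

// addFixedKey pairs every element with the same key, so that a single
// group ends up holding all of the values.
func addFixedKey(in []int) []kv {
	out := make([]kv, 0, len(in))
	for _, v := range in {
		out = append(out, kv{key: 0, value: v})
	}
	return out
}

// groupByKey collects the values for each key. This is the step that a
// bare combine node does not perform on its own.
func groupByKey(in []kv) map[int][]int {
	groups := make(map[int][]int)
	for _, e := range in {
		groups[e.key] = append(groups[e.key], e.value)
	}
	return groups
}

// combinePerKey folds each group's values with merge. Groups produced by
// groupByKey always hold at least one value, so vs[0] is safe here.
func combinePerKey(groups map[int][]int, merge func(a, b int) int) []kv {
	out := make([]kv, 0, len(groups))
	for k, vs := range groups {
		acc := vs[0]
		for _, v := range vs[1:] {
			acc = merge(acc, v)
		}
		out = append(out, kv{key: k, value: acc})
	}
	return out
}

// dropKey strips the fixed key again, leaving only the combined values.
func dropKey(in []kv) []int {
	out := make([]int, 0, len(in))
	for _, e := range in {
		out = append(out, e.value)
	}
	return out
}

// combineGlobally chains the four steps of the fallback composite.
func combineGlobally(in []int, merge func(a, b int) int) []int {
	return dropKey(combinePerKey(groupByKey(addFixedKey(in)), merge))
}

func main() {
	sum := func(a, b int) int { return a + b }
	fmt.Println(combineGlobally([]int{1, 2, 3, 4}, sum)) // prints [10]
}
```

Taking `groupByKey` out of the chain leaves `combinePerKey` with nothing grouped to fold, which is the situation the new "did not get values to combine" error is reporting.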
   
   Basically, in principle, there shouldn't be any need for the Direct runner 
to change (with new nodes or anything), since it's the kind of runner that will 
rely on the fallbacks.
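
A rough model of that fallback behaviour, again in plain Go and not any runner's real code: a runner that recognises a composite's URN can substitute its own implementation, and one that doesn't just executes the subtransforms. The `transform` type, the `plan` function, and the subtransform names are hypothetical.

```go
package main

import "fmt"

// transform is a simplified, hypothetical stand-in for a pipeline composite.
type transform struct {
	urn  string
	subs []string // subtransform names, in execution order
}

// plan returns the steps a runner would execute for t. A runner that knows
// the URN runs it as one native step; any other runner falls back to the
// subtransforms, so those must do the same work on their own.
func plan(t transform, known map[string]bool) []string {
	if known[t.urn] {
		return []string{t.urn}
	}
	return t.subs
}

func main() {
	global := transform{
		urn:  "beam:transform:combine_globally:v1",
		subs: []string{"AddFixedKey", "GroupByKey", "Combine", "DropKey"},
	}

	optimizing := map[string]bool{global.urn: true}
	direct := map[string]bool{} // understands no special URNs

	fmt.Println(plan(global, optimizing))
	fmt.Println(plan(global, direct))
}
```

Under this model a fallback-only runner needs no new node types, as long as the composite's contents are complete.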
   
   
   

##########
File path: sdks/go/test/integration/primitives/teststream.go
##########
@@ -21,17 +21,18 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
 )
 

Review comment:
       The lack of built in / fallback GBK is probably why the Trigger tests 
started failing hard: there was no longer any point where a GBK was happening, 
which is when triggers fire.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 655111)
    Time Spent: 5h 20m  (was: 5h 10m)

> Go SDK should use the combine_globally urn for global combines.
> ---------------------------------------------------------------
>
>                 Key: BEAM-11928
>                 URL: https://issues.apache.org/jira/browse/BEAM-11928
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Jack McCluskey
>            Priority: P3
>          Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Reported on 
> [https://stackoverflow.com/questions/66446338/issue-with-combine-function-in-apache-beam-go-sdk/66486052#66486052]
> The root cause is that the Go SDK doesn't use the 
> "beam:transform:combine_globally:v1" URN, and always uses 
> "beam:transform:combine_per_key:v1" even for global combines, with an 
> AddFixedKey DoFn.
> URN in the proto: 
> [https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L347]
>  
> Go SDK only having combine_per_key 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L42]
> We currently "detect" combines via a CombinePerKey scope 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/edge.go#L434]
>  
> added at beam.TryCombinePerKey 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/combine.go#L58]
> We convert combines into the CombinePayload here
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L253]
> called above here: 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L241]
>  
> We probably want to just add a graph.CombineGlobal op (vs. the existing 
> combine node), or modify the "CombinePerKey" scope hack to have a 
> CombineCombineGlobal variant, or something that is cleaner than what currently 
> exists.
> We'd also want to make sure the optimization takes place properly, which 
> should be simple enough to detect timing wise at least once, if not as a 
> regular benchmark.



