[ 
https://issues.apache.org/jira/browse/BEAM-11928?focusedWorklogId=647596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-647596
 ]

ASF GitHub Bot logged work on BEAM-11928:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Sep/21 22:09
            Start Date: 07/Sep/21 22:09
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #15400:
URL: https://github.com/apache/beam/pull/15400#discussion_r703887294



##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -251,11 +252,11 @@ func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
 // Beam Portability requires that composites contain an implementation for runners
 // that don't understand the URN and Payload, which this lightly checks for.
 func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {

Review comment:
       I think the problem with Flink here is that it doesn't understand how to 
handle URNCombineGlobally. As the doc comment for this function says, "Beam 
Portability requires that composites contain an implementation for runners 
that don't understand the URN and Payload".
   
   Flink then falls back on whatever implementation is in the subtransforms 
(see lines 228-240), which is simply "GBK + Combine".
   
   In essence, it could be that the exec.Combine node is wrong for this case, 
as it would only be used for this unlifted case. It was fine before, since the 
fixed key handling changed what the Flink GBK was receiving.
   
   So one solution is to update the subtransforms with the fixed key logic 
somehow, working something like the special Reshuffle or CoGBK handler nodes 
do. That is probably more work.
   
   An alternative would be to add a new/different CombineGlobally node to the 
exec package that can handle serial elements (likely doing something with 
AddInput and handing the result to ExtractOutput, when it's a CombineGlobally 
graph node). This is probably less work than the former option.
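
   To make the alternative above concrete, here is a minimal, self-contained 
sketch of a node that folds serially arriving elements with AddInput and 
finishes with ExtractOutput. It does not use the real exec package: the 
combineFn interface, the globalCombine type, and its ProcessElement and 
FinishBundle methods are all hypothetical stand-ins, and a real node would 
also need to handle windowing and accumulator coders.

```go
package main

import "fmt"

// combineFn is a hypothetical, simplified stand-in for a Beam CombineFn.
// It is NOT the Go SDK's actual interface.
type combineFn[In, Acc, Out any] interface {
	CreateAccumulator() Acc
	AddInput(acc Acc, in In) Acc
	ExtractOutput(acc Acc) Out
}

// meanAcc is the accumulator used by meanFn.
type meanAcc struct {
	sum, count int
}

// meanFn averages ints; it exists only to exercise the sketch.
type meanFn struct{}

func (meanFn) CreateAccumulator() meanAcc { return meanAcc{} }

func (meanFn) AddInput(a meanAcc, v int) meanAcc {
	a.sum += v
	a.count++
	return a
}

func (meanFn) ExtractOutput(a meanAcc) float64 {
	if a.count == 0 {
		return 0
	}
	return float64(a.sum) / float64(a.count)
}

// globalCombine is a hypothetical node that receives un-keyed elements one
// at a time, instead of a grouped (key, values) pair.
type globalCombine[In, Acc, Out any] struct {
	fn  combineFn[In, Acc, Out]
	acc Acc
}

func newGlobalCombine[In, Acc, Out any](fn combineFn[In, Acc, Out]) *globalCombine[In, Acc, Out] {
	return &globalCombine[In, Acc, Out]{fn: fn, acc: fn.CreateAccumulator()}
}

// ProcessElement folds a single serial element into the accumulator.
func (n *globalCombine[In, Acc, Out]) ProcessElement(in In) {
	n.acc = n.fn.AddInput(n.acc, in)
}

// FinishBundle hands the accumulator to ExtractOutput.
func (n *globalCombine[In, Acc, Out]) FinishBundle() Out {
	return n.fn.ExtractOutput(n.acc)
}

func main() {
	n := newGlobalCombine[int, meanAcc, float64](meanFn{})
	for _, v := range []int{2, 4, 6} {
		n.ProcessElement(v)
	}
	fmt.Println(n.FinishBundle())
}
```

   The type arguments are spelled out at the call site so the sketch does not 
depend on interface-based type inference.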




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.



Issue Time Tracking
-------------------

    Worklog Id:     (was: 647596)
    Time Spent: 2h 10m  (was: 2h)

> Go SDK should use the combine_globally urn for global combines.
> ---------------------------------------------------------------
>
>                 Key: BEAM-11928
>                 URL: https://issues.apache.org/jira/browse/BEAM-11928
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Jack McCluskey
>            Priority: P3
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Reported on 
> [https://stackoverflow.com/questions/66446338/issue-with-combine-function-in-apache-beam-go-sdk/66486052#66486052]
> The root cause is that the Go SDK doesn't use the 
> "beam:transform:combine_globally:v1" URN, and always uses 
> "beam:transform:combine_per_key:v1" even for global combines, with an 
> AddFixedKey DoFn.
> URN in the proto: 
> [https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L347]
>  
> The Go SDK only has combine_per_key: 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L42]
> We currently "detect" combines via a CombinePerKey scope 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/edge.go#L434]
>  
> added at beam.TryCombinePerKey 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/combine.go#L58]
> We convert combines into the CombinePayload here
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L253]
> called above here: 
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L241]
>  
> We probably want to just add a graph.CombineGlobal op (vs. the existing 
> combine node), or modify the "CombinePerKey" scope hack to have a 
> CombineGlobal variant, or something that is cleaner than what currently 
> exists.
> We'd also want to make sure the optimization takes place properly, which 
> should be simple enough to detect timing-wise at least once, if not as a 
> regular benchmark.
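
For readers unfamiliar with the fixed-key approach the issue describes, the following is a rough, self-contained sketch of how a global combine can be emulated with a per-key combine. It does not use the Beam Go SDK; the kv type and the addFixedKey, groupByKey, combinePerKey, and combineGloballyViaFixedKey helpers are hypothetical names chosen for illustration, and the real AddFixedKey DoFn may differ in detail.

```go
package main

import "fmt"

// kv is a hypothetical key/value pair; the key is always the same fixed value.
type kv struct {
	key   int
	value int
}

// fixedKey is the single key assigned to every element (an assumption of
// this sketch, not necessarily the value the SDK uses).
const fixedKey = 0

// addFixedKey pairs every element with the same key so that a per-key
// combine sees the whole collection as one group.
func addFixedKey(vals []int) []kv {
	out := make([]kv, 0, len(vals))
	for _, v := range vals {
		out = append(out, kv{key: fixedKey, value: v})
	}
	return out
}

// groupByKey collects values by key, standing in for a GBK.
func groupByKey(in []kv) map[int][]int {
	groups := make(map[int][]int)
	for _, e := range in {
		groups[e.key] = append(groups[e.key], e.value)
	}
	return groups
}

// combinePerKey applies the combine function to each group.
func combinePerKey(groups map[int][]int, combine func([]int) int) map[int]int {
	out := make(map[int]int, len(groups))
	for k, vs := range groups {
		out[k] = combine(vs)
	}
	return out
}

// sumInts is a trivial combine function used to exercise the sketch.
func sumInts(vs []int) int {
	total := 0
	for _, v := range vs {
		total += v
	}
	return total
}

// combineGloballyViaFixedKey chains the steps and drops the fixed key again.
func combineGloballyViaFixedKey(vals []int, combine func([]int) int) int {
	combined := combinePerKey(groupByKey(addFixedKey(vals)), combine)
	return combined[fixedKey]
}

func main() {
	fmt.Println(combineGloballyViaFixedKey([]int{1, 2, 3}, sumInts))
}
```

Because every element shares one key, the runner only ever sees a per-key combine, which is why a dedicated combine_globally URN never appears in a pipeline built this way.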



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
