[
https://issues.apache.org/jira/browse/BEAM-11928?focusedWorklogId=647596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-647596
]
ASF GitHub Bot logged work on BEAM-11928:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Sep/21 22:09
Start Date: 07/Sep/21 22:09
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #15400:
URL: https://github.com/apache/beam/pull/15400#discussion_r703887294
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -251,11 +252,11 @@ func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
// Beam Portability requires that composites contain an implementation for runners
// that don't understand the URN and Payload, which this lightly checks for.
func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {
Review comment:
I think the problem with Flink here is that it doesn't understand how to
handle URNCombineGlobally. As the doc comment for this function says, "Beam
Portability requires that composites contain an implementation for runners
that don't understand the URN and Payload".
Flink then falls back on whatever implementation is in the subtransforms
(see lines 228-240), which is simply "GBK + Combine".
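To make the fallback concrete, here is a minimal sketch of the idea, not the actual runner or graphx code. The `compositeTransform` type, the `expand` function, and the `known` set are all hypothetical stand-ins; only the URN string comes from the issue.

```go
package main

import "fmt"

// compositeTransform is a hypothetical, simplified stand-in for a composite
// PTransform: a URN plus the names of its subtransforms.
type compositeTransform struct {
	URN           string
	Subtransforms []string
}

// expand returns the steps a runner would execute. A runner that recognizes
// the URN can run the composite natively; otherwise it falls back to the
// subtransforms the SDK supplied. (Assumed behavior, for illustration only.)
func expand(t compositeTransform, known map[string]bool) []string {
	if known[t.URN] {
		return []string{t.URN}
	}
	return t.Subtransforms
}

func main() {
	c := compositeTransform{
		URN:           "beam:transform:combine_globally:v1",
		Subtransforms: []string{"GBK", "Combine"},
	}
	// A runner with no knowledge of the URN uses the subtransforms.
	fmt.Println(expand(c, map[string]bool{}))
	// A runner that knows the URN handles the composite directly.
	fmt.Println(expand(c, map[string]bool{c.URN: true}))
}
```

The point of the sketch is that whatever sits in the subtransforms must be correct on its own, because that is what a runner executes when it skips the URN.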
In essence, it could be that the exec.Combine node is wrong for this case,
as it would only be used for this unlifted case. It was fine before, since
fixed key handling changed what the Flink GBK was receiving.
So one solution is to update the subtransforms with the fixed key logic
somehow, similar to how the special Reshuffle or CoGBK handler nodes work.
That is probably more work.
An alternative would be to add a new CombineGlobally node to the exec
package that can handle serial elements (likely calling AddInput for each
element and handing the result to ExtractOutput, when it's a CombineGlobally
graph node). This is probably less work than the former option.
Issue Time Tracking
-------------------
Worklog Id: (was: 647596)
Time Spent: 2h 10m (was: 2h)
> Go SDK should use the combine_globally urn for global combines.
> ---------------------------------------------------------------
>
> Key: BEAM-11928
> URL: https://issues.apache.org/jira/browse/BEAM-11928
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Jack McCluskey
> Priority: P3
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Reported on
> [https://stackoverflow.com/questions/66446338/issue-with-combine-function-in-apache-beam-go-sdk/66486052#66486052]
> The root cause is that the Go SDK doesn't use the
> "beam:transform:combine_globally:v1" URN, and always uses
> "beam:transform:combine_per_key:v1" even for global combines, with an
> AddFixedKey DoFn.
> URN in the proto:
> [https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L347]
>
> Go SDK only having combine_per_key
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L42]
> We currently "detect" combines via a CombinePerKey scope
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/edge.go#L434]
>
> added at beam.TryCombinePerKey
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/combine.go#L58]
> We convert combines into the CombinePayload here
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L253]
> called above here:
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L241]
>
> We probably want to just add a graph.CombineGlobal op (vs. the existing
> combine node), or modify the "CombinePerKey" scope hack to have a
> CombineGlobal variant, or something that is cleaner than what currently
> exists.
> We'd also want to make sure the optimization takes place properly, which
> should be simple enough to detect timing-wise at least once, if not as a
> regular benchmark.