lostluck commented on a change in pull request #15400:
URL: https://github.com/apache/beam/pull/15400#discussion_r703887294
##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -251,11 +252,11 @@ func (m *marshaller) addScopeTree(s *ScopeTree) (string, error) {
// Beam Portability requires that composites contain an implementation for runners
// that don't understand the URN and Payload, which this lightly checks for.
func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pipepb.PTransform) error {
Review comment:
I think the problem with Flink here is that it doesn't understand how to
handle URNCombineGlobally. As the doc comment for this function says, "Beam
Portability requires that composites contain an implementation for runners
that don't understand the URN and Payload".
Flink then falls back on whatever implementation is in the subtransforms
(see lines 228-240), which is simply "GBK + Combine".
In essence, it could be that the exec.Combine node is wrong for this case,
as it would only be used in this unlifted case. It was fine before, but the
fixed key handling changed what the Flink GBK was receiving.
So one solution is to update the subtransforms with the fixed key logic
somehow, similar to how the special Reshuffle or CoGBK handler nodes work.
That is probably more work.
An alternative would be to add a new/different CombineGlobally node to the
exec package that can handle serial elements (likely calling AddInput for
each element and handing the result to ExtractOutput, when it's a
CombineGlobally graph node). This is probably less work than the former option.
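To make the second option concrete, here is a rough sketch of what such a node could look like. It is not the exec package's actual API: `combineFn`, `globalCombine`, and `runSum` are hypothetical names, and the accumulator is fixed to int64 to keep the example short. The idea is only that elements arrive one at a time, each is folded in with AddInput, and ExtractOutput runs once at the end.

```go
package main

// combineFn is a hypothetical, simplified stand-in for a CombineFn over
// int64 values. It is NOT the Beam Go SDK's real interface.
type combineFn struct {
	createAccumulator func() int64
	addInput          func(acc, in int64) int64
	extractOutput     func(acc int64) int64
}

// globalCombine is a hypothetical exec-style node for the unlifted
// CombineGlobally case. Elements arrive serially rather than as a grouped
// iterable, so the node keeps a single accumulator for the bundle.
type globalCombine struct {
	fn   combineFn
	acc  int64
	emit func(int64)
}

// StartBundle resets the accumulator.
func (n *globalCombine) StartBundle() { n.acc = n.fn.createAccumulator() }

// ProcessElement folds one serial element into the accumulator via AddInput.
func (n *globalCombine) ProcessElement(v int64) { n.acc = n.fn.addInput(n.acc, v) }

// FinishBundle hands the accumulator to ExtractOutput and emits the result.
func (n *globalCombine) FinishBundle() { n.emit(n.fn.extractOutput(n.acc)) }

// runSum drives the node with a sum CombineFn and collects what it emits.
func runSum(inputs []int64) []int64 {
	var out []int64
	n := &globalCombine{
		fn: combineFn{
			createAccumulator: func() int64 { return 0 },
			addInput:          func(acc, in int64) int64 { return acc + in },
			extractOutput:     func(acc int64) int64 { return acc },
		},
		emit: func(v int64) { out = append(out, v) },
	}
	n.StartBundle()
	for _, v := range inputs {
		n.ProcessElement(v)
	}
	n.FinishBundle()
	return out
}
```

The sketch assumes all elements for the global combine reach a single bundle. If they can be split across bundles, the per-bundle accumulators would still need merging somewhere, which this does not address.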