[
https://issues.apache.org/jira/browse/BEAM-11928?focusedWorklogId=647510&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-647510
]
ASF GitHub Bot logged work on BEAM-11928:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Sep/21 18:13
Start Date: 07/Sep/21 18:13
Worklog Time Spent: 10m
Work Description: jrmccluskey commented on pull request #15400:
URL: https://github.com/apache/beam/pull/15400#issuecomment-914518273
Yeah it seems to be that hard-coded expectation of a stream. That's what
gave rise to the direct runner having to drop in a CoGBK ahead of the global
combine, since the CoGBK outputs in that FullValue/ReStream pair. I'm a little
surprised the Flink runner doesn't like it given that every other runner does,
though. And adding logic to handle the nil ReStream case likely won't fix
it, since that would mean the runner is sending individual elements to
ProcessElement instead of arranging them to be processed together. The next
operation would only get the accumulation function's output for a single
element, not for all of the elements.
Regardless, it makes sense that the expectation of getting a ReStream should
be wrapped with some checking and a more helpful error message than just a
generic index out of bounds panic. I added a quick check and we can go from
there.
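
The "quick check" described above can be sketched roughly as follows. This is a simplified stand-in, not the actual SDK code: `ReStream` here only mirrors the shape of the real `exec.ReStream`, and `mergeAccumulators` is a hypothetical name for the step that previously indexed into the decoded values unconditionally.

```go
package main

import "fmt"

// ReStream is a stand-in for the SDK's exec.ReStream: a re-iterable
// stream of values. The real type lives in
// sdks/go/pkg/beam/core/runtime/exec; this sketch only mirrors the shape.
type ReStream interface {
	Open() []interface{} // simplified: the real Open returns a Stream
}

type sliceReStream struct{ vals []interface{} }

func (s sliceReStream) Open() []interface{} { return s.vals }

// mergeAccumulators is a hypothetical stand-in for the step that used to
// index into the grouped values unconditionally and panicked with a
// generic "index out of range" when no stream was present.
func mergeAccumulators(values []interface{}) (interface{}, error) {
	// The added guard: fail with a descriptive error instead of a panic.
	if len(values) == 0 {
		return nil, fmt.Errorf("combine: expected a non-empty ReStream of " +
			"accumulators; got none (runner may not be emitting grouped values)")
	}
	sum := 0
	for _, v := range values {
		sum += v.(int)
	}
	return sum, nil
}

func main() {
	rs := sliceReStream{vals: []interface{}{1, 2, 3}}
	got, err := mergeAccumulators(rs.Open())
	fmt.Println(got, err)

	// The failure mode: an empty/nil stream now yields an error, not a panic.
	_, err = mergeAccumulators(nil)
	fmt.Println(err != nil)
}
```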
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 647510)
Time Spent: 2h (was: 1h 50m)
> Go SDK should use the combine_globally urn for global combines.
> ---------------------------------------------------------------
>
> Key: BEAM-11928
> URL: https://issues.apache.org/jira/browse/BEAM-11928
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Jack McCluskey
> Priority: P3
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Reported on
> [https://stackoverflow.com/questions/66446338/issue-with-combine-function-in-apache-beam-go-sdk/66486052#66486052]
> The root cause is that the Go SDK doesn't use the
> "beam:transform:combine_globally:v1" URN, and always uses
> "beam:transform:combine_per_key:v1" even for global combines, with an
> AddFixedKey DoFn.
> URN in the proto:
> [https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L347]
>
> Go SDK only having combine_per_key
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L42]
> We currently "detect" combines via a CombinePerKey scope
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/edge.go#L434]
>
> added at beam.TryCombinePerKey
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/combine.go#L58]
> We convert combines into the CombinePayload here
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L253]
> called above here:
> [https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L241]
>
> We probably want to just add a graph.CombineGlobal op (vs. the existing
> combine node), or modify the "CombinePerKey" scope hack to have a
> CombineGlobal variant, or something that is cleaner than what currently
> exists.
> We'd also want to make sure the optimization takes place properly, which
> should be simple enough to verify with at least a one-off timing check, if
> not a regular benchmark.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)