[
https://issues.apache.org/jira/browse/BEAM-6740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776742#comment-16776742
]
Etienne Chauchot edited comment on BEAM-6740 at 2/25/19 10:56 AM:
------------------------------------------------------------------
That is because none of the known translators
{code}
RawPTransformTranslator
KnownTransformPayloadTranslator
ParDoTranslator
{code}
can translate Combine.Globally, whereas
{code}
KnownTransformPayloadTranslator
{code}
can translate Combine.PerKey.
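To make the lookup behaviour concrete, here is a minimal plain-Java model of it. This is only a sketch and not Beam code: the class TranslatorLookupSketch, the methods findTranslator and plan, and the string-keyed registry are all hypothetical. The entries for RawPTransformTranslator and ParDoTranslator are illustrative placeholders. Only the Combine.PerKey entry and the absence of a Combine.Globally entry come from the comment above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class TranslatorLookupSketch {

  // Hypothetical registry: translator name -> transform names it can translate.
  // Only the Combine.PerKey entry is taken from the comment; the rest are placeholders.
  static final Map<String, Set<String>> KNOWN_TRANSLATORS = new LinkedHashMap<>();

  static {
    KNOWN_TRANSLATORS.put("RawPTransformTranslator", Collections.singleton("RawPTransform"));
    KNOWN_TRANSLATORS.put("KnownTransformPayloadTranslator", Collections.singleton("Combine.PerKey"));
    KNOWN_TRANSLATORS.put("ParDoTranslator", Collections.singleton("ParDo"));
  }

  // Returns the name of the first translator that handles the transform, or null if none does.
  public static String findTranslator(String transformName) {
    for (Map.Entry<String, Set<String>> entry : KNOWN_TRANSLATORS.entrySet()) {
      if (entry.getValue().contains(transformName)) {
        return entry.getKey();
      }
    }
    return null;
  }

  // Models the consequence: an unmatched composite is entered instead of being translated as a unit.
  public static String plan(String transformName) {
    String translator = findTranslator(transformName);
    return translator == null
        ? "enter composite " + transformName
        : "translate " + transformName + " with " + translator;
  }

  public static void main(String[] args) {
    System.out.println(plan("Combine.Globally"));
    System.out.println(plan("Combine.PerKey"));
  }
}
```

In this model Combine.Globally matches no translator, so the composite is entered and its inner Combine.PerKey is what ends up being translated.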
was (Author: echauchot):
That is because none of
{code}
RawPTransformTranslator
KnownTransformPayloadTranslator
ParDoTranslator
{code}
known translators can translate Combine.Globally
> Combine.globally translation is never called
> --------------------------------------------
>
> Key: BEAM-6740
> URL: https://issues.apache.org/jira/browse/BEAM-6740
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Etienne Chauchot
> Priority: Major
>
> SDK translates Combine.Globally as a composite transform composed of:
> * Map that assigns Void keys
> * Combine.PerKey
> As Combine.PerKey uses a Spark GBK (GroupByKey) internally, the runner adds its own
> translation of Combine.Globally to avoid the less performant GBK. This
> translation should be called instead of entering the composite transform
> translation. A pipeline like this:
> {code}
> PCollection<Integer> input =
>     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
> input.apply(Combine.globally(new IntegerCombineFn()));
> {code}
> {code}
> private static class IntegerCombineFn
>     extends Combine.CombineFn<Integer, Integer, Integer> {
>   @Override
>   public Integer createAccumulator() {
>     return 0;
>   }
>   @Override
>   public Integer addInput(Integer accumulator, Integer input) {
>     return accumulator + input;
>   }
>   @Override
>   public Integer mergeAccumulators(Iterable<Integer> accumulators) {
>     Integer result = 0;
>     for (Integer value : accumulators) {
>       result += value;
>     }
>     return result;
>   }
>   @Override
>   public Integer extractOutput(Integer accumulator) {
>     return accumulator;
>   }
> }
> {code}
> is translated as the composite described above, not by the runner's own
> Combine.Globally translation.
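
For illustration, the two execution shapes described in the issue can be modelled in plain Java without any Beam or Spark dependency. This is a sketch under assumptions: the class CombineGloballySketch and the methods viaComposite and direct are hypothetical names, and the in-memory HashMap only stands in for the group-by-key step. It shows that both paths produce the same sum, while only the composite path needs the intermediate grouping.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineGloballySketch {

  // Composite path: assign a single null (Void-like) key, group by key,
  // combine each group, then drop the key.
  public static int viaComposite(List<Integer> input) {
    // Step 1: "Map that assigns Void keys".
    List<Map.Entry<Void, Integer>> keyed = new ArrayList<>();
    for (Integer value : input) {
      keyed.add(new SimpleEntry<Void, Integer>(null, value));
    }
    // Step 2: group by key (stand-in for the GBK inside Combine.PerKey).
    Map<Void, List<Integer>> grouped = new HashMap<>();
    for (Map.Entry<Void, Integer> kv : keyed) {
      List<Integer> values = grouped.get(kv.getKey());
      if (values == null) {
        values = new ArrayList<>();
        grouped.put(kv.getKey(), values);
      }
      values.add(kv.getValue());
    }
    // Step 3: combine the values of the single group and drop the key.
    int result = 0;
    for (List<Integer> values : grouped.values()) {
      int accumulator = 0;
      for (Integer value : values) {
        accumulator += value;
      }
      result = accumulator;
    }
    return result;
  }

  // Direct path: fold the elements into one accumulator, with no grouping step.
  public static int direct(List<Integer> input) {
    int accumulator = 0;
    for (Integer value : input) {
      accumulator += value;
    }
    return accumulator;
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    System.out.println(viaComposite(input)); // prints 55
    System.out.println(direct(input));       // prints 55
  }
}
```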
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)