[ 
https://issues.apache.org/jira/browse/BEAM-1155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur resolved BEAM-1155.
-----------------------------
       Resolution: Won't Fix
    Fix Version/s: Not applicable

> Spark runner aggregators only support a handful of combiners
> ------------------------------------------------------------
>
>                 Key: BEAM-1155
>                 URL: https://issues.apache.org/jira/browse/BEAM-1155
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Amit Sela
>             Fix For: Not applicable
>
>
> Spark runner aggregators only support a handful of combiners.
> If your {{CombineFn}} implementation (specifically, a custom {{CombineFn}} 
> written by the user for their aggregator) is not one of those handled in 
> {{org.apache.beam.runners.spark.translation.SparkRuntimeContext#getCoder}}, 
> you will get an {{IllegalArgumentException}} in your pipeline.
> {code:java}
> private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
>     try {
>       if (combiner.getClass() == Sum.SumIntegerFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
>       } else if (combiner.getClass() == Sum.SumLongFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
>       } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
>       } else if (combiner.getClass() == Min.MinIntegerFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
>       } else if (combiner.getClass() == Min.MinLongFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
>       } else if (combiner.getClass() == Min.MinDoubleFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
>       } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
>       } else if (combiner.getClass() == Max.MaxLongFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
>       } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
>         return 
> getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
>       } else {
>         throw new IllegalArgumentException("unsupported combiner in 
> Aggregator: "
>             + combiner.getClass().getName());
>       }
>     } catch (CannotProvideCoderException e) {
>       throw new IllegalStateException("Could not determine default coder for 
> combiner", e);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to