Aviem Zur created BEAM-1155:
-------------------------------

             Summary: Spark runner aggregators only support a handfuls of 
combiners
                 Key: BEAM-1155
                 URL: https://issues.apache.org/jira/browse/BEAM-1155
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
            Reporter: Aviem Zur
            Assignee: Amit Sela


Spark runner aggregators only support a handfuls of combiners.

If your {{CombineFn}} implementation (specifically, a custom {{CombineFn}} 
written by the user for their aggregator) is not one that appears in 
{{org.apache.beam.runners.spark.translation.SparkRuntimeContext#getCoder}} you 
will get an {{IllegalArgumentException}} in your pipeline.

{code:java}
private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) {
    try {
      if (combiner.getClass() == Sum.SumIntegerFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
      } else if (combiner.getClass() == Sum.SumLongFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
      } else if (combiner.getClass() == Sum.SumDoubleFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
      } else if (combiner.getClass() == Min.MinIntegerFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
      } else if (combiner.getClass() == Min.MinLongFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
      } else if (combiner.getClass() == Min.MinDoubleFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
      } else if (combiner.getClass() == Max.MaxIntegerFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class));
      } else if (combiner.getClass() == Max.MaxLongFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class));
      } else if (combiner.getClass() == Max.MaxDoubleFn.class) {
        return 
getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class));
      } else {
        throw new IllegalArgumentException("unsupported combiner in Aggregator: 
"
            + combiner.getClass().getName());
      }
    } catch (CannotProvideCoderException e) {
      throw new IllegalStateException("Could not determine default coder for 
combiner", e);
    }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to