Aviem Zur created BEAM-1155: ------------------------------- Summary: Spark runner aggregators only support a handfuls of combiners Key: BEAM-1155 URL: https://issues.apache.org/jira/browse/BEAM-1155 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Aviem Zur Assignee: Amit Sela
Spark runner aggregators only support a handfuls of combiners. If your {{CombineFn}} implementation (specifically, a custom {{CombineFn}} written by the user for their aggregator) is not one that appears in {{org.apache.beam.runners.spark.translation.SparkRuntimeContext#getCoder}} you will get an {{IllegalArgumentException}} in your pipeline. {code:java} private Coder<?> getCoder(Combine.CombineFn<?, ?, ?> combiner) { try { if (combiner.getClass() == Sum.SumIntegerFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class)); } else if (combiner.getClass() == Sum.SumLongFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class)); } else if (combiner.getClass() == Sum.SumDoubleFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class)); } else if (combiner.getClass() == Min.MinIntegerFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class)); } else if (combiner.getClass() == Min.MinLongFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class)); } else if (combiner.getClass() == Min.MinDoubleFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class)); } else if (combiner.getClass() == Max.MaxIntegerFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Integer.class)); } else if (combiner.getClass() == Max.MaxLongFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Long.class)); } else if (combiner.getClass() == Max.MaxDoubleFn.class) { return getCoderRegistry().getDefaultCoder(TypeDescriptor.of(Double.class)); } else { throw new IllegalArgumentException("unsupported combiner in Aggregator: " + combiner.getClass().getName()); } } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine default coder for combiner", e); } } {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)