[ https://issues.apache.org/jira/browse/BEAM-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120886#comment-16120886 ]
Xu Mingmin commented on BEAM-2747:
----------------------------------
[~takidau] [~xumingming]
I'm stuck on a technical problem with supporting CombineFn as a UDAF. Please advise on any possible solution.
Due to limitations in Calcite:
1. the UDAF passed to `SchemaPlus.add(fnName, fn)` can only be an `AggregateFunctionImpl`;
2. `AggregateFunctionImpl` is not extensible; its only public factory is the static `create` method, which takes a `Class` argument.
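To illustrate limitation 2 in plain Java: a factory that receives only a `Class` has no channel for constructor arguments, so the most it can do is instantiate the class reflectively, which assumes a public no-arg constructor. This is a hypothetical sketch, not Calcite's code; `ClassOnlyFactory`, `CombineFnAdaptor` and `NoArgUdaf` are illustrative names only.

```java
import java.util.function.BinaryOperator;

// Hypothetical sketch, not Calcite's source: a factory that only receives a Class.
class ClassOnlyFactory {

  // Hypothetical adaptor whose only constructor needs the function to wrap.
  public static class CombineFnAdaptor {
    private final BinaryOperator<Integer> fn;

    public CombineFnAdaptor(BinaryOperator<Integer> fn) { this.fn = fn; }

    public Integer apply(Integer a, Integer b) { return fn.apply(a, b); }
  }

  // Hypothetical UDAF-style class with a public no-arg constructor.
  public static class NoArgUdaf {
    public NoArgUdaf() {}

    public Integer apply(Integer a, Integer b) { return a + b; }
  }

  // Assumption: instances are created through the public no-arg constructor.
  // Returns null when the class has none, e.g. an adaptor that needs its fn.
  static Object create(Class<?> clazz) {
    try {
      return clazz.getConstructor().newInstance();
    } catch (NoSuchMethodException e) {
      return null;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(create(CombineFnAdaptor.class) == null); // true
    System.out.println(create(NoArgUdaf.class) != null);        // true
  }
}
```

The wrapped function exists only as instance state, and a `Class` object has no way to carry it.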
None of the solutions below works so far:
1. An adaptor with a constructor like `public BeamUdafCombineFnAdaptor(CombineFn)` doesn't work, because `AggregateFunctionImpl` only takes a `Class` as input, so there is no way to hand it a CombineFn instance;
2. An abstract `BeamSqlUdaf<InputT, AccumT, OutputT>` that wraps a CombineFn, where users extend this class and implement `public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();`, like
{code}
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
  public BeamSqlUdaf() {}

  // The only method users implement: supply the CombineFn to wrap.
  public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();

  // Each UDAF method delegates to the wrapped CombineFn.
  public final AccumT init() { return getCombineFn().createAccumulator(); }
  public final AccumT add(AccumT accumulator, InputT input) {
    return getCombineFn().addInput(accumulator, input);
  }
  public final AccumT merge(Iterable<AccumT> accumulators) {
    return getCombineFn().mergeAccumulators(accumulators);
  }
  public final OutputT result(AccumT accumulator) {
    return getCombineFn().extractOutput(accumulator);
  }

  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry) { ... }
}
{code}
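Calcite fails to resolve the actual types of InputT, AccumT and OutputT because of type erasure in Java (https://en.wikipedia.org/wiki/Generics_in_Java#Problems_with_type_erasure): `init`, `add`, `merge` and `result` are declared only on the generic base class, so reflection sees just their erased `Object` signatures.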
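The erasure problem can be reproduced without Beam or Calcite. In this sketch `SimpleCombineFn`, `WrappingUdaf` and `IntSum` are hypothetical stand-ins for CombineFn, BeamSqlUdaf and a user UDAF, and calling `Class.getMethod` approximates what a `Class`-based lookup can see. Because the methods are declared on the generic base class, their reflected types are `Object`, even though the subclass fixes them to `int[]` and `Integer`.

```java
import java.lang.reflect.Method;

// Hypothetical, dependency-free model of the BeamSqlUdaf wrapper above.
class ErasureDemo {

  // Hypothetical minimal stand-in for Beam's CombineFn.
  public interface SimpleCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    OutputT extractOutput(AccumT accumulator);
  }

  // Generic wrapper: the UDAF methods are declared only here, with type variables.
  public abstract static class WrappingUdaf<InputT, AccumT, OutputT> {
    public abstract SimpleCombineFn<InputT, AccumT, OutputT> getCombineFn();
    public final AccumT init() { return getCombineFn().createAccumulator(); }
    public final AccumT add(AccumT acc, InputT input) { return getCombineFn().addInput(acc, input); }
    public final OutputT result(AccumT acc) { return getCombineFn().extractOutput(acc); }
  }

  // User class: supplies only the combine function, as the wrapper intends.
  public static class IntSum extends WrappingUdaf<Integer, int[], Integer> {
    @Override
    public SimpleCombineFn<Integer, int[], Integer> getCombineFn() {
      return new SimpleCombineFn<Integer, int[], Integer>() {
        @Override public int[] createAccumulator() { return new int[1]; }
        @Override public int[] addInput(int[] acc, Integer in) { acc[0] += in; return acc; }
        @Override public Integer extractOutput(int[] acc) { return acc[0]; }
      };
    }
  }

  // Reflective lookup of a public method, as a Class-based factory would do.
  static Method publicMethod(Class<?> clazz, String name, Class<?>... params) {
    try {
      return clazz.getMethod(name, params);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    // Both print "class java.lang.Object": the concrete types are not visible here.
    System.out.println(publicMethod(IntSum.class, "init").getReturnType());
    System.out.println(publicMethod(IntSum.class, "result", Object.class).getReturnType());
    IntSum udaf = new IntSum();
    System.out.println(udaf.result(udaf.add(udaf.init(), 5))); // 5
  }
}
```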
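The only workable approach I know of is to keep the interface as it is and guide users to wrap an existing CombineFn, re-declaring each method with concrete types so that Calcite can see them. This requires the `init`/`add`/`merge`/`result` methods in BeamSqlUdaf to be non-final, since Java does not allow a subclass to override a `final` method: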
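{code}
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Sum;

public static class SquareSum extends BeamSqlUdaf<Integer, int[], Integer> {
  public SquareSum() {}

  // Wraps Beam's built-in integer sum (the inputs are summed, not squared).
  @Override
  public CombineFn<Integer, int[], Integer> getCombineFn() { return Sum.ofIntegers(); }

  // Re-declared with concrete types so Calcite can read them via reflection;
  // each method only delegates to BeamSqlUdaf.
  @Override
  public int[] init() { return super.init(); }
  @Override
  public int[] add(int[] accumulator, Integer input) { return super.add(accumulator, input); }
  @Override
  public int[] merge(Iterable<int[]> accumulators) { return super.merge(accumulators); }
  @Override
  public Integer result(int[] accumulator) { return super.result(accumulator); }
}
{code}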
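A plain-Java sketch of why re-declaring the methods helps, again using hypothetical stand-ins (`SimpleCombineFn`, `WrappingUdaf`, `IntSum`) rather than Beam or Calcite classes, and assuming the lookup inspects method signatures reflectively. Once the subclass re-declares the methods with concrete types, reflection reports `int[]` and `Integer` instead of `Object`.

```java
import java.lang.reflect.Method;

// Hypothetical, dependency-free model of the SquareSum workaround above.
class RedeclaredUdafDemo {

  // Hypothetical minimal stand-in for Beam's CombineFn.
  public interface SimpleCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    OutputT extractOutput(AccumT accumulator);
  }

  // Same wrapper idea, but the methods are NOT final, so subclasses can re-declare them.
  public abstract static class WrappingUdaf<InputT, AccumT, OutputT> {
    public abstract SimpleCombineFn<InputT, AccumT, OutputT> getCombineFn();
    public AccumT init() { return getCombineFn().createAccumulator(); }
    public AccumT add(AccumT acc, InputT input) { return getCombineFn().addInput(acc, input); }
    public OutputT result(AccumT acc) { return getCombineFn().extractOutput(acc); }
  }

  public static class IntSum extends WrappingUdaf<Integer, int[], Integer> {
    @Override
    public SimpleCombineFn<Integer, int[], Integer> getCombineFn() {
      return new SimpleCombineFn<Integer, int[], Integer>() {
        @Override public int[] createAccumulator() { return new int[1]; }
        @Override public int[] addInput(int[] acc, Integer in) { acc[0] += in; return acc; }
        @Override public Integer extractOutput(int[] acc) { return acc[0]; }
      };
    }

    // Re-declared with concrete types; the bodies only delegate to the wrapper.
    @Override public int[] init() { return super.init(); }
    @Override public int[] add(int[] acc, Integer input) { return super.add(acc, input); }
    @Override public Integer result(int[] acc) { return super.result(acc); }
  }

  // The compiler also emits erased bridge methods (e.g. Object init()) for the
  // overrides above, so skip those and return the concretely typed declaration.
  static Method declaredMethod(Class<?> clazz, String name) {
    for (Method m : clazz.getDeclaredMethods()) {
      if (m.getName().equals(name) && !m.isBridge()) {
        return m;
      }
    }
    throw new IllegalArgumentException("no non-bridge method named " + name);
  }

  public static void main(String[] args) {
    System.out.println(declaredMethod(IntSum.class, "init").getReturnType().getSimpleName());   // int[]
    System.out.println(declaredMethod(IntSum.class, "result").getReturnType().getSimpleName()); // Integer
  }
}
```

If the base-class methods stay `final`, as in the first snippet, the compiler rejects these overrides.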
> accept CombineFn as UDAF
> ------------------------
>
> Key: BEAM-2747
> URL: https://issues.apache.org/jira/browse/BEAM-2747
> Project: Beam
> Issue Type: Sub-task
> Components: dsl-sql
> Reporter: Xu Mingmin
> Assignee: Xu Mingmin
> Labels: dsl_sql_merge, dsl_sql_review
>
> refer to
> https://docs.google.com/document/d/1VlAqKLhQua6YpR42KuziIYHTB1FfQTi16Mh88MEYBsU/edit?disco=AAAABTmjhgo
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)