Xu Mingmin commented on BEAM-2747:

[~takidau] [~xumingming]
I'm stuck on a technical problem in supporting CombineFn. Please advise on any
possible solution.

Due to limitations in Calcite:
1. a UDAF can only be passed as an `AggregateFunctionImpl` when calling
`SchemaPlus.add(fnName, fn)`;
2. `AggregateFunctionImpl` is not extensible; its only public factory is the
static `create` method, which takes a `Class` argument.
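Limitation 2 has a practical consequence. When an API receives only a `Class`, and assuming the function class is then instantiated through a public zero-argument constructor, there is no place to hand over an already configured object such as a CombineFn. The pure-Java sketch below is a simplified stand-in, not Calcite's code; `ClassOnlyFactoryDemo`, `DemoCombineFn`, `CombineFnAdaptor`, `NoArgUdaf` and `instantiate` are hypothetical names used only for illustration.

```java
import java.lang.reflect.InvocationTargetException;

public class ClassOnlyFactoryDemo {
  // Hypothetical stand-in for Beam's CombineFn; it only exists so the adaptor has something to wrap.
  public interface DemoCombineFn {}

  // Hypothetical adaptor: its only constructor needs a CombineFn instance.
  public static class CombineFnAdaptor {
    private final DemoCombineFn fn;

    public CombineFnAdaptor(DemoCombineFn fn) {
      this.fn = fn;
    }
  }

  // Hypothetical UDAF class with a public no-arg constructor.
  public static class NoArgUdaf {
    public NoArgUdaf() {}
  }

  // Simplified stand-in for an API that receives only a Class:
  // the sole way to get an instance is a public zero-argument constructor.
  public static Object instantiate(Class<?> clazz) {
    try {
      return clazz.getConstructor().newInstance();
    } catch (NoSuchMethodException e) {
      return null; // no zero-arg constructor, so there is no way to pass a CombineFn in
    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(instantiate(NoArgUdaf.class) != null);        // true
    System.out.println(instantiate(CombineFnAdaptor.class) != null); // false
  }
}
```

A `Class` object carries no instance state, so any per-instance configuration (here, the wrapped function) has to be reachable from a no-arg constructor instead.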

None of the solutions below works so far:
1. An adaptor, which needs a constructor like `public
BeamUdafCombineFnAdaptor(CombineFn)`, doesn't work because `AggregateFunctionImpl`
only takes a `Class` as input;
2. An abstract `BeamSqlUdaf<InputT, AccumT, OutputT>` that wraps a CombineFn.
Users need to extend this class and implement `public abstract
CombineFn<InputT, AccumT, OutputT> getCombineFn();`, like:
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
  public BeamSqlUdaf() {}

  // Users supply the CombineFn; the methods below delegate to it.
  public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();

  public final AccumT init() {
    return getCombineFn().createAccumulator();
  }

  public final AccumT add(AccumT accumulator, InputT input) {
    return getCombineFn().addInput(accumulator, input);
  }

  public final AccumT merge(Iterable<AccumT> accumulators) {
    return getCombineFn().mergeAccumulators(accumulators);
  }

  public final OutputT result(AccumT accumulator) {
    return getCombineFn().extractOutput(accumulator);
  }

  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry) { ... }
}
Calcite fails to resolve the actual types of InputT, AccumT and OutputT due to
type erasure in …
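To see the erasure problem in isolation, the following pure-Java experiment mirrors the shape of `BeamSqlUdaf`, using a trimmed-down, hypothetical `MiniCombineFn` in place of Beam's `CombineFn`, so it needs no Beam or Calcite dependency. It assumes, as a simplification, that the UDAF's types are read from the reflected signatures of `init`, `add` and `result`; this is not necessarily exactly how Calcite performs its lookup. Because the subclass inherits the final methods unchanged, reflection only sees `Object` and the type variable name `AccumT`, never `int[]`.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class ErasureDemo {
  // Hypothetical, trimmed-down stand-in for Beam's CombineFn (same method names, no coders).
  public interface MiniCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    OutputT extractOutput(AccumT accumulator);
  }

  // Same shape as the proposed BeamSqlUdaf: final methods that delegate to getCombineFn().
  public abstract static class GenericUdaf<InputT, AccumT, OutputT> {
    public abstract MiniCombineFn<InputT, AccumT, OutputT> getCombineFn();

    public final AccumT init() {
      return getCombineFn().createAccumulator();
    }

    public final AccumT add(AccumT accumulator, InputT input) {
      return getCombineFn().addInput(accumulator, input);
    }

    public final OutputT result(AccumT accumulator) {
      return getCombineFn().extractOutput(accumulator);
    }
  }

  // A user subclass that binds the type arguments but inherits init/add/result unchanged.
  public static class IntSum extends GenericUdaf<Integer, int[], Integer> {
    @Override
    public MiniCombineFn<Integer, int[], Integer> getCombineFn() {
      return new MiniCombineFn<Integer, int[], Integer>() {
        public int[] createAccumulator() { return new int[1]; }
        public int[] addInput(int[] acc, Integer input) { acc[0] += input; return acc; }
        public Integer extractOutput(int[] acc) { return acc[0]; }
      };
    }
  }

  // Returns the first public method with the given name, as a lookup by name would.
  public static Method findPublicMethod(Class<?> clazz, String name) {
    for (Method m : clazz.getMethods()) {
      if (m.getName().equals(name)) {
        return m;
      }
    }
    throw new IllegalArgumentException("no public method " + name);
  }

  public static void main(String[] args) {
    Method init = findPublicMethod(IntSum.class, "init");
    Method add = findPublicMethod(IntSum.class, "add");
    // Only the erased types are visible on the inherited methods.
    System.out.println(init.getReturnType().getName());           // java.lang.Object
    System.out.println(Arrays.toString(add.getParameterTypes())); // [class java.lang.Object, class java.lang.Object]
    // The generic signature names the type variable, not the binding chosen by IntSum.
    System.out.println(init.getGenericReturnType());              // AccumT
  }
}
```

The binding `AccumT = int[]` exists only in the subclass's `extends` clause; the inherited methods themselves were compiled once, against the erased types.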

The only workable approach I know of is to keep the interface as it is and guide
users to wrap an existing CombineFn, redeclaring each method with concrete types, like:
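  // Requires init/add/merge/result in BeamSqlUdaf to be non-final; a final method cannot be overridden.
  public static class SquareSum extends BeamSqlUdaf<Integer, int[], Integer> {
    public SquareSum() {}

    @Override
    public CombineFn<Integer, int[], Integer> getCombineFn() {
      return Sum.ofIntegers();
    }

    // Redeclare each method so its signature carries concrete types instead of erased ones.
    @Override
    public int[] init() {
      return super.init();
    }

    @Override
    public int[] add(int[] accumulator, Integer input) {
      return super.add(accumulator, input);
    }

    @Override
    public int[] merge(Iterable<int[]> accumulators) {
      return super.merge(accumulators);
    }

    @Override
    public Integer result(int[] accumulator) {
      return super.result(accumulator);
    }
  }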
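The workaround can be checked with the same kind of pure-Java experiment. This sketch again uses a hypothetical `MiniCombineFn` stand-in for Beam's `CombineFn`, drops `final` from the wrapper methods (otherwise the subclass could not redeclare them), and redeclares each method with concrete types. Reflection then reports `int[]` for `init`, while the compiler-generated bridge methods keep the erased `Object` signatures; the subclass still computes the same result as the wrapped function. `ConcreteOverrideDemo`, `GenericUdaf`, `IntSumFn`, `SumUdaf` and `lookup` are illustrative names, not Beam or Calcite APIs.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class ConcreteOverrideDemo {
  // Hypothetical, trimmed-down stand-in for Beam's CombineFn (same method names, no coders).
  public interface MiniCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    OutputT extractOutput(AccumT accumulator);
  }

  // Like BeamSqlUdaf, but the methods are NOT final, so subclasses can redeclare them.
  public abstract static class GenericUdaf<InputT, AccumT, OutputT> {
    public abstract MiniCombineFn<InputT, AccumT, OutputT> getCombineFn();
    public AccumT init() { return getCombineFn().createAccumulator(); }
    public AccumT add(AccumT acc, InputT input) { return getCombineFn().addInput(acc, input); }
    public AccumT merge(Iterable<AccumT> accs) { return getCombineFn().mergeAccumulators(accs); }
    public OutputT result(AccumT acc) { return getCombineFn().extractOutput(acc); }
  }

  // Integer sum with an int[1] accumulator, i.e. the same type triple as the SquareSum example.
  static final class IntSumFn implements MiniCombineFn<Integer, int[], Integer> {
    public int[] createAccumulator() { return new int[1]; }
    public int[] addInput(int[] acc, Integer input) { acc[0] += input; return acc; }
    public int[] mergeAccumulators(Iterable<int[]> accs) {
      int[] merged = new int[1];
      for (int[] a : accs) { merged[0] += a[0]; }
      return merged;
    }
    public Integer extractOutput(int[] acc) { return acc[0]; }
  }

  // The workaround: redeclare every method with concrete types and just call super.
  public static class SumUdaf extends GenericUdaf<Integer, int[], Integer> {
    @Override public MiniCombineFn<Integer, int[], Integer> getCombineFn() { return new IntSumFn(); }
    @Override public int[] init() { return super.init(); }
    @Override public int[] add(int[] acc, Integer input) { return super.add(acc, input); }
    @Override public int[] merge(Iterable<int[]> accs) { return super.merge(accs); }
    @Override public Integer result(int[] acc) { return super.result(acc); }
  }

  // Looks up a public method by exact parameter types; returns null instead of throwing.
  public static Method lookup(Class<?> clazz, String name, Class<?>... params) {
    try {
      return clazz.getMethod(name, params);
    } catch (NoSuchMethodException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    // getMethod prefers the most specific return type, so the int[] override wins over the bridge.
    System.out.println(lookup(SumUdaf.class, "init").getReturnType().getSimpleName()); // int[]
    // The concrete signature add(int[], Integer) is now visible to reflection...
    Method add = lookup(SumUdaf.class, "add", int[].class, Integer.class);
    System.out.println(add != null && !add.isBridge()); // true
    // ...while the compiler still emits an erased add(Object, Object) bridge next to it.
    System.out.println(lookup(SumUdaf.class, "add", Object.class, Object.class).isBridge()); // true
    // Behaviour is unchanged: (3 + 4) merged with 5.
    SumUdaf udaf = new SumUdaf();
    int[] a = udaf.add(udaf.add(udaf.init(), 3), 4);
    int[] b = udaf.add(udaf.init(), 5);
    System.out.println(udaf.result(udaf.merge(Arrays.asList(a, b)))); // 12
  }
}
```

Because the erased bridge methods are still present, any lookup that iterates over all methods with a given name can also encounter them; skipping methods for which `Method.isBridge()` returns true keeps only the concrete signatures.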

> accept CombineFn as UDAF
> ------------------------
>                 Key: BEAM-2747
>                 URL: https://issues.apache.org/jira/browse/BEAM-2747
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql
>            Reporter: Xu Mingmin
>            Assignee: Xu Mingmin
>              Labels: dsl_sql_merge, dsl_sql_review
> refer to 
> https://docs.google.com/document/d/1VlAqKLhQua6YpR42KuziIYHTB1FfQTi16Mh88MEYBsU/edit?disco=AAAABTmjhgo

This message was sent by Atlassian JIRA
