[ 
https://issues.apache.org/jira/browse/BEAM-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120886#comment-16120886
 ] 

Xu Mingmin commented on BEAM-2747:
----------------------------------

[~takidau] [~xumingming]
I'm stuck on a technical problem with supporting CombineFn as a UDAF. Please advise if you see any solution.

Due to limitations in Calcite:
1. a UDAF passed to `SchemaPlus.add(fnName, fn)` can only be an `AggregateFunctionImpl`;
2. `AggregateFunctionImpl` is not extensible; its only factory is the public static `create` method, which takes a `Class` argument.
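Below is a stdlib-only sketch of why the second limitation rules out an instance-based adaptor. `HypotheticalFactory.create(Class)` is a hypothetical stand-in, not Calcite's API. It assumes that a factory receiving only a `Class` can do nothing more than call a public no-arg constructor reflectively, so a wrapped CombineFn instance has no way in. `FakeCombineFn`, `AdaptorWithArg` and `AdaptorNoArg` are likewise illustrative names.

```java
import java.util.function.BinaryOperator;

// Hypothetical stand-in for Beam's CombineFn: just a binary reducer.
interface FakeCombineFn extends BinaryOperator<Integer> {}

// Adaptor in the style of BeamUdafCombineFnAdaptor(CombineFn):
// its only constructor needs the CombineFn instance.
class AdaptorWithArg {
  final FakeCombineFn fn;

  public AdaptorWithArg(FakeCombineFn fn) {
    this.fn = fn;
  }
}

// Adaptor with a no-arg constructor: the combine logic is fixed by the class itself.
class AdaptorNoArg {
  final FakeCombineFn fn = Integer::sum;

  public AdaptorNoArg() {}
}

class HypotheticalFactory {
  // Mimics a factory that only receives a Class (assumption): the only thing it
  // can do is call a public no-arg constructor, so no CombineFn can be passed in.
  static Object create(Class<?> clazz) {
    try {
      return clazz.getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      return null; // no usable public no-arg constructor
    }
  }

  public static void main(String[] args) {
    System.out.println(create(AdaptorWithArg.class) == null); // true
    System.out.println(create(AdaptorNoArg.class) != null);   // true
  }
}
```

Under this assumption, only an adaptor whose CombineFn is fixed by the class itself (one subclass per function) can be instantiated, which is the direction the `BeamSqlUdaf` proposal takes.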

None of the solutions below works so far:
1. An adaptor, which needs a constructor like `public BeamUdafCombineFnAdaptor(CombineFn)`, doesn't work, because `AggregateFunctionImpl` only takes a `Class` as input;
2. An abstract `BeamSqlUdaf<InputT, AccumT, OutputT>` that wraps a CombineFn. Users would extend this class and implement `public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();`, like:
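{code}
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Proposed base class: users supply a CombineFn, and the Calcite-facing
// init/add/merge/result methods delegate to it.
public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
  public BeamSqlUdaf() {}

  // Implemented by users to return the CombineFn to wrap.
  public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();

  public final AccumT init() {
    return getCombineFn().createAccumulator();
  }

  public final AccumT add(AccumT accumulator, InputT input) {
    return getCombineFn().addInput(accumulator, input);
  }

  public final AccumT merge(Iterable<AccumT> accumulators) {
    return getCombineFn().mergeAccumulators(accumulators);
  }

  public final OutputT result(AccumT accumulator) {
    return getCombineFn().extractOutput(accumulator);
  }

  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry) { ... }
}
{code}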
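Calcite fails to get the actual types of InputT, AccumT and OutputT because of type erasure in Java (https://en.wikipedia.org/wiki/Generics_in_Java#Problems_with_type_erasure): in the runtime signatures of the inherited `init`/`add`/`merge`/`result` methods, the type variables are erased to `Object`.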
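The erasure problem can be reproduced with plain Java reflection. The sketch below is a simplified, hypothetical analogue of `BeamSqlUdaf` (`GenericUdaf`, `InheritingUdaf` and `ErasureDemo` are illustrative names). Assuming Calcite derives a UDAF's types from the reflected signatures of methods such as `add`, a subclass that only binds the type arguments still exposes `Object` parameters.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical generic base, simplified from the BeamSqlUdaf proposal.
abstract class GenericUdaf<InputT, AccumT, OutputT> {
  public AccumT add(AccumT accumulator, InputT input) {
    return accumulator;
  }
}

// Subclass that only binds the type arguments and inherits add(...) unchanged.
class InheritingUdaf extends GenericUdaf<Integer, int[], Integer> {}

class ErasureDemo {
  // Describes the add(...) method as reflection sees it on the given class.
  static String addSignature(Class<?> clazz) {
    for (Method m : clazz.getMethods()) {
      if (m.getName().equals("add")) {
        String params = Arrays.stream(m.getParameterTypes())
            .map(Class::getSimpleName)
            .collect(Collectors.joining(", "));
        return m.getReturnType().getSimpleName() + " add(" + params + ")";
      }
    }
    return "no add method";
  }

  public static void main(String[] args) {
    // Prints "Object add(Object, Object)": Integer and int[] are not visible.
    System.out.println(addSignature(InheritingUdaf.class));
  }
}
```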

The only approach I know that works is to keep the interface as it is and guide users to wrap an existing CombineFn, redeclaring each method with concrete types, like:
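{code}
// Requires imports of Combine.CombineFn and Sum from org.apache.beam.sdk.transforms.
// Overriding init/add/merge/result only compiles if these methods are not
// declared final in BeamSqlUdaf.
public static class SquareSum extends BeamSqlUdaf<Integer, int[], Integer> {
  public SquareSum() {}

  // Delegates to Sum.ofIntegers(), i.e. a plain sum despite the class name.
  @Override
  public CombineFn<Integer, int[], Integer> getCombineFn() {
    return Sum.ofIntegers();
  }

  // Each method is redeclared with concrete types so that reflection sees
  // int[]/Integer instead of the erased Object signatures.
  @Override
  public int[] init() {
    return super.init();
  }

  @Override
  public int[] add(int[] accumulator, Integer input) {
    return super.add(accumulator, input);
  }

  @Override
  public int[] merge(Iterable<int[]> accumulators) {
    return super.merge(accumulators);
  }

  @Override
  public Integer result(int[] accumulator) {
    return super.result(accumulator);
  }
}
{code}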
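A similar reflection sketch shows why the redeclaring workaround helps. `BaseUdaf`, `RedeclaringUdaf` and `RedeclarationDemo` are hypothetical names. An override with concrete types is visible to reflection as `int[] add(int[], Integer)`. javac also emits a synthetic bridge method that keeps the erased `Object` signature.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical generic base, simplified from BeamSqlUdaf.
abstract class BaseUdaf<InputT, AccumT, OutputT> {
  public AccumT add(AccumT accumulator, InputT input) {
    return accumulator;
  }
}

// Workaround style: redeclare the method with concrete types and delegate to super.
class RedeclaringUdaf extends BaseUdaf<Integer, int[], Integer> {
  @Override
  public int[] add(int[] accumulator, Integer input) {
    return super.add(accumulator, input);
  }
}

class RedeclarationDemo {
  // Lists every public add(...) method reflection reports, sorted for stable output.
  static List<String> addSignatures(Class<?> clazz) {
    List<String> result = new ArrayList<>();
    for (Method m : clazz.getMethods()) {
      if (!m.getName().equals("add")) {
        continue;
      }
      String params = Arrays.stream(m.getParameterTypes())
          .map(Class::getSimpleName)
          .collect(Collectors.joining(", "));
      String sig = m.getReturnType().getSimpleName() + " add(" + params + ")";
      result.add(m.isBridge() ? sig + " [bridge]" : sig);
    }
    Collections.sort(result);
    return result;
  }

  public static void main(String[] args) {
    // Prints [Object add(Object, Object) [bridge], int[] add(int[], Integer)]
    System.out.println(addSignatures(RedeclaringUdaf.class));
  }
}
```

A reflective lookup that picks up the non-bridge method therefore sees the real types. How Calcite chooses between the concrete and bridge methods is not shown here.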

> accept CombineFn as UDAF
> ------------------------
>
>                 Key: BEAM-2747
>                 URL: https://issues.apache.org/jira/browse/BEAM-2747
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql
>            Reporter: Xu Mingmin
>            Assignee: Xu Mingmin
>              Labels: dsl_sql_merge, dsl_sql_review
>
> refer to 
> https://docs.google.com/document/d/1VlAqKLhQua6YpR42KuziIYHTB1FfQTi16Mh88MEYBsU/edit?disco=AAAABTmjhgo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
