[ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046332#comment-16046332 ]
ASF GitHub Bot commented on FLINK-6888:
---------------------------------------
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4105#discussion_r121335927
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -1395,50 +1399,25 @@ object AggregateUtil {
case udagg: AggSqlFunction =>
aggregates(index) = udagg.getFunction
+ accTypes(index) = udagg.accType
case unSupported: SqlAggFunction =>
throw new TableException(s"unsupported Function: '${unSupported.getName}'")
}
}
- (aggFieldIndexes, aggregates)
- }
-
- private def createAccumulatorType(
- aggregates: Array[TableAggregateFunction[_, _]]): Seq[TypeInformation[_]] = {
-
- val aggTypes: Seq[TypeInformation[_]] =
- aggregates.map {
- agg =>
- val accType = try {
- val method: Method = agg.getClass.getMethod("getAccumulatorType")
- method.invoke(agg).asInstanceOf[TypeInformation[_]]
- } catch {
- case _: NoSuchMethodException => null
- case ite: Throwable => throw new TableException("Unexpected exception:", ite)
- }
- if (accType != null) {
- accType
- } else {
- val accumulator = agg.createAccumulator()
- try {
- TypeInformation.of(accumulator.getClass)
- } catch {
- case ite: InvalidTypesException =>
- throw new TableException(
- "Cannot infer type of accumulator. " +
- "You can override AggregateFunction.getAccumulatorType() to specify the type.",
- ite)
- }
- }
- }
+ // create accumulator type information for every aggregate function
+ aggregates.zipWithIndex.foreach { case (agg, index) =>
+ accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, accTypes(index))
--- End diff --
I don't think so. `getAccumulatorTypeOfAggregateFunction` can handle a null `accTypes(index)`.
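The fallback chain under discussion (a type extracted up front that may be null, an explicit `getAccumulatorType()` override, and finally inference from the runtime class of `createAccumulator()`) can be modeled in plain Scala to show why a single resolution function can absorb a null argument. This is a simplified sketch, not Flink code: `AccType`, `StructuredType`, `GenericType`, `SimpleAggFunction`, `accumulatorType` and `AccTypeResolver.resolve` are hypothetical names, and the precedence shown (explicit override first, then the extracted type, then the runtime class) is an assumption modeled on the removed `createAccumulatorType`.

```scala
// Hypothetical, simplified model of accumulator type resolution.
// None of these names are Flink classes or APIs.
sealed trait AccType
final case class StructuredType(fields: Seq[String]) extends AccType
final case class GenericType(className: String) extends AccType

// Stand-in for an aggregate function (hypothetical, not Flink's AggregateFunction).
trait SimpleAggFunction {
  def createAccumulator(): Any
  // Optional explicit type, analogous to overriding getAccumulatorType().
  def accumulatorType: Option[AccType] = None
}

object AccTypeResolver {
  // `extracted` may be null, e.g. when no type was captured up front.
  def resolve(agg: SimpleAggFunction, extracted: AccType): AccType =
    agg.accumulatorType                 // 1. explicit override (assumed to win)
      .orElse(Option(extracted))        // 2. previously extracted type, if not null
      .getOrElse(                       // 3. last resort: only the runtime class is
        GenericType(agg.createAccumulator().getClass.getName)) // known, fields are lost
}
```

In this model, calling `resolve(agg, null)` on a function without an override falls back to a `GenericType` that carries only a class name, the situation FLINK-6888 describes; because `Option(extracted)` turns null into `None`, the call site needs no separate null check.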
> Can not determine TypeInformation of ACC type of AggregateFunction when ACC is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-6888
> URL: https://issues.apache.org/jira/browse/FLINK-6888
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or
> tuple class, the TypeInformation falls back to {{GenericType}}, which
> results in poor performance during state (de)serialization.
> I suggest extracting the ACC TypeInformation when
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> import org.apache.flink.table.functions.AggregateFunction
>
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   // Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
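Capturing the ACC type at registration time works in Scala because the static type of `ACC` is still known at the `registerFunction` call site, so an implicit can be resolved there instead of inspecting an erased runtime class. Flink's Scala API derives `TypeInformation` for case classes with the `createTypeInformation` macro (brought in by `import org.apache.flink.api.scala._`). To stay self-contained, the sketch below avoids the Flink dependency and uses a hypothetical `TypeInfo` type class with a hand-written instance, a simplified local `AggregateFunction`, and a hypothetical `Registry.registerFunction`; it illustrates the mechanism only and is not the change made in the pull request.

```scala
import scala.collection.mutable

// Hypothetical type class standing in for Flink's TypeInformation (NOT the Flink API).
trait TypeInfo[A] {
  def describe: String
}

object TypeInfo {
  // Runtime-class fallback, analogous to a GenericType: field structure is lost.
  def genericDescription(cls: Class[_]): String = s"GenericType<${cls.getName}>"
}

case class Accumulator(sum: Long, count: Long)

object Accumulator {
  // Flink's Scala API would derive such an instance with a macro; written by hand here.
  implicit val typeInfo: TypeInfo[Accumulator] = new TypeInfo[Accumulator] {
    def describe: String = "CaseClass<Accumulator>(sum: Long, count: Long)"
  }
}

// Simplified local stand-in for org.apache.flink.table.functions.AggregateFunction.
abstract class AggregateFunction[T, ACC] {
  def createAccumulator(): ACC
  def getValue(accumulator: ACC): T
}

class MyAgg extends AggregateFunction[Long, Accumulator] {
  def accumulate(acc: Accumulator, value: Long): Unit = ()
  override def createAccumulator(): Accumulator = Accumulator(0, 0)
  override def getValue(accumulator: Accumulator): Long = 1
}

object Registry {
  val accTypes: mutable.Map[String, String] = mutable.Map.empty

  // Hypothetical registerFunction: the ACC context bound is resolved at the
  // call site, where the static type of ACC is still known.
  def registerFunction[T, ACC: TypeInfo](name: String, f: AggregateFunction[T, ACC]): Unit =
    accTypes(name) = implicitly[TypeInfo[ACC]].describe

  // Class-based approach: only the erased runtime class of an accumulator is available.
  def inferFromRuntimeClass(f: AggregateFunction[_, _]): String =
    TypeInfo.genericDescription(f.createAccumulator().getClass)
}
```

After `Registry.registerFunction("myAgg", new MyAgg)`, `Registry.accTypes("myAgg")` holds the structured case-class description, whereas `Registry.inferFromRuntimeClass(new MyAgg)` can only produce a `GenericType<...>` string.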
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)