[
https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054023#comment-16054023
]
ASF GitHub Bot commented on FLINK-6888:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4105#discussion_r122711347
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
---
@@ -358,20 +358,27 @@ abstract class TableEnvironment(val config:
TableConfig) {
* Registers an [[AggregateFunction]] under a unique name. Replaces
already existing
* user-defined functions under this name.
*/
- private[flink] def registerAggregateFunctionInternal[T: TypeInformation,
ACC](
+ private[flink] def registerAggregateFunctionInternal[T: TypeInformation,
ACC: TypeInformation](
name: String, function: AggregateFunction[T, ACC]): Unit = {
// check if class not Scala object
checkNotSingleton(function.getClass)
// check if class could be instantiated
checkForInstantiation(function.getClass)
- val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
+ val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
+ val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
// register in Table API
functionCatalog.registerFunction(name, function.getClass)
// register in SQL API
- val sqlFunctions = createAggregateSqlFunction(name, function,
typeInfo, typeFactory)
+ val sqlFunctions = createAggregateSqlFunction(
--- End diff --
You are right. We use the same approach for the return type `T` of all
UDFs. However, the return type is part of the function signature and needs to
be known to Calcite for semantic validation. The `ACC` type is only needed for
compilation and an engine specific property.
TBH, I'm not sure which approach is better.
> Can not determine TypeInformation of ACC type of AggregateFunction when ACC
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-6888
> URL: https://issues.apache.org/jira/browse/FLINK-6888
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or
> tuple class, the TypeInformation will fall back to {{GenericType}} which
> result in bad performance when state de/serialization.
> I suggest to extract the ACC TypeInformation when called
> {{TableEnvironment.registerFunction()}}.
> Here is an example:
> {code}
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
> //Overloaded accumulate method
> def accumulate(acc: Accumulator, value: Long): Unit = {
> }
> override def createAccumulator(): Accumulator = Accumulator(0, 0)
> override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)