[ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16053432#comment-16053432 ]
ASF GitHub Bot commented on FLINK-6888:
---------------------------------------
Github user wuchong commented on a diff in the pull request:
https://github.com/apache/flink/pull/4105#discussion_r122614563
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
@@ -192,10 +193,14 @@ class BatchTableEnvironment(
name: String,
f: AggregateFunction[T, ACC])
: Unit = {
- implicit val typeInfo: TypeInformation[T] = TypeExtractor
- .createTypeInfo(f, classOf[AggregateFunction[T, ACC]], f.getClass, 0)
+ implicit val typeInfo: TypeInformation[T] = UserDefinedFunctionUtils
+ .getResultTypeOfAggregateFunction(f)
.asInstanceOf[TypeInformation[T]]
+ implicit val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionUtils
--- End diff --
I agree with you. But this PR only fixes Scala classes not working in the
Scala TableEnvironments. In general, users who create Scala classes will
naturally use them in the Scala TableEnvironments. Registering a Scala class
in a Java TableEnvironment is not recommended (and should be forbidden). The
same already holds for Scala classes used as the `T` type of
`AggregateFunction[T, _]` and `TableFunction[T]`: they work in the Scala
TableEnvironments.
> Can not determine TypeInformation of ACC type of AggregateFunction when ACC
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-6888
> URL: https://issues.apache.org/jira/browse/FLINK-6888
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Jark Wu
> Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or
> tuple class, the TypeInformation falls back to {{GenericType}}, which
> results in poor performance during state serialization and deserialization.
> I suggest extracting the ACC TypeInformation when
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> import org.apache.flink.table.functions.AggregateFunction
>
> case class Accumulator(sum: Long, count: Long)
>
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   // Overloaded accumulate method (body omitted in this example)
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1 // placeholder result
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
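To observe the fallback directly, the following sketch compares class-based extraction with the Scala API's compile-time extraction for the same case class. It assumes Flink 1.3-era `flink-scala` on the classpath; `AccTypeComparison` is a hypothetical name, while `TypeInformation.of`, `createTypeInformation`, `GenericTypeInfo` and `CaseClassTypeInfo` are Flink classes and methods.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._ // implicit createTypeInformation macro for Scala types
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo

// The accumulator from the issue description.
case class Accumulator(sum: Long, count: Long)

object AccTypeComparison {
  // Class-based (Java) extraction: a case class has no no-arg constructor and no
  // setters, so it is not a valid POJO and the extractor falls back to a generic type.
  val viaClass: TypeInformation[Accumulator] = TypeInformation.of(classOf[Accumulator])

  // Scala API extraction: the macro inspects the case class fields at compile time.
  val viaScalaApi: TypeInformation[Accumulator] = createTypeInformation[Accumulator]

  def main(args: Array[String]): Unit = {
    println(s"TypeInformation.of(Class) is generic: ${viaClass.isInstanceOf[GenericTypeInfo[_]]}")
    println(s"createTypeInformation is a case class type: ${viaScalaApi.isInstanceOf[CaseClassTypeInfo[_ <: Product]]}")
  }
}
```

The first value corresponds to the `GenericType<Accumulator>` described in the issue; the second is a `CaseClassTypeInfo`, whose serializer handles each field with its own field serializer instead of a generic fallback.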
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)