[ 
https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046320#comment-16046320
 ] 

ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121324222
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala ---
    @@ -37,24 +36,26 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
       * @param name function name (used by SQL parser)
       * @param aggregateFunction aggregate function to be called
       * @param returnType the type information of returned value
    +  * @param accType the type information of the accumulator
       * @param typeFactory type factory for converting Flink's between Calcite's types
       */
     class AggSqlFunction(
         name: String,
         aggregateFunction: AggregateFunction[_, _],
    -    returnType: TypeInformation[_],
    +    val returnType: TypeInformation[_],
    +    val accType: TypeInformation[_],
         typeFactory: FlinkTypeFactory,
         requiresOver: Boolean)
    -  extends SqlUserDefinedAggFunction(
    +  extends SqlAggFunction(
    --- End diff --
    
    Why do we need to change `SqlUserDefinedAggFunction` to `SqlAggFunction`? 
    Is there a reason for it? If so, please explain it in more detail. 
    If not, I suggest we keep using `SqlUserDefinedAggFunction`. It is the class 
    Calcite provides for user-defined aggregates, so staying with it makes it 
    easier to adapt to future Calcite changes. (Although Calcite's 
    `SqlUserDefinedAggFunction` is currently overridden by a copy in Flink.)


> Can not determine TypeInformation of ACC type of AggregateFunction when ACC 
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of 
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted 
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or 
> tuple class, the TypeInformation falls back to {{GenericType}}, which 
> results in poor performance during state de/serialization. 
> I suggest extracting the ACC TypeInformation when 
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   //Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
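> The difference between the two extraction paths can be sketched roughly as 
> follows. This is only an illustration, assuming flink-scala is on the 
> classpath; the {{AccTypeInfoSketch}} object is a hypothetical name, and the 
> snippet is not the change proposed for {{registerFunction()}} itself:
> {code}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.scala._
>
> case class Accumulator(sum: Long, count: Long)
>
> // Hypothetical driver object, used only to compare the two extraction paths
> object AccTypeInfoSketch {
>   def main(args: Array[String]): Unit = {
>     // Class-based extraction: a case class is not a valid POJO,
>     // so this path falls back to a generic type
>     val byClass: TypeInformation[Accumulator] =
>       TypeInformation.of(classOf[Accumulator])
>     // Macro-based extraction from flink-scala: the case class fields
>     // are analyzed at compile time
>     val byMacro: TypeInformation[Accumulator] =
>       createTypeInformation[Accumulator]
>     println(byClass)
>     println(byMacro)
>   }
> }
> {code}
> Printing both values makes it easy to check which kind of TypeInformation 
> each path produces for the accumulator.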



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
