[ 
https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16046318#comment-16046318
 ] 

ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r121329856
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
    @@ -1395,50 +1399,25 @@ object AggregateUtil {
     
             case udagg: AggSqlFunction =>
               aggregates(index) = udagg.getFunction
    +          accTypes(index) = udagg.accType
     
             case unSupported: SqlAggFunction =>
               throw new TableException(s"unsupported Function: 
'${unSupported.getName}'")
           }
         }
     
    -    (aggFieldIndexes, aggregates)
    -  }
    -
    -  private def createAccumulatorType(
    -      aggregates: Array[TableAggregateFunction[_, _]]): 
Seq[TypeInformation[_]] = {
    -
    -    val aggTypes: Seq[TypeInformation[_]] =
    -      aggregates.map {
    -        agg =>
    -          val accType = try {
    -            val method: Method = 
agg.getClass.getMethod("getAccumulatorType")
    -            method.invoke(agg).asInstanceOf[TypeInformation[_]]
    -          } catch {
    -            case _: NoSuchMethodException => null
    -            case ite: Throwable => throw new TableException("Unexpected 
exception:", ite)
    -          }
    -          if (accType != null) {
    -            accType
    -          } else {
    -            val accumulator = agg.createAccumulator()
    -            try {
    -              TypeInformation.of(accumulator.getClass)
    -            } catch {
    -              case ite: InvalidTypesException =>
    -                throw new TableException(
    -                  "Cannot infer type of accumulator. " +
    -                    "You can override 
AggregateFunction.getAccumulatorType() to specify the type.",
    -                  ite)
    -            }
    -          }
    -      }
    +    // create accumulator type information for every aggregate function
    +    aggregates.zipWithIndex.foreach { case (agg, index) =>
    +      accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, 
accTypes(index))
    --- End diff --
    
    I think we need to add a check as follows:
    ```
    aggregates.zipWithIndex.foreach { case (agg, index) =>
          if(null ==  accTypes(index)){
            accTypes(index) = getAccumulatorTypeOfAggregateFunction(agg, 
accTypes(index))
          }
        }
    ```


> Can not determine TypeInformation of ACC type of AggregateFunction when ACC 
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of 
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted 
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or 
> tuple class, the TypeInformation will fall back to {{GenericType}}, which 
> results in bad performance during state de/serialization. 
> I suggest extracting the ACC TypeInformation when 
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   //Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to