[ 
https://issues.apache.org/jira/browse/FLINK-5768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891305#comment-15891305
 ] 

ASF GitHub Bot commented on FLINK-5768:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103806213
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
    @@ -737,101 +632,121 @@ object AggregateUtil {
               aggregates(index) = if (sqlMinMaxFunction.getKind == 
SqlKind.MIN) {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMinAggregate
    +                new ByteMinAggFunction
                   case SMALLINT =>
    -                new ShortMinAggregate
    +                new ShortMinAggFunction
                   case INTEGER =>
    -                new IntMinAggregate
    +                new IntMinAggFunction
                   case BIGINT =>
    -                new LongMinAggregate
    +                new LongMinAggFunction
                   case FLOAT =>
    -                new FloatMinAggregate
    +                new FloatMinAggFunction
                   case DOUBLE =>
    -                new DoubleMinAggregate
    +                new DoubleMinAggFunction
                   case DECIMAL =>
    -                new DecimalMinAggregate
    +                new DecimalMinAggFunction
                   case BOOLEAN =>
    -                new BooleanMinAggregate
    +                new BooleanMinAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Min aggregate does no support 
type:" + sqlType)
                 }
               } else {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMaxAggregate
    +                new ByteMaxAggFunction
                   case SMALLINT =>
    -                new ShortMaxAggregate
    +                new ShortMaxAggFunction
                   case INTEGER =>
    -                new IntMaxAggregate
    +                new IntMaxAggFunction
                   case BIGINT =>
    -                new LongMaxAggregate
    +                new LongMaxAggFunction
                   case FLOAT =>
    -                new FloatMaxAggregate
    +                new FloatMaxAggFunction
                   case DOUBLE =>
    -                new DoubleMaxAggregate
    +                new DoubleMaxAggFunction
                   case DECIMAL =>
    -                new DecimalMaxAggregate
    +                new DecimalMaxAggFunction
                   case BOOLEAN =>
    -                new BooleanMaxAggregate
    +                new BooleanMaxAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Max aggregate does no support 
type:" + sqlType)
                 }
               }
             }
             case _: SqlCountAggFunction =>
    -          aggregates(index) = new CountAggregate
    +          aggregates(index) = new CountAggFunction
             case unSupported: SqlAggFunction =>
               throw new TableException("unsupported Function: " + 
unSupported.getName)
           }
    -      setAggregateDataOffset(index)
    -    }
    -
    -    // set the aggregate intermediate data start index in Row, and update 
current value.
    -    def setAggregateDataOffset(index: Int): Unit = {
    -      aggregates(index).setAggOffsetInRow(aggOffset)
    -      aggOffset += aggregates(index).intermediateDataType.length
         }
     
         (aggFieldIndexes, aggregates)
       }
     
    -  private def createAggregateBufferDataType(
    -    groupings: Array[Int],
    -    aggregates: Array[Aggregate[_]],
    -    inputType: RelDataType,
    -    windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo 
= {
    +  private def createDataSetAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]],
    +      inputType: RelDataType,
    +      windowKeyTypes: Option[Array[TypeInformation[_]]] = None): 
RowTypeInfo = {
     
         // get the field data types of group keys.
    -    val groupingTypes: Seq[TypeInformation[_]] = groupings
    -      .map(inputType.getFieldList.get(_).getType)
    -      .map(FlinkTypeFactory.toTypeInfo)
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
     
         // get all field data types of all intermediate aggregates
    -    val aggTypes: Seq[TypeInformation[_]] = 
aggregates.flatMap(_.intermediateDataType)
    +    val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
    +      val clazz: Class[_] = agg.getClass
    --- End diff --
    
    We need to obtain the `TypeInformation` of the `Accumulator` here, not the 
type of the `AggregateFunction`.
    We might need to add a `getAccumulatorType()` method to the 
`AggregateFunction` if we cannot extract the type from the object returned by 
`AggregateFunction.createAccumulator()`.


> Apply new aggregation functions for datastream and dataset tables
> -----------------------------------------------------------------
>
>                 Key: FLINK-5768
>                 URL: https://issues.apache.org/jira/browse/FLINK-5768
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Apply new aggregation functions for datastream and dataset tables
> This includes:
> 1. Change the implementation of the DataStream aggregation runtime code to 
> use new aggregation functions and aggregate dataStream API.
> 2. DataStream will be always running in incremental mode, as explained in 
> 06/Feb/2017 in FLINK5564.
> 2. Change the implementation of the Dataset aggregation runtime code to use 
> new aggregation functions.
> 3. Clean up unused class and method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to