[ https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960516#comment-15960516 ]

ASF GitHub Bot commented on FLINK-6240:
---------------------------------------

Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3694#discussion_r110345902
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -18,69 +18,51 @@
     
     package org.apache.flink.table.runtime.aggregate
     
    -import java.util.{ArrayList => JArrayList, List => JList}
    -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc}
    -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.api.common.functions.AggregateFunction
    +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
     import org.apache.flink.types.Row
    +import org.slf4j.LoggerFactory
     
     /**
       * Aggregate Function used for the aggregate operator in
       * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
       *
    -  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    -  *                         used for this aggregation
    -  * @param aggFields   the position (in the input Row) of the input value for each aggregate
    +  * @param genAggregations Generated aggregate helper function
       */
     class AggregateAggFunction(
    -    private val aggregates: Array[AggregateFunction[_]],
    -    private val aggFields: Array[Array[Int]])
    -  extends DataStreamAggFunc[Row, Row, Row] {
    +    genAggregations: GeneratedAggregationsFunction)
    +  extends AggregateFunction[Row, Row, Row]
    --- End diff --
    
    I have thought about this. Unfortunately, `aggregate` in WindowedStream does not accept a RichFunction:
    ```
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
    ...
                if (aggregateFunction instanceof RichFunction) {
                        throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
                }
    ```
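
To make the constraint concrete, here is a minimal, self-contained sketch of the same kind of guard and of one possible way to live with it. Everything in it is a hypothetical stand-in written for illustration (`SimpleAggregate`, `RichMarker`, `LazySum`, `RichSum`, `isAccepted`); none of these are Flink classes, and this is not necessarily how the pull request solves the problem. The assumption is that setup work which would normally go into a rich function's `open()` can instead be done lazily on first use.

```java
// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
class RichFunctionCheckSketch {

    /** Stand-in for a plain aggregate function without lifecycle hooks. */
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
    }

    /** Stand-in marker for a function that has lifecycle hooks such as open(). */
    interface RichMarker {
        void open();
    }

    /** Mirrors the shape of the guard quoted above: rich functions are rejected. */
    static <IN, ACC, OUT> SimpleAggregate<IN, ACC, OUT> aggregate(
            SimpleAggregate<IN, ACC, OUT> fn) {
        if (fn instanceof RichMarker) {
            throw new UnsupportedOperationException(
                "This aggregate function cannot be a RichFunction.");
        }
        return fn;
    }

    /** Non-rich function that runs its setup lazily instead of in open(). */
    static class LazySum implements SimpleAggregate<Integer, Integer, Integer> {
        private boolean initialized = false;
        int initCount = 0; // counts how often the (assumed expensive) setup ran

        private void ensureInitialized() {
            if (!initialized) {
                initCount++; // placeholder for real setup work
                initialized = true;
            }
        }

        @Override
        public Integer createAccumulator() {
            ensureInitialized();
            return 0;
        }

        @Override
        public Integer add(Integer value, Integer accumulator) {
            ensureInitialized();
            return accumulator + value;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            ensureInitialized();
            return accumulator;
        }
    }

    /** Same logic, but tagged as rich, so the guard rejects it. */
    static class RichSum extends LazySum implements RichMarker {
        @Override
        public void open() { }
    }

    /** Returns true if the guard accepts the function, false if it throws. */
    static boolean isAccepted(SimpleAggregate<?, ?, ?> fn) {
        try {
            aggregate(fn);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("LazySum accepted: " + isAccepted(new LazySum()));
        System.out.println("RichSum accepted: " + isAccepted(new RichSum()));
    }
}
```

The trade-off in this sketch is that every method has to call `ensureInitialized()`, since without `open()` there is no single guaranteed entry point before the first record arrives.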


> codeGen dataStream aggregates that use AggregateAggFunction
> -----------------------------------------------------------
>
>                 Key: FLINK-6240
>                 URL: https://issues.apache.org/jira/browse/FLINK-6240
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>




