[ 
https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394402#comment-16394402
 ] 

ASF GitHub Bot commented on FLINK-8689:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r173644062
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1439,7 +1467,47 @@ object AggregateUtil {
           }
         }
     
    -    (aggFieldIndexes, aggregates, accTypes, accSpecs)
    +    // create distinct accumulator filter argument
    +    val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
    +
    +    aggregateCalls.zipWithIndex.foreach {
    +      case (aggCall, index) =>
    +        if (aggCall.isDistinct) {
    +          val argList: util.List[Integer] = aggCall.getArgList
    +          // Only support single argument for distinct operation
    +          if (argList.size() > 1) {
    +            throw TableException(
    +              "Cannot apply distinct filter on multiple input argument fields at this moment!")
    +          }
    +          val relDataType = aggregateInputType.getFieldList.get(argList.get(0)).getType
    +          val fieldIndex = aggFieldIndexes(index)(0)
    +          val mapViewTypeInfo = new MapViewTypeInfo(
    +            FlinkTypeFactory.toTypeInfo(relDataType), BasicTypeInfo.INT_TYPE_INFO)
    --- End diff --
    
    Would LONG_TYPE_INFO be safer here? An Int count can overflow fairly
    quickly when around 10,000 records are processed per second.
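
As a rough back-of-the-envelope check of the overflow concern, the sketch below estimates how long an Int counter lasts compared with a Long one. It assumes a steady rate of 10,000 records per second (the "1w" figure), that every record carries the same value, and that the counter in the map is incremented once per record. The object and method names are hypothetical and are not part of the pull request.

```scala
object OverflowEstimate {
  // Assumption: a steady 10,000 records per second, all with the same value.
  val recordsPerSecond: Long = 10000L

  // Whole seconds until a counter incremented once per record reaches maxValue.
  def secondsUntilOverflow(maxValue: Long, rate: Long): Long = maxValue / rate

  def main(args: Array[String]): Unit = {
    val secondsPerDay = 24L * 60 * 60
    val secondsPerYear = 365L * secondsPerDay

    val intSeconds = secondsUntilOverflow(Int.MaxValue.toLong, recordsPerSecond)
    val longSeconds = secondsUntilOverflow(Long.MaxValue, recordsPerSecond)

    // Integer division, so both figures are rounded down.
    println(s"Int counter lasts about ${intSeconds / secondsPerDay} full days")
    println(s"Long counter lasts about ${longSeconds / secondsPerYear} full years")
  }
}
```

Under these assumptions an Int count is exhausted after roughly two and a half days, while a Long count would last on the order of tens of millions of years.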


> Add runtime support of distinct filter using MapView 
> -----------------------------------------------------
>
>                 Key: FLINK-8689
>                 URL: https://issues.apache.org/jira/browse/FLINK-8689
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>            Priority: Major
>
> This ticket should cover distinct aggregate function support in codegen for 
> *AggregateCall*, where the *isDistinct* field is set to true.
> This can be verified using the following SQL, which is not currently 
> producing correct results.
> {code:java}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND 
> CURRENT ROW)
> FROM
>   MyTable{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
