Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4585#discussion_r142667939
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
    @@ -1410,6 +1410,26 @@ object AggregateUtil {
             case _: SqlCountAggFunction =>
               aggregates(index) = new CountAggFunction
     
    +        case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT 
=>
    +          aggregates(index) = sqlTypeName match {
    +            case TINYINT =>
    +              new ByteCollectAggFunction
    +            case SMALLINT =>
    +              new ShortCollectAggFunction
    +            case INTEGER =>
    +              new IntCollectAggFunction
    +            case BIGINT =>
    +              new LongCollectAggFunction
    +            case VARCHAR | CHAR =>
    +              new StringCollectAggFunction
    +            case FLOAT =>
    +              new FloatCollectAggFunction
    +            case DOUBLE =>
    +              new DoubleCollectAggFunction
    +            case _ =>
    +              new ObjectCollectAggFunction
    +          }
    +
    --- End diff --
    
    we need to set `accTypes(index) = aggregates(index).getAccumulatorType` in 
order to activate the `MapView` feature.


---

Reply via email to