Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r182489141
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1439,7 +1467,47 @@ object AggregateUtil {
           }
         }
     
    -    (aggFieldIndexes, aggregates, accTypes, accSpecs)
    +    // create distinct accumulator filter argument
    +    val distinctAggs = new Array[Seq[DataViewSpec[_]]](aggregateCalls.size)
    +
    +    aggregateCalls.zipWithIndex.foreach {
    +      case (aggCall, index) =>
    +        if (aggCall.isDistinct) {
    +          val argList: util.List[Integer] = aggCall.getArgList
    +          // Only support single argument for distinct operation
    +          if (argList.size() > 1) {
    +            throw TableException(
    +              "Cannot apply distinct filter on multiple input argument 
fields at this moment!")
    +          }
    +          val relDataType = 
aggregateInputType.getFieldList.get(argList.get(0)).getType
    +          val fieldIndex = aggFieldIndexes(index)(0)
    +          val mapViewTypeInfo = new MapViewTypeInfo(
    +            FlinkTypeFactory.toTypeInfo(relDataType), 
BasicTypeInfo.INT_TYPE_INFO)
    --- End diff --
    
    +1


---

Reply via email to