GitHub user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3783#discussion_r124030194
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -296,6 +299,39 @@ class CodeGenerator(
           fields.mkString(", ")
         }
     
    +    def genInitialize(): String = {
    +      
    +      val sig: String = 
    +        j"""
    +           |  public void initialize(
    +           |    org.apache.flink.api.common.functions.RuntimeContext ctx
    +           |  )""".stripMargin
    +        
    +      val initDist: String = if( distinctAggsFlags.isDefined ) {
    +        val statePackage = "org.apache.flink.api.common.state"
    +        val distAggsFlags = distinctAggsFlags.get
    +          for(i <- distAggsFlags.indices) yield
    +            if(distAggsFlags(i)) {
    +              val typeString = javaTypes(aggFields(i)(0))
    --- End diff --
    
    UDAGGs can have more than a single parameter, but `aggFields(i)(0)` only
    looks at the first one.
    
    Since the key can only be a single object, we have to put all arguments
    into a `TupleX`. I think the limit of 25 fields is reasonable, and we can
    throw an exception if we encounter a DISTINCT UDAGG with more than 25
    arguments.
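
A minimal sketch of that idea, written in plain Java (the language the code generator emits) so that it runs without Flink on the classpath. The class `DistinctKeySketch` and its methods `toKey` and `firstOccurrence` are hypothetical names, and an immutable `List` stands in for `TupleX`. The actual change would presumably build the matching `TupleX` instance and use it as the key of the distinct state.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Flink or of this pull request.
final class DistinctKeySketch {

    // Mirrors the arity limit of Flink's TupleX classes (Tuple1 .. Tuple25).
    static final int MAX_ARITY = 25;

    // Packs all arguments of one aggregate call into a single key object.
    // Assumption: an immutable List stands in for TupleX; like a tuple, it
    // compares and hashes element by element.
    static List<Object> toKey(Object... args) {
        if (args.length > MAX_ARITY) {
            throw new IllegalArgumentException(
                "DISTINCT aggregate with " + args.length
                    + " arguments exceeds the limit of " + MAX_ARITY);
        }
        // Copy the array so later changes by the caller cannot alter the key.
        return Collections.unmodifiableList(Arrays.asList(args.clone()));
    }

    // Returns true only the first time a given argument combination is seen.
    // Assumption: a HashSet stands in for the keyed distinct state.
    static boolean firstOccurrence(Set<List<Object>> seen, Object... args) {
        return seen.add(toKey(args));
    }

    public static void main(String[] argv) {
        Set<List<Object>> seen = new HashSet<>();
        System.out.println(firstOccurrence(seen, "a", 1)); // true
        System.out.println(firstOccurrence(seen, "a", 1)); // false (duplicate)
        System.out.println(firstOccurrence(seen, "a", 2)); // true
    }
}
```

Only calls for which `firstOccurrence` returns true would be forwarded to the aggregate's `accumulate` method. A call with more than 25 arguments fails fast with an exception.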

