[ 
https://issues.apache.org/jira/browse/FLINK-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15971623#comment-15971623
 ] 

ASF GitHub Bot commented on FLINK-6240:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3694#discussion_r111814395
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
    @@ -396,13 +396,68 @@ class CodeGenerator(
              |  }""".stripMargin
         }
     
    -    def generateCreateOutputRow(outputArity: Int): String = {
    +    def genCreateOutputRow(outputArity: Int): String = {
           j"""
              |  public org.apache.flink.types.Row createOutputRow() {
              |    return new org.apache.flink.types.Row($outputArity);
              |  }""".stripMargin
         }
     
    +    def genMergeAccumulatorsPair(
    +        accTypes: Array[String],
    +        aggs: Array[String]): String = {
    +
    +      val sig: String =
    +        j"""
    +           |  public org.apache.flink.types.Row mergeAccumulatorsPair(
    +           |    org.apache.flink.types.Row a,
    +           |    org.apache.flink.types.Row b)
    +           """.stripMargin
    +      val merge: String = {
    +        for (i <- aggs.indices) yield
    +          j"""
    +             |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
    +             |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
    +             |    accList$i.set(0, aAcc$i);
    +             |    accList$i.set(1, bAcc$i);
    +             |    a.setField(
    +             |      $i,
    +             |      ${aggs(i)}.merge(accList$i));
    +             """.stripMargin
    +      }.mkString("\n")
    +      val ret: String =
    +        j"""
    +           |      return a;
    +           """.stripMargin
    +
    +      j"""$sig {
    +         |$merge
    +         |$ret
    +         |  }""".stripMargin
    +    }
    +
    +    def genMergeList(accTypes: Array[String]): String = {
    +      {
    +        for (i <- accTypes.indices) yield
    +          j"""
    +             |    java.util.ArrayList<${accTypes(i)}> accList$i;
    +             """.stripMargin
    +      }.mkString("\n")
    +    }
    +
    +    def initMergeList(
    +        accTypes: Array[String],
    +        aggs: Array[String]): String = {
    +      {
    +        for (i <- accTypes.indices) yield
    +          j"""
    +             |    accList$i = new java.util.ArrayList<${accTypes(i)}>();
    --- End diff --
    
    create with initial capacity 2 `new 
java.util.ArrayList<${accTypes(i)}>(2);`?


> codeGen dataStream aggregates that use AggregateAggFunction
> -----------------------------------------------------------
>
>                 Key: FLINK-6240
>                 URL: https://issues.apache.org/jira/browse/FLINK-6240
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to