[ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188844#comment-16188844 ]

ASF GitHub Bot commented on FLINK-7371:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142247401
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1670,4 +1670,34 @@ abstract class CodeGenerator(
     
         fieldTerm
       }
    +
    +  /**
     +    * Adds a reusable constant to the member area of the generated [[Function]].
    +    *
    +    * @param constant constant expression
    +    * @return member variable term
    +    */
    +  def addReusableBoxedConstant(constant: GeneratedExpression): String = {
    +    require(constant.literal, "Literal expected")
    +
    +    val fieldTerm = newName("constant")
    +
    +    val boxed = generateOutputFieldBoxing(constant)
    +    val boxedType = boxedTypeTermForTypeInfo(boxed.resultType)
    +
    +    val field =
    +      s"""
    +        |transient $boxedType $fieldTerm;
    --- End diff --
    
    why `transient`? Couldn't this be `final`?
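
    For reference, a minimal Scala sketch of the two member declarations this question contrasts. The type term, field name, and initial value below are illustrative stand-ins, not the actual generator output: a `final` member would have to be assigned exactly once (e.g. in the generated constructor), whereas the `transient` field in the diff is declared first and assigned later.

{code}
// Sketch only: stand-ins for boxedTypeTermForTypeInfo(...) and newName("constant").
object MemberDeclarationSketch {
  val boxedType = "java.lang.Long"   // illustrative boxed type term
  val fieldTerm = "constant$0"       // illustrative generated field name

  // as in the diff: a transient member that is declared here and assigned later,
  // and is skipped when the generated function instance is serialized
  val transientMember: String = s"transient $boxedType $fieldTerm;"

  // the reviewer's alternative: declare the member final and assign it exactly once,
  // which would require the constant's boxing code to run at construction time
  val finalMember: String = s"final $boxedType $fieldTerm = 42L;"

  def main(args: Array[String]): Unit = {
    println(transientMember)
    println(finalMember)
  }
}
{code}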


> user-defined aggregator assumes number of arguments smaller than or equal to number of row fields
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7371
>                 URL: https://issues.apache.org/jira/browse/FLINK-7371
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.1
>            Reporter: Stefano Bortoli
>            Assignee: Timo Walther
>
> Defining a user-defined aggregation with more arguments than the row has fields 
> causes an ArrayIndexOutOfBoundsException, because the argument indexing is based 
> on a linear iteration over the row fields. This does not cover cases where a 
> field is used more than once or where constant values are passed to the 
> aggregation function.
> For example:
> {code}
> window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW aggs [myAgg($0, $1, $3, $0, $4)])
> {code}
> where $3 and $4 are references to constants and $0 and $1 are field references; this causes (a minimal sketch illustrating the indexing problem follows after the stack trace):
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 4
>       at org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
>       at org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>       at org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>       at org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
>       at org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
> {code}
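
As an aside (not part of the original report), here is a minimal, self-contained Scala sketch of the failure mode described above. The object, the three-field mapping, and the thrown index are assumptions for illustration, not Flink's actual RowSchema code; the point is only that a linear mapping over the physical row fields breaks once an aggregate argument index refers to an injected constant beyond the last field.

{code}
// Illustrative sketch only (not Flink's RowSchema.mapIndex implementation).
object IndexMappingSketch {
  // assume a physical row with three fields: indexes 0, 1, 2
  val physicalMapping: Array[Int] = Array(0, 1, 2)

  // naive linear mapping, in the spirit of the mapIndex call in the stack trace above
  def mapIndex(logicalIndex: Int): Int = physicalMapping(logicalIndex)

  def main(args: Array[String]): Unit = {
    // myAgg($0, $1, $3, $0, $4): $3 and $4 reference constants, not physical fields
    val aggArgs = Seq(0, 1, 3, 0, 4)
    // throws java.lang.ArrayIndexOutOfBoundsException as soon as a constant index is mapped
    println(aggArgs.map(mapIndex))
  }
}
{code}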


