[
https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182748#comment-16182748
]
ASF GitHub Bot commented on FLINK-7371:
---------------------------------------
GitHub user twalthr opened a pull request:
https://github.com/apache/flink/pull/4736
[FLINK-7371] [table] Add support for constant parameters in OVER aggregate
## What is the purpose of the change
This PR allows to pass constants to OVER window aggregates. E.g.
`.select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)`.
## Brief change log
Until now the constants where simply ignored. I added code generation for
the literals in `AggregationCodeGenerator`.
## Verifying this change
I add a ITCase for it. I might add more tests if I have time. In general,
we need to rework the logic there a little bit, because I think we also do not
support DATE, TIME etc. right now.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/twalthr/flink FLINK-7371
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4736.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4736
----
commit 19e056e038009e22e2b607b38931f575d5c948df
Author: twalthr <[email protected]>
Date: 2017-09-27T15:11:28Z
[FLINK-7371] [table] Add support for constant parameters in OVER aggregate
----
> user defined aggregator assumes nr of arguments smaller or equal than number
> of row fields
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-7371
> URL: https://issues.apache.org/jira/browse/FLINK-7371
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Stefano Bortoli
> Assignee: Timo Walther
>
> The definition of user define aggregations with a number of parameters larger
> than the row fields causes ArrayIndexOutOfBoundsException because the
> indexing is based on a linear iteration over row fields. This does not
> consider cases where fields can be used more than once and constant values
> are passed to the aggregation function.
> for example:
> {code}
> window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW
> aggs [myAgg($0, $1, $3, $0, $4)])
> {code}
> where $3 and $4 are reference to constants, and $0 and $1 are fields causes:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 4
> at
> org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
> at
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
> at
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)