[
https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Fabian Hueske closed FLINK-7338.
--------------------------------
Resolution: Fixed
Fix Version/s: 1.4.0
Fixed for 1.4.0 with 16b088218435dc64848ad641a383f9ce808f07c2
> User Defined aggregation with constants causes error under in lowerbound over
> window extraction
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-7338
> URL: https://issues.apache.org/jira/browse/FLINK-7338
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Stefano Bortoli
> Assignee: Fabian Hueske
> Priority: Critical
> Fix For: 1.4.0
>
>
> A user defined aggregation that passes a constant among the arguments causes
> a RuntimeException extracting the lower boundary over window.
> {code}
> val sqlQuery = "SELECT a, " +
> " myAgg(a, CAST('1' as BIGINT)) "+
> " OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND
> PRECEDING AND CURRENT ROW) " +
> "FROM MyTable"
> {code}
> The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala
> we do : field count - lower bound index
> -- which causes a -1 get, and subsequent RuntimeException.
> We should do: lower bound offset - field count to find the value in the
> constant array.
> The code below should fix the problem.
> {code}
> private[flink] def getLowerBoundary(
> logicWindow: Window,
> overWindow: Group,
> input: RelNode): Long = {
> val ref: RexInputRef =
> overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
> val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
> val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
> lowerBound match {
> case x: java.math.BigDecimal =>
> x.asInstanceOf[java.math.BigDecimal].longValue()
> case _ => lowerBound.asInstanceOf[Long]
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)