Stefano Bortoli created FLINK-7338: -------------------------------------- Summary: User Defined aggregation with constants causes error under in lowerbound over window extraction Key: FLINK-7338 URL: https://issues.apache.org/jira/browse/FLINK-7338 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.3.1 Reporter: Stefano Bortoli Priority: Critical
A user defined aggregation that passes a constant among the arguments causes a RuntimeException extracting the lower boundary over window. [code] val sqlQuery = "SELECT a, " + " myAgg(a, CAST('1' as BIGINT)) "+ " OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND PRECEDING AND CURRENT ROW) " + "FROM MyTable" [code] The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala we do : field count - lower bound index -- which causes a -1 get, and subsequent RuntimeException. We should do: lower bound offset - field count to find the value in the constant array. The code below should fix the problem. [code] private[flink] def getLowerBoundary( logicWindow: Window, overWindow: Group, input: RelNode): Long = { val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef] val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2 lowerBound match { case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue() case _ => lowerBound.asInstanceOf[Long] } } [code] -- This message was sent by Atlassian JIRA (v6.4.14#64029)