jinfeng created FLINK-16451:
-------------------------------
Summary: listagg with distinct for over window
Key: FLINK-16451
URL: https://issues.apache.org/jira/browse/FLINK-16451
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0, 1.9.2
Reporter: jinfeng
When I use listagg with distinct in an over window:
{code:java}
"select listagg(distinct product, '|') over(partition by user order by proctime
rows between 200 preceding and current row) as product, user from " + testTable
{code}
I got the following exception:
{code:java}
Exception in thread "main" java.lang.IndexOutOfBoundsException: Index: 3, Size: 3
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
    at org.apache.flink.table.types.logical.RowType.getTypeAt(RowType.java:174)
    at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:635)
    at org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:620)
    at org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:524)
    at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
    at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen$$anonfun$10.apply(DistinctAggCodeGen.scala:374)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
    at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.generateKeyExpression(DistinctAggCodeGen.scala:374)
    at org.apache.flink.table.planner.codegen.agg.DistinctAggCodeGen.accumulate(DistinctAggCodeGen.scala:192)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$12.apply(AggsHandlerCodeGenerator.scala:871)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genAccumulate(AggsHandlerCodeGenerator.scala:871)
    at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateAggsHandler(AggsHandlerCodeGenerator.scala:329)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.createBoundedOverProcessFunction(StreamExecOverAggregate.scala:425)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:255)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:56)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlan(StreamExecOverAggregate.scala:56)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
{code}
But it works without the separator argument:
{code:java}
"select listagg(distinct product) over(partition by user order by proctime rows
between 200 preceding and current row) as product, user from " + testTable
{code}
{code:java}
private def generateKeyExpression(
    ctx: CodeGeneratorContext,
    generator: ExprCodeGenerator): GeneratedExpression = {
  val fieldExprs = distinctInfo.argIndexes.map(generateInputAccess(
    ctx,
    generator.input1Type,
    generator.input1Term,
    _,
    nullableInput = false,
    deepCopy = inputFieldCopy))
{code}
The exception is thrown in the code above.
distinctInfo.argIndexes is [1, 3], but the input row has only 3 fields
(indexes 0 to 2). Index 3 is a logical index that stands for the constant '|',
so no input access should be generated for it.
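
To make the index problem concrete, here is a minimal standalone sketch in Java. It is not Flink code and not necessarily how the bug should be fixed. The class DistinctKeyIndexes and the method inputFieldIndexes are hypothetical names, and the sketch assumes that any argument index at or beyond the input field count refers to a constant rather than an input field.

```java
import java.util.Arrays;

// Hypothetical illustration only; none of these names exist in Flink.
class DistinctKeyIndexes {

    // Keeps only the argument indexes that point at real input fields.
    // Assumption: an index >= inputFieldCount is a logical index for a
    // constant (such as the '|' separator) and has no input field behind it.
    public static int[] inputFieldIndexes(int[] argIndexes, int inputFieldCount) {
        return Arrays.stream(argIndexes)
                .filter(i -> i < inputFieldCount)
                .toArray();
    }

    public static void main(String[] args) {
        int[] argIndexes = {1, 3}; // values reported in this issue
        int inputFieldCount = 3;   // "Size: 3" in the exception message
        // Only index 1 is a valid field position, so this prints [1]
        System.out.println(Arrays.toString(inputFieldIndexes(argIndexes, inputFieldCount)));
    }
}
```

With the values from this issue, index 3 is dropped and only index 1 would be used to build the distinct key.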