[ https://issues.apache.org/jira/browse/FLINK-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15983234#comment-15983234 ]
ASF GitHub Bot commented on FLINK-6368: --------------------------------------- Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3768 I ran into the same problem today when adding the new test cases for UDAGG. Thanks for the fix, @xccui > Grouping keys in stream aggregations have wrong order > ----------------------------------------------------- > > Key: FLINK-6368 > URL: https://issues.apache.org/jira/browse/FLINK-6368 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Reporter: Timo Walther > Assignee: Xingcan Cui > Fix For: 1.3.0 > > > FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It > seems that the order of grouping keys is sometimes messed up. The following > tests fails: > {code} > @Test > def testEventTimeSlidingGroupWindow(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > val tEnv = TableEnvironment.getTableEnvironment(env) > StreamITCase.testResults = mutable.MutableList() > val stream = env > .fromCollection(data) > .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) > .map(t => (t._2, t._6)) > val table = stream.toTable(tEnv, 'int, 'string) > val windowedTable = table > .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) > .groupBy('w, 'string) > .select('string, 'int.count, 'w.start, 'w.end) > val results = windowedTable.toDataStream[Row] > results.addSink(new StreamITCase.StringSink) > env.execute() > } > {code} > Exception: > {code} > Caused by: java.lang.RuntimeException: Could not forward element to next > operator > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50) > at > org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64) > at > org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35) > at > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505) > at > org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276) > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288) > ... 7 more > Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to > java.lang.String > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)