[
https://issues.apache.org/jira/browse/FLINK-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558520#comment-16558520
]
ASF GitHub Bot commented on FLINK-9935:
---------------------------------------
fhueske closed pull request #6423: [FLINK-9935] [table] Fix incorrect group
field access in batch window combiner.
URL: https://github.com/apache/flink/pull/6423
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display the diff for merged fork PRs):
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 7ce44a6c834..a3c9c1e3f6a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -586,7 +586,7 @@ object AggregateUtil {
isDistinctAggs,
isStateBackedDataViews = false,
partialResults = true,
- groupings,
+ groupings.indices.toArray,
Some(aggregates.indices.map(_ + groupings.length).toArray),
outputType.getFieldCount,
needRetract,
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
index 3d9223e0953..2c984c16070 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/GroupWindowITCase.scala
@@ -97,6 +97,7 @@ class GroupWindowITCase(
val table = env
.fromCollection(data)
.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+ .select('int, 'long, 'string) // keep this select to enforce that the 'string key comes last
val windowedTable = table
.window(Tumble over 5.milli on 'long as 'w)
@@ -271,6 +272,7 @@ class GroupWindowITCase(
val table = env
.fromCollection(data)
.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+ .select('int, 'long, 'string) // keep this select to enforce that the 'string key comes last
val windowedTable = table
.window(Slide over 10.milli every 5.milli on 'long as 'w)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Batch Table API: grouping by window and attribute causes
> java.lang.ClassCastException:
> --------------------------------------------------------------------------------------
>
> Key: FLINK-9935
> URL: https://issues.apache.org/jira/browse/FLINK-9935
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.2, 1.5.1, 1.6.0, 1.7.0
> Reporter: Roman Wozniak
> Assignee: Fabian Hueske
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.4.3, 1.5.3, 1.6.0, 1.7.0
>
>
> Grouping by window AND some other attribute(s) seems broken. Test case
> attached:
> {code}
> class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {
> trait BatchContext {
> implicit lazy val env: ExecutionEnvironment =
> ExecutionEnvironment.getExecutionEnvironment
> implicit val tableEnv: BatchTableEnvironment =
> TableEnvironment.getTableEnvironment(env)
> val data = Seq(
> (1532424567000L, "id1", "location1"),
> (1532424567000L, "id2", "location1"),
> (1532424567000L, "id3", "location1"),
> (1532424568000L, "id1", "location2"),
> (1532424568000L, "id2", "location3")
> )
> val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
> val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id,
> 'location)
> }
> it should "be possible to run Table API queries with grouping by tumble
> window and column(s) on batch data" in new BatchContext {
> val results = table
> .window(Tumble over 1.second on 'rowtime as 'w)
> .groupBy('w, 'location)
> .select(
> 'w.start.cast(Types.LONG),
> 'w.end.cast(Types.LONG),
> 'location,
> 'id.count
> )
> .toDataSet[(Long, Long, String, Long)]
> .collect()
> results should contain theSameElementsAs Seq(
> (1532424567000L, 1532424568000L, "location1", 3L),
> (1532424568000L, 1532424569000L, "location2", 1L),
> (1532424568000L, 1532424569000L, "location3", 1L)
> )
> }
> }
> {code}
> It seems like during execution time, the 'rowtime attribute replaces
> 'location and that causes ClassCastException.
> {code:java}
> [info] Cause: java.lang.ClassCastException: java.lang.Long cannot be cast
> to java.lang.String
> [info] at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> [info] at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
> [info] at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
> [info] at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> [info] at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> [info] at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> [info] at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> [info] at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> [info] at
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> [info] at
> org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
> {code}
> Here is some debug information that I was able to get. So, field serializers
> don't match the type of Row fields:
> {code}
> this.instance = {Row@68451} "1532424567000,(3),1532424567000"
> fields = {Object[3]@68461}
> 0 = {Long@68462} 1532424567000
> 1 = {CountAccumulator@68463} "(3)"
> 2 = {Long@68462} 1532424567000
> this.serializer = {RowSerializer@68452}
> fieldSerializers = {TypeSerializer[3]@68455}
> 0 = {StringSerializer@68458}
> 1 = {TupleSerializer@68459}
> 2 = {LongSerializer@68460}
> arity = 3
> nullMask = {boolean[3]@68457}
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)