[ https://issues.apache.org/jira/browse/FLINK-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555812#comment-16555812 ]

ASF GitHub Bot commented on FLINK-9935:
---------------------------------------

fhueske opened a new pull request #6423: [FLINK-9935] [table] Fix incorrect group field access in batch window combiner.
URL: https://github.com/apache/flink/pull/6423
 
 
   ## What is the purpose of the change
   
   Fixes an incorrect field access (on a grouping field) in the combiner for a 
TUMBLE or HOP group window in a batch query. The incorrect field access would 
typically result in a ClassCastException or lead to incorrect results.
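   
   To illustrate the failure mode, here is a minimal sketch in plain Scala without any Flink dependency. It is a hypothetical simplification, not the code Flink generates: `CombinerFieldAccessSketch`, `combine`, `serialize`, and the `groupKeyIndex` parameter are made up for illustration, and a per-field type check stands in for the field serializers of `RowSerializer`. Input rows use the issue's table layout (rowtime, id, location). Reading the grouping field from the rowtime position puts a `Long` where the declared output type is `String`, which is the same mismatch visible in the issue's debug output.
   
   ```scala
   // Hypothetical, simplified model of a batch window combiner (not Flink's generated code).
   // Input rows use the issue's table layout: (rowtime: Long, id: String, location: String).
   // Output rows are declared as (location: String, count: Long, rowtime: Long).
   object CombinerFieldAccessSketch {
   
     // Declared output field types, one per position, like a RowSerializer's field serializers.
     val outputSchema: Seq[Class[_]] =
       Seq(classOf[String], classOf[java.lang.Long], classOf[java.lang.Long])
   
     // Combines one group of input rows into a single output row.
     // `groupKeyIndex` is the input position the grouping field is read from.
     def combine(group: Seq[Array[Any]], groupKeyIndex: Int): Array[Any] = {
       val first = group.head
       val key = first(groupKeyIndex)   // grouping field access
       val count = group.size.toLong    // stands in for the count accumulator
       val rowtime = first(0)           // window time attribute
       Array[Any](key, count, rowtime)
     }
   
     // Checks every field against its declared type before "serializing";
     // a mismatch mirrors the ClassCastException thrown by StringSerializer.
     def serialize(row: Array[Any]): Either[String, String] =
       row.indices.find(i => !outputSchema(i).isInstance(row(i).asInstanceOf[AnyRef])) match {
         case Some(i) =>
           val actual = row(i).asInstanceOf[AnyRef].getClass.getName
           Left(s"$actual cannot be cast to ${outputSchema(i).getName}")
         case None =>
           Right(row.mkString(","))
       }
   }
   ```
   
   For a group made of the three `location1` rows, `serialize(combine(group, 0))` returns `Left("java.lang.Long cannot be cast to java.lang.String")`, mirroring the exception in the issue, while the correct index 2 yields `Right("location1,3,1532424567000")`.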
   
   ## Brief change log
   
   * Fix the field access in the generated aggregation code.
   * Modify tests so that they fail if the field access is invalid.
   
   ## Verifying this change
   
   * The modified tests validate that the correct field is accessed.
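   
   As a cross-check for such tests, the expected result of the query from the issue can be computed independently of Flink. This is a hedged sketch: `TumbleReference` and its methods are hypothetical helpers, it assumes `rowtime` holds epoch milliseconds and that windows have no offset, and it counts rows per group, which matches `'id.count` here only because no `id` is null.
   
   ```scala
   // Hypothetical reference computation, independent of Flink.
   // Assumes rowtime values are epoch milliseconds and windows have zero offset.
   object TumbleReference {
   
     // Start of the tumbling window containing `ts`; floorMod also handles negative timestamps.
     def windowStart(ts: Long, sizeMs: Long): Long = ts - Math.floorMod(ts, sizeMs)
   
     // Groups records by (window, location) and counts them, producing
     // (window start, window end, location, count) like the issue's query.
     def expected(data: Seq[(Long, String, String)], sizeMs: Long): Set[(Long, Long, String, Long)] =
       data
         .groupBy { case (ts, _, location) => (windowStart(ts, sizeMs), location) }
         .map { case ((start, location), rows) => (start, start + sizeMs, location, rows.size.toLong) }
         .toSet
   }
   ```
   
   With the issue's five records and `sizeMs = 1000L`, this yields exactly the three tuples the issue's test expects. A `Set` is sufficient here because each (window, location) key appears only once in the result.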
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **n/a**
   



> Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-9935
>                 URL: https://issues.apache.org/jira/browse/FLINK-9935
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.2, 1.5.1, 1.6.0, 1.7.0
>            Reporter: Roman Wozniak
>            Assignee: Fabian Hueske
>            Priority: Blocker
>              Labels: pull-request-available
>
> Grouping by window AND some other attribute(s) seems to be broken. Test case attached:
> {code}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.{Table, TableEnvironment, Types}
> import org.apache.flink.table.api.scala._
> import org.scalatest.{FlatSpec, Matchers}
>
> class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {
>
>   trait BatchContext {
>     implicit lazy val env: ExecutionEnvironment =
>       ExecutionEnvironment.getExecutionEnvironment
>     implicit val tableEnv: BatchTableEnvironment =
>       TableEnvironment.getTableEnvironment(env)
>
>     // (rowtime as epoch milliseconds, id, location)
>     val data = Seq(
>       (1532424567000L, "id1", "location1"),
>       (1532424567000L, "id2", "location1"),
>       (1532424567000L, "id3", "location1"),
>       (1532424568000L, "id1", "location2"),
>       (1532424568000L, "id2", "location3")
>     )
>     val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
>     val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
>   }
>
>   // FlatSpec needs a subject before the first `it` clause.
>   behavior of "Batch Table API"
>
>   it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
>     // Group by a 1-second tumbling window AND a regular attribute.
>     val results = table
>       .window(Tumble over 1.second on 'rowtime as 'w)
>       .groupBy('w, 'location)
>       .select(
>         'w.start.cast(Types.LONG),
>         'w.end.cast(Types.LONG),
>         'location,
>         'id.count
>       )
>       .toDataSet[(Long, Long, String, Long)]
>       .collect()
>
>     results should contain theSameElementsAs Seq(
>       (1532424567000L, 1532424568000L, "location1", 3L),
>       (1532424568000L, 1532424569000L, "location2", 1L),
>       (1532424568000L, 1532424569000L, "location3", 1L)
>     )
>   }
> }
> {code}
> It seems that at execution time the 'rowtime attribute replaces 'location, which causes the ClassCastException:
> {code:java}
> [info]   Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
> [info]   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
> [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
> [info]   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> [info]   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> [info]   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> [info]   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> [info]   at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
> {code}
> Here is some debug information I was able to collect. It shows that the field serializers don't match the types of the Row fields:
> {code}
> this.instance = {Row@68451} "1532424567000,(3),1532424567000"
>  fields = {Object[3]@68461} 
>   0 = {Long@68462} 1532424567000
>   1 = {CountAccumulator@68463} "(3)"
>   2 = {Long@68462} 1532424567000
> this.serializer = {RowSerializer@68452} 
>  fieldSerializers = {TypeSerializer[3]@68455} 
>   0 = {StringSerializer@68458} 
>   1 = {TupleSerializer@68459} 
>   2 = {LongSerializer@68460} 
>  arity = 3
>  nullMask = {boolean[3]@68457} 
> {code}
>  



