Roman Wozniak created FLINK-9935:
------------------------------------
Summary: Batch Table API: grouping by window and attribute causes
java.lang.ClassCastException:
Key: FLINK-9935
URL: https://issues.apache.org/jira/browse/FLINK-9935
Project: Flink
Issue Type: Bug
Affects Versions: 1.5.1, 1.5.0
Reporter: Roman Wozniak
Grouping by window AND some other attribute(s) seems broken. Test case
attached:
{code}
class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {
trait BatchContext {
implicit lazy val env: ExecutionEnvironment =
ExecutionEnvironment.getExecutionEnvironment
implicit val tableEnv: BatchTableEnvironment =
TableEnvironment.getTableEnvironment(env)
val data = Seq(
(1532424567000L, "id1", "location1"),
(1532424567000L, "id2", "location1"),
(1532424567000L, "id3", "location1"),
(1532424568000L, "id1", "location2"),
(1532424568000L, "id2", "location3")
)
val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id,
'location)
}
it should "be possible to run Table API queries with grouping by tumble
window and column(s) on batch data" in new BatchContext {
val results = table
.window(Tumble over 1.second on 'rowtime as 'w)
.groupBy('w, 'location)
.select(
'w.start.cast(Types.LONG),
'w.end.cast(Types.LONG),
'location,
'id.count
)
.toDataSet[(Long, Long, String, Long)]
.collect()
results should contain theSameElementsAs Seq(
(1532424567000L, 1532424568000L, "location1", 3L),
(1532424568000L, 1532424569000L, "location2", 1L),
(1532424568000L, 1532424569000L, "location3", 1L)
)
}
}
{code}
It seems like during execution time, the 'rowtime attribute replaces 'location
and that causes ClassCastException.
{code:java}
[info] Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to
java.lang.String
[info] at
org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
[info] at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
[info] at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
[info] at
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info] at
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
[info] at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
[info] at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
[info] at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
[info] at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
[info] at
org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
{code}
Here is some debug information that I was able to get. So, field serializers
don't match the type of Row fields:
{code}
this.instance = {Row@68451} "1532424567000,(3),1532424567000"
fields = {Object[3]@68461}
0 = {Long@68462} 1532424567000
1 = {CountAccumulator@68463} "(3)"
2 = {Long@68462} 1532424567000
this.serializer = {RowSerializer@68452}
fieldSerializers = {TypeSerializer[3]@68455}
0 = {StringSerializer@68458}
1 = {TupleSerializer@68459}
2 = {LongSerializer@68460}
arity = 3
nullMask = {boolean[3]@68457}
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)