Timo Walther created FLINK-7426:
-----------------------------------

             Summary: Table API does not support null values in keys
                 Key: FLINK-7426
                 URL: https://issues.apache.org/jira/browse/FLINK-7426
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Timo Walther

The Table API uses {{keyBy}} internally; however, the generated {{KeySelector}} produces {{Tuple}} instances as keys. The {{TupleSerializer}} cannot serialize null values, which causes failures during checkpointing or when using the RocksDB state backend.

We need to replace all {{keyBy}} calls with a custom {{RowKeySelector}}.

A test that uses a null value in the key:

{code}
class AggregateITCase extends StreamingWithStateTestBase {

  private val queryConfig = new StreamQueryConfig()
  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))

  @Test
  def testDistinct(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    // The second selected column is a null literal, so every distinct key contains a null field.
    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
      .select('b, Null(Types.LONG)).distinct()

    val results = t.toRetractStream[Row](queryConfig)
    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    env.execute()

    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }
}
{code}
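
The proposed {{RowKeySelector}} is not spelled out in this issue. As a rough sketch only, and assuming the input records are {{Row}} instances and the key fields are given by position, it might look like the following. The constructor parameters {{keyFields}} and {{keyType}} are hypothetical names chosen for illustration. The idea is that a {{Row}} key is serialized by the {{RowSerializer}}, which records null fields in a null mask, so null key fields no longer break serialization.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.types.Row

// Sketch (assumption, not the final implementation): extracts the key as a Row
// instead of a Tuple so that null key fields can be serialized.
class RowKeySelector(keyFields: Array[Int], keyType: TypeInformation[Row])
  extends KeySelector[Row, Row]
  with ResultTypeQueryable[Row] {

  // Copy the key fields, including nulls, into a new Row of matching arity.
  override def getKey(value: Row): Row = {
    val key = new Row(keyFields.length)
    var i = 0
    while (i < keyFields.length) {
      key.setField(i, value.getField(keyFields(i)))
      i += 1
    }
    key
  }

  // Tell Flink the key type explicitly, since it cannot be inferred from Row.
  override def getProducedType: TypeInformation[Row] = keyType
}
```

An instance of such a selector could then be passed to {{keyBy}} in place of the field positions used today.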