Timo Walther created FLINK-7426:

             Summary: Table API does not support null values in keys
                 Key: FLINK-7426
                 URL: https://issues.apache.org/jira/browse/FLINK-7426
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Timo Walther

The Table API uses {{keyBy}} internally; however, the generated {{KeySelector}} 
uses instances of {{Tuple}}, and the {{TupleSerializer}} is not able to serialize 
null values. This causes issues during checkpointing or when using the RocksDB 
state backend. We need to replace all {{keyBy}} calls with a custom 

class AggregateITCase extends StreamingWithStateTestBase {
  private val queryConfig = new StreamQueryConfig()
  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))

  @Test
  def testDistinct(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Select a constant null column so that the distinct key contains a null field.
    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
      .select('b, Null(Types.LONG)).distinct()

    val results = t.toRetractStream[Row](queryConfig)
    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    // Run the job; without this call the sink never receives any results.
    env.execute()

    val expected = mutable.MutableList(
      "1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }
}

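The serializer behaviour described above can be reproduced outside the Table API. The following is a minimal sketch, assuming the Flink 1.x type-information API (where {{createSerializer}} takes an {{ExecutionConfig}}) and Flink on the classpath. The object name {{NullKeySketch}} is hypothetical, and using {{Row}} as a null-tolerant key type is shown only for comparison; it is an assumption, not necessarily the fix this issue proposes.

```scala
import java.io.ByteArrayOutputStream
import java.lang.{Long => JLong}

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.core.memory.DataOutputViewStreamWrapper
import org.apache.flink.types.{NullFieldException, Row}

// Hypothetical stand-alone sketch; not part of the Flink code base.
object NullKeySketch {
  def main(args: Array[String]): Unit = {
    val config = new ExecutionConfig
    val out = new DataOutputViewStreamWrapper(new ByteArrayOutputStream())

    // A Tuple key whose second field is null, like the key built from Null(Types.LONG).
    val tupleType = new TupleTypeInfo[JTuple2[JLong, JLong]](
      BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
    val tupleKey = new JTuple2[JLong, JLong](JLong.valueOf(1L), null)
    try {
      // TupleSerializer has no representation for a null field and rejects the key.
      tupleType.createSerializer(config).serialize(tupleKey, out)
    } catch {
      case e: NullFieldException =>
        println(s"Tuple key rejected: ${e.getMessage}")
    }

    // The same key as a Row: RowSerializer records null fields in a null mask.
    val rowType = new RowTypeInfo(
      BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
    val rowKey = new Row(2)
    rowKey.setField(0, JLong.valueOf(1L))
    rowKey.setField(1, null)
    rowType.createSerializer(config).serialize(rowKey, out)
    println("Row key serialized")
  }
}
```

The same serialization step is applied to keys when state is checkpointed or stored in RocksDB, which is why the problem only surfaces in those situations.
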