Timo Walther created FLINK-7426:
-----------------------------------

             Summary: Table API does not support null values in keys
                 Key: FLINK-7426
                 URL: https://issues.apache.org/jira/browse/FLINK-7426
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Timo Walther


The Table API uses {{keyBy}} internally, however, the generated {{KeySelector}} 
uses instances of {{Tuple}}. The {{TupleSerializer}} is not able to serialize 
null values. This causes issues during checkpointing or when using the RocksDB 
state backend. We need to replace all {{keyBy}} calls with a custom 
{{RowKeySelector}}.

{code}
class AggregateITCase extends StreamingWithStateTestBase {
  private val queryConfig = new StreamQueryConfig()
  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))


  @Test
  def testDistinct(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
      .select('b, Null(Types.LONG)).distinct()

    val results = t.toRetractStream[Row](queryConfig)
    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    env.execute()

    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", 
"5,null", "6,null")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to