Fabian Hueske created FLINK-9528:
------------------------------------

             Summary: Incorrect results: Filter does not treat Upsert messages 
correctly.
                 Key: FLINK-9528
                 URL: https://issues.apache.org/jira/browse/FLINK-9528
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.4.2, 1.5.0, 1.3.3
            Reporter: Fabian Hueske


Currently, Filters (i.e., Calcs with predicates) do not distinguish between 
retraction and upsert mode. A Calc looks at a record (regardless of its update 
semantics) and either discards it (predicate evaluates to false) or passes it on 
(predicate evaluates to true).

This works fine for messages with retraction semantics but is not correct for 
upsert messages.

The following test case (can be pasted into {{TableSinkITCase}}) shows the 
problem:
{code:java}
  @Test
  def testUpsertsWithFilter(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.enableObjectReuse()
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val t = StreamTestData.get3TupleDataStream(env)
      .assignAscendingTimestamps(_._1.toLong)
      .toTable(tEnv, 'id, 'num, 'text)

    t.select('text.charLength() as 'len)
      .groupBy('len)
      .select('len, 'len.count as 'cnt)
      // Enable this filter (and the second assertion below) to reproduce the bug.
      // .where('cnt < 7)
      .writeToSink(new TestUpsertSink(Array("len"), false))

    env.execute()
    val results = RowCollector.getAndClearValues

    val retracted = RowCollector.upsertResults(results, Array(0)).sorted
    val expectedWithoutFilter = List(
      "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
    val expectedWithFilter = List(
      "2,1", "5,1", "11,1", "14,1", "25,1").sorted

    assertEquals(expectedWithoutFilter, retracted)
    // assertEquals(expectedWithFilter, retracted)
  }
{code}
When we add a filter on the aggregation result, we would expect that all rows 
that do not fulfill the condition are removed from the result. However, the 
filter only drops the upsert message that fails the predicate, so the previous 
version of that row (which passed the predicate) remains in the result.
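
The stale-row effect can be reproduced outside of Flink with a small model. The following is only an illustrative sketch, not Flink code: {{Upsert}}, {{materialize}} and {{NaiveFilterDemo}} are hypothetical names, and the sink is assumed to keep the last message per key.

```scala
// Simplified model, not Flink code: all names here are hypothetical.
object NaiveFilterDemo {
  // An upsert message for a key (len) carrying the latest count.
  final case class Upsert(len: Int, cnt: Long)

  // Materialize an upsert stream the way an upsert sink is assumed to:
  // the last message per key wins.
  def materialize(stream: Seq[Upsert]): Map[Int, Long] =
    stream.foldLeft(Map.empty[Int, Long]) { (table, msg) =>
      table.updated(msg.len, msg.cnt)
    }

  def main(args: Array[String]): Unit = {
    // The count for key 10 grows from 1 to 7, one upsert per update.
    val stream = (1L to 7L).map(c => Upsert(10, c))

    // A filter that is unaware of upsert semantics just drops messages.
    val filtered = stream.filter(_.cnt < 7)

    println(materialize(stream))   // Map(10 -> 7)
    println(materialize(filtered)) // Map(10 -> 6)
  }
}
```

The final count for key 10 is 7, so the key should not appear in the filtered result at all. Because the last upsert is silently dropped, the stale entry with count 6 survives.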

One solution could be to make a filter aware of the update semantics (retract 
or upsert) and convert the upsert message into a delete message if the 
predicate evaluates to false.
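
A minimal sketch of that idea, again as a standalone model rather than an actual Flink operator: the {{Message}} hierarchy, {{filterUpserts}} and {{materialize}} are hypothetical names introduced only for illustration, and the sink is assumed to apply upserts and deletes per key.

```scala
// Simplified model of the proposed fix, not Flink code; all names are hypothetical.
object UpsertAwareFilterDemo {
  sealed trait Message { def key: Int }
  final case class Upsert(key: Int, cnt: Long) extends Message
  final case class Delete(key: Int) extends Message

  // Upsert-aware filter: an upsert that fails the predicate becomes a
  // delete for its key; deletes are always forwarded unchanged.
  def filterUpserts(stream: Seq[Message])(pred: Upsert => Boolean): Seq[Message] =
    stream.map {
      case u: Upsert if pred(u) => u
      case u: Upsert            => Delete(u.key)
      case d: Delete            => d
    }

  // Assumed sink behavior: upserts overwrite by key, deletes remove the key.
  def materialize(stream: Seq[Message]): Map[Int, Long] =
    stream.foldLeft(Map.empty[Int, Long]) {
      case (table, Upsert(k, c)) => table.updated(k, c)
      case (table, Delete(k))    => table - k
    }

  def main(args: Array[String]): Unit = {
    val stream: Seq[Message] =
      (1L to 7L).map(c => Upsert(10, c)) ++ Seq(Upsert(2, 1L))
    val result = materialize(filterUpserts(stream)(_.cnt < 7))
    println(result) // Map(2 -> 1)
  }
}
```

In this model a delete is also emitted for keys that never passed the predicate, which is harmless here. Whether a real operator should suppress such redundant deletes (which would likely require state) is left open.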



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
