[ https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504489#comment-16504489 ]

Fabian Hueske commented on FLINK-9528:
--------------------------------------

True, highly-selective filters would definitely be an issue. However, most 
operators can handle duplicates quite well and do not need them filtered out, 
because they hold the necessary state anyway. Only sinks are affected by this. 
So why not add a duplicate filter just for upsert sinks? That would keep the 
filter operator (the Calc) itself stateless and efficient.

The duplicate filter for the upsert sink would need a boolean flag per key, and 
the number of keys depends on the selectivity of the filter, i.e., the size of 
the state is proportional to the gains. Later we could improve it by using a 
probabilistic filter (proposed in FLINK-8601).
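
For illustration, here is a minimal sketch of what such a per-key duplicate 
filter in front of the upsert sink could look like, assuming the messages 
reaching the sink are (isUpsert, row) pairs keyed by the upsert key and that 
the duplicates to suppress are mainly redundant delete messages. The class and 
state names are made up for this sketch, not an actual API:

{code:scala}
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Hypothetical sketch: one boolean flag per upsert key recording whether the
// key is currently present downstream. Upserts are always forwarded; deletes
// are forwarded only if the key was emitted before, otherwise they are dropped.
class UpsertSinkDedupFilter
  extends RichFlatMapFunction[(Boolean, Row), (Boolean, Row)] {

  private var present: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    present = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("present", classOf[java.lang.Boolean]))
  }

  override def flatMap(msg: (Boolean, Row), out: Collector[(Boolean, Row)]): Unit = {
    val isUpsert = msg._1
    val wasPresent = present.value() != null && present.value()
    if (isUpsert) {
      // remember that the key now exists downstream and forward the upsert
      present.update(true)
      out.collect(msg)
    } else if (wasPresent) {
      // the key was emitted before, so the delete is meaningful
      present.update(false)
      out.collect(msg)
    }
    // else: delete for a key the sink never saw -> redundant, drop it
  }
}
{code}

It would have to run on a stream keyed by the sink's key fields, and its state 
is exactly the one boolean flag per distinct key mentioned above.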

> Incorrect results: Filter does not treat Upsert messages correctly.
> -------------------------------------------------------------------
>
>                 Key: FLINK-9528
>                 URL: https://issues.apache.org/jira/browse/FLINK-9528
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.3, 1.5.0, 1.4.2
>            Reporter: Fabian Hueske
>            Assignee: Hequn Cheng
>            Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between 
> retraction and upsert mode. A Calc looks at a record (regardless of its update 
> semantics) and either discards it (the predicate evaluates to false) or passes 
> it on (the predicate evaluates to true).
> This works fine for messages with retraction semantics, but it is not correct 
> for upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the 
> problem:
> {code:java}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.enableObjectReuse()
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val t = StreamTestData.get3TupleDataStream(env)
>       .assignAscendingTimestamps(_._1.toLong)
>       .toTable(tEnv, 'id, 'num, 'text)
>     t.select('text.charLength() as 'len)
>       .groupBy('len)
>       .select('len, 'len.count as 'cnt)
>       // .where('cnt < 7)
>       .writeToSink(new TestUpsertSink(Array("len"), false))
>     env.execute()
>     val results = RowCollector.getAndClearValues
>     val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>     val expectedWithoutFilter = List(
>       "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>     val expectedWithFilter = List(
>       "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>     assertEquals(expectedWithoutFilter, retracted)
>     // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows 
> that do not fulfill the condition are removed from the result. However, the 
> filter only removes the upsert message, so the previous version of the row 
> remains in the result.
> One solution could be to make a filter aware of the update semantics (retract 
> or upsert) and convert the upsert message into a delete message if the 
> predicate evaluates to false.
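
A rough sketch of that last idea (an update-semantics-aware filter), assuming 
the stream carries (isUpsert, row) pairs as upsert sinks expect; the class name 
and the predicate parameter are hypothetical:

{code:scala}
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Hypothetical sketch of an update-semantics-aware filter for an upsert stream
// of (isUpsert, row) messages: if the predicate fails for an upsert message,
// the message is converted into a delete so that the previous version of the
// row does not survive at the sink. Delete messages are forwarded unchanged.
class UpsertAwareFilter(predicate: Row => Boolean)
  extends FlatMapFunction[(Boolean, Row), (Boolean, Row)] {

  override def flatMap(msg: (Boolean, Row), out: Collector[(Boolean, Row)]): Unit = {
    val (isUpsert, row) = msg
    if (!isUpsert || predicate(row)) {
      // deletes and upserts that satisfy the predicate pass through unchanged
      out.collect(msg)
    } else {
      // failing upsert -> emit a delete for the same key instead of dropping it
      out.collect((false, row))
    }
  }
}
{code}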


