[https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504471#comment-16504471]
Hequn Cheng commented on FLINK-9528:
------------------------------------
Hi, [~fhueske]. Consider the anti-spamming scenario: users probably just
want to get the top 1% of the data from the result of the group by, so the remaining
99% of the data will all become delete messages after the {{Filter}}. This would be a
disaster for the storage system, especially when most of the incoming keys are new
ones that cannot be absorbed by a cache.
Adding a GroupByHaving operator is a good option, but it only covers the case where the
filter can be pushed down into the group by. If a filter cannot be pushed down, we
can turn the upsert stream into a retract stream to ensure correctness (or throw an
exception and tell the user to use a {{RetractTableSink}}). What do you think?
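A minimal sketch of the retract-stream fallback, written in plain Scala rather than as Flink operators. The names {{UpsertToRetract}}, {{convert}}, {{filterRetract}} and {{materialize}} are hypothetical, and messages are modeled as simple tuples (key = {{len}}, value = {{cnt}}) instead of Flink's Boolean-flagged rows. The converter keeps the last emitted value per key so that every change can be preceded by a retraction of the previous row; a filter applied after the conversion then keeps or drops an addition and its later retraction together.

```scala
import scala.collection.mutable

object UpsertToRetract {
  // Hypothetical message encodings (not Flink's runtime types).
  // Upsert message: (key, Some(value)) upserts the key, (key, None) deletes it.
  type UpsertMsg = (Int, Option[Long])
  // Retract message: (true, row) adds a row, (false, row) retracts it.
  type RetractMsg = (Boolean, (Int, Long))

  // Converts an upsert stream into a retract stream. Per-key state remembers
  // the last emitted value so that it can be retracted before the next change.
  def convert(upserts: Seq[UpsertMsg]): Seq[RetractMsg] = {
    val last = mutable.Map.empty[Int, Long]
    val out = mutable.ArrayBuffer.empty[RetractMsg]
    for ((key, update) <- upserts) {
      // Retract the previous version of this key, if one was emitted.
      last.get(key).foreach(prev => out += ((false, (key, prev))))
      update match {
        case Some(value) =>
          last(key) = value
          out += ((true, (key, value)))
        case None =>
          last.remove(key)
      }
    }
    out.toList
  }

  // A filter on a retract stream evaluates the predicate on the row itself,
  // so an addition and its later retraction are kept or dropped together.
  def filterRetract(msgs: Seq[RetractMsg])(p: ((Int, Long)) => Boolean): Seq[RetractMsg] =
    msgs.filter { case (_, row) => p(row) }

  // Materializes a retract stream into a multiset of rows (row -> multiplicity).
  def materialize(msgs: Seq[RetractMsg]): Map[(Int, Long), Int] =
    msgs.foldLeft(Map.empty[(Int, Long), Int]) { case (acc, (isAdd, row)) =>
      val n = acc.getOrElse(row, 0) + (if (isAdd) 1 else -1)
      if (n == 0) acc - row else acc.updated(row, n)
    }
}
```

With a {{cnt < 7}} predicate, a key whose count climbs past 6 ends up with a matching retraction for every addition that passed the filter, so it disappears from the result. The price is the per-key state needed to remember the last emitted value.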
> Incorrect results: Filter does not treat Upsert messages correctly.
> -------------------------------------------------------------------
>
> Key: FLINK-9528
> URL: https://issues.apache.org/jira/browse/FLINK-9528
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.3, 1.5.0, 1.4.2
> Reporter: Fabian Hueske
> Assignee: Hequn Cheng
> Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between
> retraction and upsert mode. A Calc looks at each record (regardless of its update
> semantics) and either discards it (predicate evaluates to false) or passes it on
> (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for
> upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the
> problem:
> {code:java}
> @Test
> def testUpsertsWithFilter(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.getConfig.enableObjectReuse()
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>   val t = StreamTestData.get3TupleDataStream(env)
>     .assignAscendingTimestamps(_._1.toLong)
>     .toTable(tEnv, 'id, 'num, 'text)
>   t.select('text.charLength() as 'len)
>     .groupBy('len)
>     .select('len, 'len.count as 'cnt)
>     // .where('cnt < 7)
>     .writeToSink(new TestUpsertSink(Array("len"), false))
>   env.execute()
>   val results = RowCollector.getAndClearValues
>   val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>   val expectedWithoutFilter = List(
>     "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>   val expectedWithFilter = List(
>     "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>   assertEquals(expectedWithoutFilter, retracted)
>   // assertEquals(expectedWithFilter, retracted)
> }
> {code}
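> When we add a filter on the aggregation result, we would expect that all rows
> that do not fulfill the condition are removed from the result. However, the
> filter only drops the new upsert message, so the previous version of the row
> remains in the result. In the test above, the rows for {{len}} 9 and 10 stay in
> the sink with the last count that passed the filter instead of disappearing.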
> One solution could be to make a filter aware of the update semantics (retract
> or upsert) and convert the upsert message into a delete message if the
> predicate evaluates to false.
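A minimal sketch of that proposal, again in plain Scala and not Flink's actual Calc code. Messages are modeled as hypothetical {{(key, Option[value])}} tuples where {{None}} marks a delete, and {{naiveFilter}}, {{upsertAwareFilter}} and {{materialize}} are illustrative names only. {{materialize}} applies the messages to a keyed table the way an upsert sink would.

```scala
object UpsertAwareFilter {
  // Hypothetical upsert message: (key, Some(value)) = upsert, (key, None) = delete.
  type UpsertMsg = (Int, Option[Long])

  // Behaviour described in the issue: upserts whose predicate is false are
  // simply dropped. Deletes carry no value in this model and pass through.
  def naiveFilter(msgs: Seq[UpsertMsg])(p: Long => Boolean): Seq[UpsertMsg] =
    msgs.filter {
      case (_, Some(v)) => p(v)
      case (_, None)    => true
    }

  // Proposed behaviour: an upsert whose predicate is false becomes a delete
  // for its key, so a previously emitted version is removed from the sink.
  def upsertAwareFilter(msgs: Seq[UpsertMsg])(p: Long => Boolean): Seq[UpsertMsg] =
    msgs.map {
      case (k, Some(v)) if !p(v) => (k, Option.empty[Long])
      case other                 => other
    }

  // Applies an upsert stream to a keyed table, as an upsert sink would.
  def materialize(msgs: Seq[UpsertMsg]): Map[Int, Long] =
    msgs.foldLeft(Map.empty[Int, Long]) {
      case (table, (k, Some(v))) => table.updated(k, v)
      case (table, (k, None))    => table - k
    }
}
```

Feeding both filters the counts 1 to 9 for a single key with the predicate {{cnt < 7}}, the naive version leaves the key in the table with value 6, while the upsert-aware version removes it. Every failing upsert turns into a delete, which is the message-volume concern raised in the comment above.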