[
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu closed FLINK-9528.
--------------------------
Fix Version/s: 1.11.0
Resolution: Fixed
This is fixed by FLINK-16887.
A test was added in
{{org.apache.flink.table.planner.runtime.stream.table.TableSinkITCase#testUpsertSinkWithFilter}}.
> Incorrect results: Filter does not treat Upsert messages correctly.
> -------------------------------------------------------------------
>
> Key: FLINK-9528
> URL: https://issues.apache.org/jira/browse/FLINK-9528
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.3.3, 1.4.2, 1.5.0
> Reporter: Fabian Hueske
> Priority: Critical
> Fix For: 1.11.0
>
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between
> retraction and upsert mode. A Calc looks at a record (regardless of its update
> semantics) and either discards it (predicate evaluates to false) or passes it on
> (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for
> upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the
> problem:
> {code:java}
> @Test
> def testUpsertsWithFilter(): Unit = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   env.getConfig.enableObjectReuse()
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   val tEnv = TableEnvironment.getTableEnvironment(env)
>
>   val t = StreamTestData.get3TupleDataStream(env)
>     .assignAscendingTimestamps(_._1.toLong)
>     .toTable(tEnv, 'id, 'num, 'text)
>
>   t.select('text.charLength() as 'len)
>     .groupBy('len)
>     .select('len, 'len.count as 'cnt)
>     // .where('cnt < 7)
>     .writeToSink(new TestUpsertSink(Array("len"), false))
>
>   env.execute()
>
>   val results = RowCollector.getAndClearValues
>   val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>
>   val expectedWithoutFilter = List(
>     "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>   val expectedWithFilter = List(
>     "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>
>   assertEquals(expectedWithoutFilter, retracted)
>   // assertEquals(expectedWithFilter, retracted)
> }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows
> that do not fulfill the condition are removed from the result. However, the
> filter only drops the upsert message itself, so the previously emitted version
> of the row remains in the result.
> One solution could be to make a filter aware of the update semantics (retract
> or upsert) and convert the upsert message into a delete message if the
> predicate evaluates to false.
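
The proposed solution can be illustrated with a small, self-contained Scala sketch. It does not use Flink's planner or runtime classes. The `Message`, `Upsert`, and `Delete` types and the functions `naiveFilter`, `upsertAwareFilter`, and `materialize` are hypothetical names introduced only for this illustration, and the actual fix in FLINK-16887 may be implemented quite differently.

```scala
object UpsertFilterSketch {
  // Hypothetical message model: an upsert stream carries upserts and deletes per key.
  sealed trait Message
  final case class Upsert(key: Int, value: Long) extends Message
  final case class Delete(key: Int) extends Message

  // Behaviour described in the issue: an upsert that fails the predicate is
  // simply dropped, so the sink never learns that the old row is now invalid.
  def naiveFilter(msgs: Seq[Message])(predicate: Long => Boolean): Seq[Message] =
    msgs.filter {
      case Upsert(_, value) => predicate(value)
      case Delete(_)        => true
    }

  // Proposed behaviour: an upsert that fails the predicate becomes a delete
  // for the same key, which removes the previously emitted version.
  def upsertAwareFilter(msgs: Seq[Message])(predicate: Long => Boolean): Seq[Message] =
    msgs.map {
      case Upsert(key, value) if !predicate(value) => Delete(key)
      case other                                   => other
    }

  // Applies the messages in order to a keyed table, as an upsert sink would.
  def materialize(msgs: Seq[Message]): Map[Int, Long] =
    msgs.foldLeft(Map.empty[Int, Long]) {
      case (table, Upsert(key, value)) => table.updated(key, value)
      case (table, Delete(key))        => table - key
    }
}
```

For example, with the count for key `10` growing from 1 to 7 and the predicate `cnt < 7`, `naiveFilter` leaves the stale row `10 -> 6` in the materialized result, whereas `upsertAwareFilter` removes key `10` entirely. This sketch also emits a delete for a key that was never forwarded, which is harmless here but which a real implementation might want to suppress.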
--
This message was sent by Atlassian Jira
(v8.3.4#803005)