[ https://issues.apache.org/jira/browse/FLINK-33213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sergey Paryshev updated FLINK-33213:
------------------------------------
Description:
Currently, Flink does not take the `where` condition into account when calculating
SqlMonotonicity for Calc/FlinkLogicalCalc/StreamPhysicalCalc.
To reproduce the bug:
{code:java}
@Test
def testMaxRetract(): Unit = {
  env.setParallelism(1)
  // (id, price) rows; the group (1, 10) eventually reaches a count of 3.
  val data = new mutable.MutableList[(Int, Int)]
  data.+=((1, 10))
  data.+=((1, 10))
  data.+=((2, 5))
  data.+=((1, 10))
  val t = failingDataSource(data).toTable(tEnv, 'id, 'price)
  tEnv.createTemporaryView("T", t)
  // Once c reaches 3, group (1, 10) stops passing the filter and must be retracted from MAX.
  val sql =
    """
      |SELECT MAX(price) FROM(
      | SELECT id, count(*) as c, price FROM T GROUP BY id, price)
      |WHERE c > 0 and c < 3""".stripMargin
  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
  env.execute()
  // Only group (2, 5) satisfies the filter at the end, so MAX(price) should be 5.
  val expected = List("5")
  assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
{code}
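To illustrate why the expected result is 5, the following is a minimal plain-Scala sketch of the changelog that the query produces. It is a simplified model under stated assumptions, not Flink planner or runtime code: the object `MaxRetractSketch`, the helper `passes`, and the `retract` flag are hypothetical names introduced only for this illustration. The flag models whether the outer MAX is told when a group stops satisfying the `where` condition.

```scala
import scala.collection.mutable

// Hypothetical, Flink-free model of the query in the test above.
object MaxRetractSketch {
  // (id, price) rows in arrival order, same as the test data.
  val input: Seq[(Int, Int)] = Seq((1, 10), (1, 10), (2, 5), (1, 10))

  // Models the filter: WHERE c > 0 AND c < 3
  def passes(c: Long): Boolean = c > 0 && c < 3

  /** Final MAX(price); `retract` controls whether groups that stop matching are withdrawn. */
  def finalMax(retract: Boolean): Option[Int] = {
    // Inner aggregation: count(*) per (id, price) group.
    val counts = mutable.Map.empty[(Int, Int), Long]
    // Groups currently visible to the outer MAX, mapped to their price.
    val visible = mutable.Map.empty[(Int, Int), Int]
    for (key @ (_, price) <- input) {
      val c = counts.getOrElse(key, 0L) + 1
      counts(key) = c
      if (passes(c)) visible(key) = price
      else if (retract) visible.remove(key) // the retract message for the filtered-out group
    }
    if (visible.isEmpty) None else Some(visible.values.max)
  }
}
```

In this model, `MaxRetractSketch.finalMax(retract = true)` yields `Some(5)`, matching the expected result of the test, while `MaxRetractSketch.finalMax(retract = false)` yields `Some(10)`, the stale maximum left behind when the third `(1, 10)` row pushes its group out of the filter and no retraction reaches the MAX aggregation.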
was:
Currently, Flink does not take the `where` condition into account when calculating
SqlMonotonicity for Calc/FlinkLogicalCalc/StreamPhysicalCalc.
To reproduce the bug:
> Flink SQL MinMax aggregations without retract messages when `where` condition
> exist
> -----------------------------------------------------------------------------------
>
> Key: FLINK-33213
> URL: https://issues.apache.org/jira/browse/FLINK-33213
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.12.7, 1.13.6, 1.14.6, 1.15.4, 1.16.2, 1.17.1
> Reporter: Sergey Paryshev
> Priority: Major
> Fix For: 1.19.0
>
>
> Currently, Flink does not take the `where` condition into account when calculating
> SqlMonotonicity for Calc/FlinkLogicalCalc/StreamPhysicalCalc.
>
> To reproduce the bug:
> {code:java}
> @Test
> def testMaxRetract(): Unit = {
>   env.setParallelism(1)
>   // (id, price) rows; the group (1, 10) eventually reaches a count of 3.
>   val data = new mutable.MutableList[(Int, Int)]
>   data.+=((1, 10))
>   data.+=((1, 10))
>   data.+=((2, 5))
>   data.+=((1, 10))
>   val t = failingDataSource(data).toTable(tEnv, 'id, 'price)
>   tEnv.createTemporaryView("T", t)
>   // Once c reaches 3, group (1, 10) stops passing the filter and must be retracted from MAX.
>   val sql =
>     """
>       |SELECT MAX(price) FROM(
>       | SELECT id, count(*) as c, price FROM T GROUP BY id, price)
>       |WHERE c > 0 and c < 3""".stripMargin
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
>   env.execute()
>   // Only group (2, 5) satisfies the filter at the end, so MAX(price) should be 5.
>   val expected = List("5")
>   assertEquals(expected.sorted, sink.getRetractResults.sorted)
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)