[ 
https://issues.apache.org/jira/browse/FLINK-33213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Paryshev updated FLINK-33213:
------------------------------------
    Description: 
Currently Flink didn't take account `where` condition when calculate 
SqlMonotonicity for Calc/FlinkLogicalCalc/StreamPhysicalCalc.

 

To reproduce bug:
{code:java}
@Test
def testMaxRetract(): Unit = {
  env.setParallelism(1)
  val data = new mutable.MutableList[(Int, Int)]
  data.+=((1, 10))
  data.+=((1, 10))
  data.+=((2, 5))
  data.+=((1, 10))

  val t = failingDataSource(data).toTable(tEnv, 'id, 'price)
  tEnv.createTemporaryView("T", t)

  val sql =
    """
      |SELECT MAX(price) FROM(
      |   SELECT id, count(*) as c, price FROM T GROUP BY id, price)
      |WHERE c > 0 and c < 3""".stripMargin

  val sink = new TestingRetractSink
  tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
  env.execute()

  val expected = List("5")
  assertEquals(expected.sorted, sink.getRetractResults.sorted)
} {code}

  was:
Currently Flink didn't take account `where` condition when calculate 
SqlMonotonicity for Calc/FlinkLogicalCalc/StreamPhysicalCalc.

 

To reproduce bug:


> Flink SQL MinMax aggregations without retract messages when `where` condition 
> exist
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-33213
>                 URL: https://issues.apache.org/jira/browse/FLINK-33213
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.7, 1.13.6, 1.14.6, 1.15.4, 1.16.2, 1.17.1
>            Reporter: Sergey Paryshev
>            Priority: Major
>             Fix For: 1.19.0
>
>
> Currently Flink didn't take account `where` condition when calculate 
> SqlMonotonicity for Calc/FlinkLogicalCalc/StreamPhysicalCalc.
>  
> To reproduce bug:
> {code:java}
> @Test
> def testMaxRetract(): Unit = {
>   env.setParallelism(1)
>   val data = new mutable.MutableList[(Int, Int)]
>   data.+=((1, 10))
>   data.+=((1, 10))
>   data.+=((2, 5))
>   data.+=((1, 10))
>   val t = failingDataSource(data).toTable(tEnv, 'id, 'price)
>   tEnv.createTemporaryView("T", t)
>   val sql =
>     """
>       |SELECT MAX(price) FROM(
>       |   SELECT id, count(*) as c, price FROM T GROUP BY id, price)
>       |WHERE c > 0 and c < 3""".stripMargin
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink)
>   env.execute()
>   val expected = List("5")
>   assertEquals(expected.sorted, sink.getRetractResults.sorted)
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to