[ https://issues.apache.org/jira/browse/FLINK-39241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39241:
-----------------------------------
    Labels: pull-request-available  (was: )

> Incorrect row data using in MultiJoinStateViews for HeapStateBackend case
> -------------------------------------------------------------------------
>
>                 Key: FLINK-39241
>                 URL: https://issues.apache.org/jira/browse/FLINK-39241
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Dmitriy Linevich
>            Priority: Major
>              Labels: pull-request-available
>
> The problem can be reproduced with the following test:
> {code:java}
> @TestTemplate
> def testThreeWayMultiJoin(): Unit = {
>   env.setParallelism(1)
>   val data1 = new mutable.MutableList[(Int, Long)]
>   data1.+=((1, 1L))
>   data1.+=((1, 2L))
>   data1.+=((1, 2L))
>   data1.+=((1, 5L))
>   data1.+=((2, 7L))
>   data1.+=((1, 9L))
>   data1.+=((1, 8L))
>   data1.+=((3, 8L))
>   val data2 = new mutable.MutableList[(Int, Long)]
>   data2.+=((1, 1L))
>   data2.+=((2, 2L))
>   data2.+=((3, 2L))
>   data2.+=((1, 4L))
>   val data3 = new mutable.MutableList[(Int, Long)]
>   data3.+=((1, 1L))
>   data3.+=((2, 2L))
>   data3.+=((3, 2L))
>   data3.+=((2, 1L))
>   val a = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
>   val b = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
>   val c = failingDataSource(data3).toTable(tEnv, 'c1, 'c2)
>   tEnv.createTemporaryView("Atable", a)
>   tEnv.createTemporaryView("Btable", b)
>   tEnv.createTemporaryView("Ctable", c)
>   tEnv.getConfig.getConfiguration
>     .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")
>   val query1 = "SELECT SUM(a2) AS a2, a1 FROM Atable group by a1"
>   val query2 = "SELECT SUM(b2) AS b2, b1 FROM Btable group by b1"
>   val query3 = "SELECT SUM(c2) AS c2, c1 FROM Ctable group by c1"
>   val query = s"SELECT a1, b1, c1 FROM ($query1) JOIN ($query2) ON a1 = b1 JOIN ($query3) ON c2 = b2"
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected = Seq("2,2,3", "3,3,3")
>   assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
> }{code}
>
> With the heap state backend, the test result is wrong:
> {code:java}
> Expected :List(2,2,3, 3,3,3)
> Actual   :List(2,2,2, 2,2,3, 3,3,2, 3,3,3) {code}
>
> The problem is essentially the same as 
> [FLINK-39015|https://issues.apache.org/jira/browse/FLINK-39015]; a similar 
> correction is also needed [here|#L231].
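As a sanity check on the expected and actual values in the report, the test's result can be recomputed by hand. The sketch below is a hypothetical standalone Scala object (`ExpectedMultiJoinResult` is not part of Flink or the original test). It applies the same `GROUP BY` sums and join conditions (`a1 = b1`, `c2 = b2`) to the test data using plain collections. It also computes one possible intermediate state, taken before the last `Ctable` row `(2, 1L)` arrives. At that point `SUM(c2)` for `c1 = 2` is still 2. That state matches the heap-backend output exactly, which is consistent with the rows `2,2,2` and `3,3,2` never being retracted.

```scala
// Hypothetical standalone helper (not part of Flink or the original test):
// recomputes the test's result with plain Scala collections, no Flink involved.
object ExpectedMultiJoinResult {
  // Same input rows as data1, data2 and data3 in the test.
  val data1: Seq[(Int, Long)] =
    Seq((1, 1L), (1, 2L), (1, 2L), (1, 5L), (2, 7L), (1, 9L), (1, 8L), (3, 8L))
  val data2: Seq[(Int, Long)] = Seq((1, 1L), (2, 2L), (3, 2L), (1, 4L))
  val data3: Seq[(Int, Long)] = Seq((1, 1L), (2, 2L), (3, 2L), (2, 1L))

  // Equivalent of "SELECT SUM(x2), x1 FROM X GROUP BY x1": key -> sum.
  def sumByKey(rows: Seq[(Int, Long)]): Map[Int, Long] =
    rows.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }

  // Equivalent of "... JOIN ... ON a1 = b1 JOIN ... ON c2 = b2", projecting a1, b1, c1.
  def joinRows(a: Map[Int, Long], b: Map[Int, Long], c: Map[Int, Long]): List[String] =
    (for {
      (a1, _) <- a.toSeq
      (b1, b2) <- b.toSeq if b1 == a1
      (c1, c2) <- c.toSeq if c2 == b2
    } yield s"$a1,$b1,$c1").toList.sorted

  val sumsA: Map[Int, Long] = sumByKey(data1) // 1 -> 27, 2 -> 7, 3 -> 8
  val sumsB: Map[Int, Long] = sumByKey(data2) // 1 -> 5, 2 -> 2, 3 -> 2
  val sumsC: Map[Int, Long] = sumByKey(data3) // 1 -> 1, 2 -> 3, 3 -> 2

  // Final result after all retractions: what the test expects.
  val expected: List[String] = joinRows(sumsA, sumsB, sumsC)

  // One possible intermediate state: before the last Ctable row (2, 1L) arrives,
  // SUM(c2) for c1 = 2 is still 2, so c1 = 2 joins every row with b2 = 2.
  val intermediate: List[String] = joinRows(sumsA, sumsB, sumByKey(data3.init))

  def main(args: Array[String]): Unit = {
    println(expected)     // List(2,2,3, 3,3,3)
    println(intermediate) // List(2,2,2, 2,2,3, 3,3,2, 3,3,3)
  }
}
```

Once `SUM(c2)` for `c1 = 2` becomes 3, no `b2` equals 3. A correct retract stream must therefore remove both rows that were joined on the old value 2 and add nothing in their place.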



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
