[ 
https://issues.apache.org/jira/browse/FLINK-39241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitriy Linevich updated FLINK-39241:
-------------------------------------
    Description: 
Problem can be reproduced by the following test
{code:scala}
/**
 * Reproducer for FLINK-39241: a three-way multi-join whose inputs are grouped aggregates.
 *
 * Each side is `SUM(..) GROUP BY key`, so the join consumes updating streams that retract
 * and re-emit a row whenever a sum changes. For example, SUM(c2) for c1 = 2 is first 2 and
 * then 3 after the record (2, 1L). With the heap state backend the issue reports that the
 * sink additionally ends up with "2,2,2" and "3,3,2". Those rows pair c1 = 2 with its
 * outdated sum c2 = 2, and they are never retracted.
 */
@TestTemplate
def testThreeWayMultiJoin(): Unit = {
  env.setParallelism(1)

  // Final SUM(a2) per a1: 1 -> 27, 2 -> 7, 3 -> 8
  val data1 = new mutable.MutableList[(Int, Long)]
  data1.+=((1, 1L))
  data1.+=((1, 2L))
  data1.+=((1, 2L))
  data1.+=((1, 5L))
  data1.+=((2, 7L))
  data1.+=((1, 9L))
  data1.+=((1, 8L))
  data1.+=((3, 8L))

  // Final SUM(b2) per b1: 1 -> 5, 2 -> 2, 3 -> 2
  val data2 = new mutable.MutableList[(Int, Long)]
  data2.+=((1, 1L))
  data2.+=((2, 2L))
  data2.+=((3, 2L))
  data2.+=((1, 4L))

  // SUM(c2) per c1: 1 -> 1, 2 -> 2 and then 3 (updated by the last record), 3 -> 2
  val data3 = new mutable.MutableList[(Int, Long)]
  data3.+=((1, 1L))
  data3.+=((2, 2L))
  data3.+=((3, 2L))
  data3.+=((2, 1L))

  val a = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
  val b = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
  val c = failingDataSource(data3).toTable(tEnv, 'c1, 'c2)

  tEnv.createTemporaryView("Atable", a)
  tEnv.createTemporaryView("Btable", b)
  tEnv.createTemporaryView("Ctable", c)

  // Let the optimizer plan the two chained joins as a single multi-join operator.
  tEnv.getConfig.getConfiguration
    .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")

  val query1 = "SELECT SUM(a2) AS a2, a1 FROM Atable group by a1"
  val query2 = "SELECT SUM(b2) AS b2, b1 FROM Btable group by b1"
  val query3 = "SELECT SUM(c2) AS c2, c1 FROM Ctable group by c1"
  // Split across two literals: a single-line string literal cannot contain a line break.
  val query =
    s"SELECT a1, b1, c1 FROM ($query1) JOIN ($query2) ON a1 = b1 " +
      s"JOIN ($query3) ON c2 = b2"

  val sink = new TestingRetractSink
  tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()

  // a1 = b1 pairs keys 1, 2, 3 with b2 = 5, 2, 2. The condition c2 = b2 then matches only
  // c1 = 3 (c2 = 2), because the sum for c1 = 2 ends at 3 and the one for c1 = 1 is 1.
  val expected = Seq("2,2,3", "3,3,3")
  assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}{code}
With the heap state backend, the test produces a wrong result:

 
{code:java}
Expected :List(2,2,3, 3,3,3)
Actual   :List(2,2,2, 2,2,3, 3,3,2, 3,3,3) {code}
 

 

> Incorrect row data used in MultiJoinStateViews for HeapStateBackend case
> -------------------------------------------------------------------------
>
>                 Key: FLINK-39241
>                 URL: https://issues.apache.org/jira/browse/FLINK-39241
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Dmitriy Linevich
>            Priority: Major
>
> Problem can be reproduced by the following test
> {code:java}
> @TestTemplate
> def testThreeWayMultiJoin(): Unit = {
>   env.setParallelism(1)
>   val data1 = new mutable.MutableList[(Int, Long)]
>   data1.+=((1, 1L))
>   data1.+=((1, 2L))
>   data1.+=((1, 2L))
>   data1.+=((1, 5L))
>   data1.+=((2, 7L))
>   data1.+=((1, 9L))
>   data1.+=((1, 8L))
>   data1.+=((3, 8L))
>   val data2 = new mutable.MutableList[(Int, Long)]
>   data2.+=((1, 1L))
>   data2.+=((2, 2L))
>   data2.+=((3, 2L))
>   data2.+=((1, 4L))
>   val data3 = new mutable.MutableList[(Int, Long)]
>   data3.+=((1, 1L))
>   data3.+=((2, 2L))
>   data3.+=((3, 2L))
>   data3.+=((2, 1L))
>   val a = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
>   val b = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
>   val c = failingDataSource(data3).toTable(tEnv, 'c1, 'c2)
>   tEnv.createTemporaryView("Atable", a)
>   tEnv.createTemporaryView("Btable", b)
>   tEnv.createTemporaryView("Ctable", c)
>   tEnv.getConfig.getConfiguration
>     .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")
>   val query1 = "SELECT SUM(a2) AS a2, a1 FROM Atable group by a1"
>   val query2 = "SELECT SUM(b2) AS b2, b1 FROM Btable group by b1"
>   val query3 = "SELECT SUM(c2) AS c2, c1 FROM Ctable group by c1"
>   val query = s"SELECT a1, b1, c1 FROM ($query1) JOIN ($query2) ON a1 = b1 JOIN ($query3) ON c2 = b2"
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected = Seq("2,2,3", "3,3,3")
>   assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
> }{code}
> With the heap state backend, the test produces a wrong result:
>  
> {code:java}
> Expected :List(2,2,3, 3,3,3)
> Actual   :List(2,2,2, 2,2,3, 3,3,2, 3,3,3) {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to