[ https://issues.apache.org/jira/browse/FLINK-39241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39241:
-----------------------------------
Labels: pull-request-available (was: )
> Incorrect row data using in MultiJoinStateViews for HeapStateBackend case
> -------------------------------------------------------------------------
>
> Key: FLINK-39241
> URL: https://issues.apache.org/jira/browse/FLINK-39241
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Dmitriy Linevich
> Priority: Major
> Labels: pull-request-available
>
> The problem can be reproduced with the following test:
> {code:java}
> @TestTemplate
> def testThreeWayMultiJoin(): Unit = {
>   env.setParallelism(1)
>   val data1 = new mutable.MutableList[(Int, Long)]
>   data1.+=((1, 1L))
>   data1.+=((1, 2L))
>   data1.+=((1, 2L))
>   data1.+=((1, 5L))
>   data1.+=((2, 7L))
>   data1.+=((1, 9L))
>   data1.+=((1, 8L))
>   data1.+=((3, 8L))
>   val data2 = new mutable.MutableList[(Int, Long)]
>   data2.+=((1, 1L))
>   data2.+=((2, 2L))
>   data2.+=((3, 2L))
>   data2.+=((1, 4L))
>   val data3 = new mutable.MutableList[(Int, Long)]
>   data3.+=((1, 1L))
>   data3.+=((2, 2L))
>   data3.+=((3, 2L))
>   data3.+=((2, 1L))
>   val a = failingDataSource(data1).toTable(tEnv, 'a1, 'a2)
>   val b = failingDataSource(data2).toTable(tEnv, 'b1, 'b2)
>   val c = failingDataSource(data3).toTable(tEnv, 'c1, 'c2)
>   tEnv.createTemporaryView("Atable", a)
>   tEnv.createTemporaryView("Btable", b)
>   tEnv.createTemporaryView("Ctable", c)
>   // Plan the three-way join with the multi-join optimization enabled
>   tEnv.getConfig.getConfiguration
>     .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")
>   val query1 = "SELECT SUM(a2) AS a2, a1 FROM Atable group by a1"
>   val query2 = "SELECT SUM(b2) AS b2, b1 FROM Btable group by b1"
>   val query3 = "SELECT SUM(c2) AS c2, c1 FROM Ctable group by c1"
>   // Each input is an updating aggregate, so the join also receives retractions
>   val query = s"SELECT a1, b1, c1 FROM ($query1) JOIN ($query2) ON a1 = b1 " +
>     s"JOIN ($query3) ON c2 = b2"
>   val sink = new TestingRetractSink
>   tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
>   env.execute()
>   val expected = Seq("2,2,3", "3,3,3")
>   assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
> }
> {code}
>
> With the heap state backend, the test produces a wrong result:
> {code:java}
> Expected :List(2,2,3, 3,3,3)
> Actual :List(2,2,2, 2,2,3, 3,3,2, 3,3,3) {code}
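>
> As a cross-check, the following Scala sketch (not part of the original report) evaluates the query with plain collections. It computes each SUM over all input rows and then applies both join conditions. The names ExpectedMultiJoinResult, sums and joinResult are illustrative only.
>
> With the full inputs, the sketch yields exactly the expected rows. If the last Ctable row (2, 1L) is left out, the same evaluation yields the four actual rows. This is consistent with the join output for c1 = 2 not being retracted when SUM(c2) for that key changes from 2 to 3.
> {code:scala}
> object ExpectedMultiJoinResult {
>   // Input rows copied from the reproduction test: (key, value)
>   val data1: Seq[(Int, Long)] =
>     Seq((1, 1L), (1, 2L), (1, 2L), (1, 5L), (2, 7L), (1, 9L), (1, 8L), (3, 8L))
>   val data2: Seq[(Int, Long)] = Seq((1, 1L), (2, 2L), (3, 2L), (1, 4L))
>   val data3: Seq[(Int, Long)] = Seq((1, 1L), (2, 2L), (3, 2L), (2, 1L))
>
>   // SELECT SUM(value), key ... GROUP BY key, evaluated over all given rows
>   def sums(data: Seq[(Int, Long)]): Map[Int, Long] =
>     data.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum }
>
>   // ... JOIN ... ON a1 = b1 JOIN ... ON c2 = b2, projected to "a1,b1,c1"
>   def joinResult(
>       d1: Seq[(Int, Long)],
>       d2: Seq[(Int, Long)],
>       d3: Seq[(Int, Long)]): Seq[String] = {
>     val a = sums(d1) // a1 -> a2
>     val b = sums(d2) // b1 -> b2
>     val c = sums(d3) // c1 -> c2
>     val rows = for {
>       a1 <- a.keys.toSeq
>       b2 <- b.get(a1).toSeq // a1 = b1, so b1 is printed as a1
>       (c1, c2) <- c.toSeq
>       if c2 == b2
>     } yield s"$a1,$a1,$c1"
>     rows.sorted
>   }
>
>   def main(args: Array[String]): Unit = {
>     // Final result after all updates
>     println(joinResult(data1, data2, data3))
>     // Result before the last Ctable row (2, 1L) updates SUM(c2) for c1 = 2
>     println(joinResult(data1, data2, data3.dropRight(1)))
>   }
> }
> {code}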
>
> The problem is essentially the same as
> [FLINK-39015|https://issues.apache.org/jira/browse/FLINK-39015], and a similar
> correction is needed [here|#L231].
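>
> The report does not describe the mechanism in detail. The following hypothetical Scala sketch makes one assumption, suggested by the summary: that MultiJoinStateViews keeps row objects in heap state by reference, while those objects are later reused or mutated. Under that assumption, the sketch shows how heap-backed state is corrupted and why copying a row before storing it avoids the problem.
>
> MutableRow, copyRow and store are illustrative names, not Flink APIs. A serializing backend such as RocksDB writes a serialized copy on every update, so it would not be affected in the same way.
> {code:scala}
> import scala.collection.mutable
>
> object HeapStateAliasing {
>   // Hypothetical stand-in for a row object that the runtime reuses between
>   // records; this is NOT Flink's RowData API.
>   final class MutableRow(var key: Int, var value: Long) {
>     def copyRow(): MutableRow = new MutableRow(key, value)
>     override def toString: String = s"($key,$value)"
>   }
>
>   // Simulates a heap-backed state that stores object references as-is,
>   // without serializing them.
>   def store(copyOnPut: Boolean): List[String] = {
>     val state = mutable.ListBuffer.empty[MutableRow]
>     val reused = new MutableRow(0, 0L) // one instance reused for every record
>     for ((k, v) <- Seq((2, 2L), (3, 2L))) {
>       reused.key = k
>       reused.value = v
>       // Without a copy, every state entry points at the same mutable object
>       state += (if (copyOnPut) reused.copyRow() else reused)
>     }
>     state.map(_.toString).toList
>   }
>
>   def main(args: Array[String]): Unit = {
>     println(store(copyOnPut = false)) // both entries show the last record
>     println(store(copyOnPut = true))  // each entry keeps its own values
>   }
> }
> {code}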
--
This message was sent by Atlassian Jira
(v8.20.10#820010)