[
https://issues.apache.org/jira/browse/FLINK-39015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dmitriy Linevich updated FLINK-39015:
-------------------------------------
Description:
org.apache.flink.table.planner.runtime.stream.sql.JoinITCase#testInnerJoinWithEqualPk
fails when the multi-join optimization is enabled:
{code:java}
@TestTemplate
def testInnerJoinWithEqualPk(): Unit = {
  env.setParallelism(1)
  // Enable the multi-join optimization that triggers the failure.
  tEnv.getConfig.getConfiguration
    .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")

  // Both join inputs are aggregations grouped by their join key.
  val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
  val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
  val query = s"SELECT a1, b1 FROM ($query1) JOIN ($query2) ON a1 = b1"

  val sink = new TestingRetractSink
  tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()

  val expected = Seq("1,1", "2,2", "3,3")
  assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}
{code}
The test fails only with HeapStateBackend.
This is because AttributeBasedJoinKeyExtractor returns a GenericRowData
([here|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java#L287])
as the key for get requests to the state backend, but after recovery from a
checkpoint the keys held in HeapStateBackend are BinaryRowData, and a
BinaryRowData does not equal a GenericRowData (see
[GenericRowData|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java#L219]
and
[BinaryRowData|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java#L449]).
The same holds for their hashCode methods.
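The mismatch described above can be illustrated without Flink on the classpath. The following is only a minimal sketch: GenericRowSketch, BinaryRowSketch and RowKeyMismatchSketch are hypothetical stand-ins, not Flink classes, and the assumption is that a heap-based backend looks keys up through equals/hashCode much like a HashMap does.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for GenericRowData: fields are kept as Java objects.
final class GenericRowSketch {
    final Object[] fields;

    GenericRowSketch(Object... fields) {
        this.fields = fields;
    }

    @Override
    public boolean equals(Object o) {
        // Only another GenericRowSketch can be equal, whatever the field values are.
        return o instanceof GenericRowSketch
                && Arrays.deepEquals(fields, ((GenericRowSketch) o).fields);
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(fields);
    }
}

// Hypothetical stand-in for BinaryRowData: fields are kept in serialized form.
final class BinaryRowSketch {
    final byte[] bytes;

    BinaryRowSketch(byte[] bytes) {
        this.bytes = bytes;
    }

    static BinaryRowSketch of(long key) {
        return new BinaryRowSketch(ByteBuffer.allocate(Long.BYTES).putLong(key).array());
    }

    @Override
    public boolean equals(Object o) {
        // Only another BinaryRowSketch can be equal.
        return o instanceof BinaryRowSketch
                && Arrays.equals(bytes, ((BinaryRowSketch) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

final class RowKeyMismatchSketch {
    // Assumption: after a restore, the keys in heap state exist in the binary form.
    static Map<Object, String> restoredState() {
        Map<Object, String> state = new HashMap<>();
        state.put(BinaryRowSketch.of(1L), "row for key 1");
        return state;
    }

    // Lookup with the generic representation of the same logical key: misses.
    static String lookupWithGenericKey() {
        return restoredState().get(new GenericRowSketch(1L));
    }

    // Lookup with the binary representation: hits.
    static String lookupWithBinaryKey() {
        return restoredState().get(BinaryRowSketch.of(1L));
    }

    public static void main(String[] args) {
        System.out.println("generic key: " + lookupWithGenericKey()); // prints "generic key: null"
        System.out.println("binary key: " + lookupWithBinaryKey());   // prints "binary key: row for key 1"
    }
}
```

Both keys carry the same logical value, yet only the lookup that uses the stored representation finds the entry. Presumably a backend that compares serialized key bytes would not be affected, which would be consistent with the failure showing up only for HeapStateBackend.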
was:
org.apache.flink.table.planner.runtime.stream.sql.JoinITCase#testInnerJoinWithEqualPk
fails when the multi-join optimization is enabled:
{code:java}
@TestTemplate
def testInnerJoinWithEqualPk(): Unit = {
  env.setParallelism(1)
  // Enable the multi-join optimization that triggers the failure.
  tEnv.getConfig.getConfiguration
    .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")

  // Both join inputs are aggregations grouped by their join key.
  val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
  val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
  val query = s"SELECT a1, b1 FROM ($query1) JOIN ($query2) ON a1 = b1"

  val sink = new TestingRetractSink
  tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()

  val expected = Seq("1,1", "2,2", "3,3")
  assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}
{code}
The test fails only with HeapStateBackend.
This is because AttributeBasedJoinKeyExtractor returns a GenericRowData
([here|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java#L287])
as the key for get requests to the state backend, but after recovery from a
checkpoint the keys held in HeapStateBackend are BinaryRowData, and a
BinaryRowData does not equal a GenericRowData.
> Fix MultiJoin RowData behaviour for HeapStateBackend
> ----------------------------------------------------
>
> Key: FLINK-39015
> URL: https://issues.apache.org/jira/browse/FLINK-39015
> Project: Flink
> Issue Type: Bug
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Dmitriy Linevich
> Priority: Major