[https://issues.apache.org/jira/browse/FLINK-39015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Dmitriy Linevich updated FLINK-39015:
-------------------------------------
    Description: 
org.apache.flink.table.planner.runtime.stream.sql.JoinITCase#testInnerJoinWithEqualPk fails when the multi-join optimization is enabled:
 
{code:scala}
@TestTemplate
def testInnerJoinWithEqualPk(): Unit = {
  env.setParallelism(1)
  // Enable the multi-join optimization
  tEnv.getConfig.getConfiguration
    .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")
  // Each side is aggregated on its join attribute, so both inputs are keyed by a1 / b1
  val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
  val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
  val query = s"SELECT a1, b1 FROM ($query1) JOIN ($query2) ON a1 = b1"

  val sink = new TestingRetractSink
  tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()

  val expected = Seq("1,1", "2,2", "3,3")
  assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
}
{code}
The test fails only with the HeapStateBackend.
AttributeBasedJoinKeyExtractor returns a GenericRowData
([here|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java#L287])
as the key for state lookups, but after recovery from a checkpoint the HeapStateBackend holds its keys as BinaryRowData.
A BinaryRowData never equals a GenericRowData, and the two classes also compute hashCode differently
([GenericRowData|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/GenericRowData.java#L219]
and
[BinaryRowData|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java#L449]),
so a lookup with the extracted key does not find the restored entry. The RocksDB backend is not affected because it compares serialized key bytes rather than calling equals/hashCode on the key objects.
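A minimal, self-contained Java sketch of the lookup miss described above, assuming nothing from Flink itself. GenericKey and BinaryKey are hypothetical stand-ins for GenericRowData and BinaryRowData (each equals() accepts only its own class, which is how this sketch models the mismatch), a plain HashMap stands in for the heap state table, and toBinary is a hypothetical normalization step that illustrates one possible direction for a fix, not the actual patch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for GenericRowData: equals() only accepts GenericKey.
final class GenericKey {
    final int field;
    GenericKey(int field) { this.field = field; }

    @Override public boolean equals(Object o) {
        return o instanceof GenericKey && ((GenericKey) o).field == field;
    }
    @Override public int hashCode() { return Integer.hashCode(field); }
}

// Hypothetical stand-in for BinaryRowData: equals() only accepts BinaryKey
// and hashes the encoded bytes, so its hashCode differs from GenericKey's.
final class BinaryKey {
    final byte[] bytes;
    BinaryKey(int field) {
        // Hypothetical 4-byte big-endian encoding of the single field.
        this.bytes = new byte[] {
            (byte) (field >>> 24), (byte) (field >>> 16), (byte) (field >>> 8), (byte) field };
    }
    @Override public boolean equals(Object o) {
        return o instanceof BinaryKey && Arrays.equals(((BinaryKey) o).bytes, bytes);
    }
    @Override public int hashCode() { return Arrays.hashCode(bytes); }
}

public class KeyMismatchDemo {
    // Hypothetical normalization: convert the extracted key to the binary
    // representation before it is used against keyed state.
    static BinaryKey toBinary(GenericKey key) { return new BinaryKey(key.field); }

    public static void main(String[] args) {
        // Heap-style state table whose keys were restored as BinaryKey.
        Map<Object, String> heapState = new HashMap<>();
        heapState.put(new BinaryKey(1), "row for key 1");

        // The extractor hands out a GenericKey for the same logical key.
        GenericKey extracted = new GenericKey(1);

        System.out.println("lookup with generic key: " + heapState.get(extracted));
        System.out.println("lookup with normalized key: " + heapState.get(toBinary(extracted)));
    }
}
```

Running main prints `null` for the generic-key lookup and `row for key 1` for the normalized lookup: the logical key is the same, but a hash-map lookup only succeeds when both sides use the same Java representation.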

  was:
org.apache.flink.table.planner.runtime.stream.sql.JoinITCase#testInnerJoinWithEqualPk fails with enabled multi join optimization
 
{code:java}
@TestTemplate
def testInnerJoinWithEqualPk(): Unit = {
  env.setParallelism(1)
  tEnv.getConfig.getConfiguration
    .setString(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED.key(), "true")
  val query1 = "SELECT SUM(a2) AS a2, a1 FROM A group by a1"
  val query2 = "SELECT SUM(b2) AS b2, b1 FROM B group by b1"
  val query = s"SELECT a1, b1 FROM ($query1) JOIN ($query2) ON a1 = b1"

  val sink = new TestingRetractSink
  tEnv.sqlQuery(query).toRetractStream[Row].addSink(sink).setParallelism(1)
  env.execute()

  val expected = Seq("1,1", "2,2", "3,3")
  assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
} {code}
The test failure occurs only in the HeapStateBackend case.
This is because AttributeBasedJoinKeyExtractor returns GenericRowData 
([here|https://github.com/apache/flink/blob/80e26ca982872b2eae6266816ccfd6007de17b08/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/keyselector/AttributeBasedJoinKeyExtractor.java#L287])
 as the key for the get request in the state backend, but after recovery from a 
checkpoint, BinaryRowData is now used as the key in HeapStateBackend, and 
BinaryRowData does not equal GenericRowData.


> Fix MultiJoin RowData behaviour for HeapStateBackend
> ----------------------------------------------------
>
>                 Key: FLINK-39015
>                 URL: https://issues.apache.org/jira/browse/FLINK-39015
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Dmitriy Linevich
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
