xuyang created FLINK-29221:
------------------------------
Summary: Adding join hint in sql may cause incompatible state
Key: FLINK-29221
URL: https://issues.apache.org/jira/browse/FLINK-29221
Project: Flink
Issue Type: Improvement
Reporter: xuyang
The cause of the possibly incompatible state is that the plan generated for the
SQL changes after a join hint is added.
Adding the following code in DagOptimizationTest.scala can reproduce this
change.
{code:java}
@Test
def testMultiSinks6(): Unit = {
val stmtSet = util.tableEnv.createStatementSet()
util.tableEnv.getConfig.set(
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
Boolean.box(true))
// test with non-deterministic udf
util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())
val table1 = util.tableEnv.sqlQuery(
"SELECT random_udf(a) AS a, cast(b as int) as b, c FROM MyTable join
MyTable1 on MyTable.c = MyTable1.f")
util.tableEnv.registerTable("table1", table1)
val table2 = util.tableEnv.sqlQuery("SELECT SUM(a) AS total_sum FROM table1")
val table3 = util.tableEnv.sqlQuery("SELECT MIN(b) AS total_min FROM table1")
val sink1 = util.createCollectTableSink(Array("total_sum"), Array(INT))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("sink1",
sink1)
stmtSet.addInsert("sink1", table2)
val sink2 = util.createCollectTableSink(Array("total_min"), Array(INT))
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("sink2",
sink2)
stmtSet.addInsert("sink2", table3)
util.verifyExecPlan(stmtSet)
} {code}
The plan is:
{code:java}
// ast
LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink1`],
fields=[total_sum])
+- LogicalAggregate(group=[{}], total_sum=[SUM($0)])
+- LogicalProject(a=[$0])
+- LogicalProject(a=[random_udf($0)], b=[CAST($1):INTEGER], c=[$2])
+- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(d, e, f)]]])
LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink2`],
fields=[total_min])
+- LogicalAggregate(group=[{}], total_min=[MIN($0)])
+- LogicalProject(b=[$1])
+- LogicalProject(a=[random_udf($0)], b=[CAST($1):INTEGER], c=[$2])
+- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
:- LogicalTableScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]])
+- LogicalTableScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(d, e, f)]]])
// optimized exec
HashJoin(joinType=[InnerJoin], where=[(c = f)], select=[a, b, c, d, e, f],
build=[right])(reuse_id=[1])
:- Exchange(distribution=[hash[c]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
LegacySink(name=[`default_catalog`.`default_database`.`sink1`],
fields=[total_sum])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS total_sum])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(a) AS sum$0])
+- Calc(select=[random_udf(a) AS a])
+- Reused(reference_id=[1])
LegacySink(name=[`default_catalog`.`default_database`.`sink2`],
fields=[total_min])
+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS total_min])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_MIN(b) AS min$0])
+- Calc(select=[CAST(b AS INTEGER) AS b])
+- Reused(reference_id=[1]){code}
If a join hint is added, the `sqlToRelConverterConfig` will add the config
'withBloat(-1)', which disables project merging when converting SQL nodes to rel
nodes (see FlinkPlannerImpl for details), and the optimized exec plan will be
changed because of SubGraphBasedOptimizer:
{code:java}
// optimized exec
Calc(select=[random_udf(a) AS a, CAST(b AS INTEGER) AS b, c])(reuse_id=[1])
+- HashJoin(joinType=[InnerJoin], where=[(c = f)], select=[a, b, c, f],
build=[right])
:- Exchange(distribution=[hash[c]])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f]])
+- Calc(select=[f])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e,
f])LegacySink(name=[`default_catalog`.`default_database`.`sink1`],
fields=[total_sum])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS total_sum])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_SUM(a) AS sum$0])
+- Calc(select=[a])
+-
Reused(reference_id=[1])LegacySink(name=[`default_catalog`.`default_database`.`sink2`],
fields=[total_min])
+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS total_min])
+- Exchange(distribution=[single])
+- LocalHashAggregate(select=[Partial_MIN(b) AS min$0])
+- Calc(select=[b])
+- Reused(reference_id=[1]) {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)