weiqingy commented on code in PR #28148:
URL: https://github.com/apache/flink/pull/28148#discussion_r3368884973
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala:
##########
@@ -56,31 +61,76 @@ class BatchPhysicalCorrelateRule(config: Config) extends
ConverterRule(config) {
val convInput: RelNode = RelOptRule.convert(join.getInput(0),
FlinkConventions.BATCH_PHYSICAL)
val right: RelNode = join.getInput(1)
- def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): BatchPhysicalCorrelate = {
- relNode match {
- case rel: RelSubset =>
- convertToCorrelate(rel.getRelList.get(0), condition)
-
- case calc: FlinkLogicalCalc =>
- convertToCorrelate(
- calc.getInput.asInstanceOf[RelSubset].getOriginal,
- if (calc.getProgram.getCondition == null) None
- else Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))
- )
-
- case scan: FlinkLogicalTableFunctionScan =>
- new BatchPhysicalCorrelate(
- rel.getCluster,
- traitSet,
- convInput,
- scan,
- condition,
- rel.getRowType,
- join.getJoinType)
- }
+ val (scan, calcOpt) = unwrapToScan(right)
+
+ val condition: Option[RexNode] = calcOpt.flatMap {
+ calc =>
+ val program = calc.getProgram
+ Option(program.getCondition).map(program.expandLocalRef)
+ }
+
+ // Non-identity projection above the TFS cannot be folded into the physical Correlate's
+ // output rowType: the codegen concatenates the left input with the full TFS output
+ // positionally. Apply it via a wrapping Calc instead. SEMI/ANTI Correlates output only
+ // the left fields, so the projection has no effect there.
+ val needsProjectionAbove =
+ calcOpt.exists(calc => !calc.getProgram.projectsOnlyIdentity()) &&
+ (join.getJoinType == JoinRelType.INNER || join.getJoinType == JoinRelType.LEFT)
+
+ if (needsProjectionAbove) {
+ val combinedRowType = SqlValidatorUtil.deriveJoinRowType(
+ convInput.getRowType,
+ scan.getRowType,
+ join.getJoinType,
+ join.getCluster.getTypeFactory,
+ null,
+ Collections.emptyList())
+
+ val physicalCorrelate = new BatchPhysicalCorrelate(
+ join.getCluster,
+ traitSet,
+ convInput,
+ scan,
+ condition,
+ combinedRowType,
+ join.getJoinType)
+
+ val wrappingProgram = CorrelateProjectionUtils.buildWrappingProgram(
+ calcOpt.get.getProgram,
+ combinedRowType,
+ join.getRowType,
+ convInput.getRowType.getFieldCount,
+ join.getCluster.getRexBuilder)
+
+ new BatchPhysicalCalc(
+ join.getCluster,
+ traitSet,
+ physicalCorrelate,
+ wrappingProgram,
+ join.getRowType)
+ } else {
+ new BatchPhysicalCorrelate(
+ join.getCluster,
+ traitSet,
+ convInput,
+ scan,
+ condition,
+ join.getRowType,
+ join.getJoinType)
+ }
+ }
+
+ private def unwrapToScan(
+ rel: RelNode): (FlinkLogicalTableFunctionScan, Option[FlinkLogicalCalc]) = {
+ rel match {
+ case subset: RelSubset => unwrapToScan(subset.getRelList.get(0))
+ case calc: FlinkLogicalCalc =>
+ val (scan, _) = unwrapToScan(calc.getInput.asInstanceOf[RelSubset].getOriginal)
Review Comment:
Batch's `unwrapToScan` recurses to find the scan but returns only the
outermost calc: `val (scan, _) = unwrapToScan(...)` drops the inner calc's
program, so `buildWrappingProgram` sees only the outer projection/condition.
Stream instead folds the stack via `getMergedCalc`. If two `FlinkLogicalCalc`s
can stack here, batch would lose the inner one, which is the same class of bug
this PR fixes. Are stacked Calcs guaranteed to be merged by CalcMerge before
this rule runs? If yes, a one-line note would help; if not, should batch adopt
stream's `getMergedCalc`?
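
To make the concern concrete, here is a minimal toy sketch of the recursion
shape. It does not use Flink or Calcite classes: `Node`, `Scan`, `Calc`,
`unwrapOutermost` and `unwrapAll` are hypothetical stand-ins that only model
how the tuple is threaded back up, assuming two Calcs can in fact stack.

```scala
// Toy model only: these are NOT Flink classes, just stand-ins for the shape.
sealed trait Node
final case class Scan(name: String) extends Node
final case class Calc(label: String, input: Node) extends Node

object UnwrapSketch {
  // Same shape as the batch rule: only the outermost Calc survives.
  def unwrapOutermost(node: Node): (Scan, Option[Calc]) = node match {
    case s: Scan => (s, None)
    case c: Calc =>
      val (scan, _) = unwrapOutermost(c.input) // any inner Calc is discarded here
      (scan, Some(c))
  }

  // Alternative shape: keep every Calc met on the way down, outermost first,
  // so a caller could merge them before building the wrapping program.
  def unwrapAll(node: Node): (Scan, List[Calc]) = node match {
    case s: Scan => (s, Nil)
    case c: Calc =>
      val (scan, inner) = unwrapAll(c.input)
      (scan, c :: inner)
  }
}
```

With `Calc("outer", Calc("inner", Scan("tfs")))` as input, the first variant
reports only `outer`, while the second reports both, which is the information
a merge step would need.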
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml:
##########
@@ -689,8 +689,9 @@ LogicalProject(a=[$0], number=[$1], ordinality=[$2])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, number, ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,number,ordinality], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER number, INTEGER ordinality)], joinType=[INNER], condition=[=($1, 1)])
- +- TableSourceScan(table=[[default_catalog, default_database, MyTable]],
fields=[a, b])
++- Calc(select=[a, b, b0 AS number, 1 AS ordinality])
Review Comment:
The bug here is silently *wrong results* — per the JIRA, `SELECT a, b, v
FROM T CROSS JOIN UNNEST(c) AS f(k, v)` returned the keys (`[1, 11, a]`)
instead of the values (`[1, 11, 10]`). But the only verification is these two
plan-XML updates, which pin the plan *shape*, not the rows. A plan test staying
green doesn't prove codegen now reads position 1 instead of 0 — and a future
regression that reshaped the plan back would just re-bless the XML. Could we
add a result-asserting ITCase (the JIRA's exact query + expected rows)? That's
the test that actually catches this class of bug.
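
As a rough illustration of why a row assertion catches what a plan assertion
cannot, here is a toy model of positional concatenation. It is not Flink's
codegen or test harness: `RowAssertSketch`, `correlate` and `project` are
hypothetical names, and the input row `(1, 11)` with a single map entry
`a -> 10` is an assumption reconstructed from the JIRA's expected output.

```scala
// Toy model only: plain vectors standing in for rows, no Flink APIs involved.
object RowAssertSketch {
  type Row = Vector[Any]

  // Correlate output: the left fields followed by the full table-function output.
  def correlate(left: Row, tfsRows: Seq[Row]): Seq[Row] =
    tfsRows.map(tfs => left ++ tfs)

  // Projection applied on top, addressing fields by absolute position.
  def project(rows: Seq[Row], indices: Seq[Int]): Seq[Row] =
    rows.map(r => indices.map(i => r(i)).toVector)
}
```

Joining `Vector(1, 11)` with the entry `Vector("a", 10)` and projecting
positions `0, 1, 3` yields the value row, while positions `0, 1, 2` yield the
key row. Both projections have the same shape, so only an assertion on the
rows tells them apart.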
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml:
##########
@@ -700,8 +700,8 @@ LogicalProject(a=[$0], number=[$1], ordinality=[$2])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, number, ordinality])
-+- Correlate(invocation=[$UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b)],
correlate=[table($UNNEST_ROWS_WITH_ORDINALITY$1($cor0.b))],
select=[a,b,number,ordinality], rowType=[RecordType(INTEGER a, INTEGER ARRAY b,
INTEGER number, INTEGER ordinality)], joinType=[INNER], condition=[=($1, 1)])
+Calc(select=[a, b0 AS number, 1 AS ordinality])
Review Comment:
Interesting asymmetry in the generated plans: the stream side here collapses
the wrapping projection into a single `Calc(select=[a, b0 AS number, 1 AS
ordinality])`, but the batch `UnnestTest.xml` keeps two stacked Calcs
(`Calc(select=[a, number, ordinality])` over `Calc(select=[a, b, b0 AS number,
1 AS ordinality])`). So a PhysicalCalcMerge clearly *can* fire — it just didn't
on the batch side, leaving a second per-record projection over the Correlate
output. Is the batch pair expected to stay unmerged here, or is that an
unintended extra projection worth folding?
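
For reference, folding the batch pair would amount to composing the two
projections. The sketch below is a toy model, not the planner's merge rule:
`CalcMergeSketch.merge` is a hypothetical helper, and it assumes the outer
Calc consists purely of field references into the inner Calc's output.

```scala
// Toy model only: expressions are plain strings, outer fields are input indices.
object CalcMergeSketch {
  // Replace each outer field reference with the inner expression it points at.
  def merge(inner: Vector[String], outerRefs: Seq[Int]): Vector[String] =
    outerRefs.map(i => inner(i)).toVector
}
```

Applied to the inner select list `[a, b, b0 AS number, 1 AS ordinality]` with
outer references `0, 2, 3`, this gives `[a, b0 AS number, 1 AS ordinality]`,
the same select list the stream plan shows.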