[
https://issues.apache.org/jira/browse/FLINK-39669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Robert Metzger updated FLINK-39669:
-----------------------------------
Component/s: Table SQL / Planner
> BatchPhysicalCorrelateRule / StreamPhysicalCorrelateRule drop right-side
> projection when a Calc sits between Correlate and TableFunctionScan
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39669
> URL: https://issues.apache.org/jira/browse/FLINK-39669
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Venkata krishnan Sowrirajan
> Priority: Major
> Labels: pull-request-available
>
> BatchPhysicalCorrelateRule.convertToCorrelate and
> StreamPhysicalCorrelateRule.convertToCorrelate walk through a
> FlinkLogicalCalc between the Correlate and the underlying
> FlinkLogicalTableFunctionScan to extract the Calc's condition, but silently
> discard the Calc's projection list. The physical
> BatchPhysicalCorrelate / StreamPhysicalCorrelate is constructed with the
> Correlate's output RelDataType (left fields + projected right fields), while
> CorrelateCodeGenerator concatenates the left input with the full output of
> the table function scan (TFS) positionally
> (JoinedRowData.replace(input, udtfInput) in CorrelateCodeGenerator.scala).
> When the projection narrows the right-side output, downstream consumers
> index the first N fields of (left + full TFS), where N is the arity of the
> Correlate's output type, and so read the wrong TFS column at runtime.
> Reproduction:
> For SELECT a, b, v FROM T CROSS JOIN UNNEST(c) AS f(k, v) with c of type
> MAP<STRING, INT>, once a rule pushes a non-identity projection above the TFS
> (e.g. Calcite's stock ProjectCorrelateTransposeRule together with the
> dispatch fix in CALCITE-7511), the wrapper Project(k=$0, v=$1) is pruned to
> Project(VALUE=$1). The codegen then reads position 0 of the TFS (= K) instead
> of position 1 (= V):
> {code:java}
> Expected:        Actual:
> +I[1, 11, 10]    +I[1, 11, a]
> +I[1, 11, 11]    +I[1, 11, b]
> +I[2, 22, 20]    +I[2, 22, c]
> +I[3, 33, 30]    +I[3, 33, d]
> +I[3, 33, 31]    +I[3, 33, e]
> {code}
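The positional mismatch described above can be illustrated with a minimal, Flink-free sketch. Everything in it is hypothetical and for illustration only: the class CorrelateProjectionSketch and its helpers join, project and readOutput are not planner or codegen APIs, plain Object[] arrays stand in for rows, and the left row is simplified to the two selected fields (a, b). It only models "concatenate left + right, then read the first N fields".

```java
import java.util.Arrays;

// Hypothetical model of the issue; none of these names exist in Flink.
public class CorrelateProjectionSketch {

    // Positional concatenation of the left row and the right (table function) row.
    static Object[] join(Object[] left, Object[] right) {
        Object[] joined = Arrays.copyOf(left, left.length + right.length);
        System.arraycopy(right, 0, joined, left.length, right.length);
        return joined;
    }

    // Applies a projection given as indices into the right row.
    static Object[] project(Object[] row, int[] indices) {
        Object[] out = new Object[indices.length];
        for (int i = 0; i < indices.length; i++) {
            out[i] = row[indices[i]];
        }
        return out;
    }

    // A downstream consumer that trusts the declared output arity
    // and reads only the first outputArity fields.
    static Object[] readOutput(Object[] joined, int outputArity) {
        return Arrays.copyOf(joined, outputArity);
    }

    public static void main(String[] args) {
        Object[] left = {1, 11};      // a, b (simplified left row)
        Object[] tfsRow = {"a", 10};  // k, v produced by UNNEST(c)
        int[] projection = {1};       // models Project(VALUE=$1)
        int outputArity = left.length + projection.length;

        // Projection dropped: the full TFS row is joined, so position 2 holds k.
        Object[] dropped = readOutput(join(left, tfsRow), outputArity);
        // Projection applied before joining: position 2 holds v.
        Object[] applied = readOutput(join(left, project(tfsRow, projection)), outputArity);

        System.out.println("projection dropped: " + Arrays.toString(dropped));
        System.out.println("projection applied: " + Arrays.toString(applied));
    }
}
```

With these inputs the "dropped" row ends in the map key and the "applied" row ends in the map value, which matches the first row of the Actual and Expected columns above.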
--
This message was sent by Atlassian Jira
(v8.20.10#820010)