Natea Eshetu Beshada created FLINK-39411:
--------------------------------------------
Summary: PTF code generation uses wrong type for table arguments —
METADATA VIRTUAL columns inaccessible
Key: FLINK-39411
URL: https://issues.apache.org/jira/browse/FLINK-39411
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.2.0, 2.1.0, 2.0.0
Reporter: Natea Eshetu Beshada
When a table with METADATA VIRTUAL columns (e.g., Kafka headers declared as
MAP<STRING, STRING> METADATA VIRTUAL) is passed as a table argument to a PTF
(process table function), accessing the metadata column by name fails at
runtime:
{{Unknown field name 'headers' for mapping to a position.}}
Root Cause
ProcessTableRunnerGenerator.generateEvalOperands() (line 350) uses
call.getType(), which is the PTF's return type, instead of
tableArgCall.getType(), which is the table argument's type, when generating
code for RexTableArgCall operands.
{{ // Bug:}}
{{ val tableType = FlinkTypeFactory.toLogicalType(call.getType).copy(true)}}
{{ // Fix:}}
{{ val tableType = FlinkTypeFactory.toLogicalType(tableArgCall.getType).copy(true)}}
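The tableType becomes the result type of the GeneratedExpression, which in turn
drives the field mapping in RowRowConverter. Because the PTF's output row type
can differ from the input table's row type in both field names and field
count, using it in place of the table argument's type causes name/count
mismatches at runtime: a column that exists only in the input, such as the
metadata column 'headers', cannot be mapped to a position.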
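To illustrate the failure mode, here is a minimal, self-contained Scala sketch. It does not use Flink's API: RowType, positionOf, lookup, TableArgTypeDemo, and the field names key, payload, and result are hypothetical stand-ins for the real row types and for the name-to-position mapping that raises the error. Resolving 'headers' against the PTF return type fails, while resolving it against the table argument type succeeds.

```scala
// Hypothetical, simplified model of a name-to-position lookup; NOT Flink's API.
final case class RowType(fieldNames: Vector[String]) {
  // Mirrors the reported failure: a name absent from the row type cannot be mapped.
  def positionOf(name: String): Int = {
    val idx = fieldNames.indexOf(name)
    if (idx < 0)
      throw new IllegalArgumentException(
        s"Unknown field name '$name' for mapping to a position.")
    idx
  }
}

object TableArgTypeDemo {
  // Hypothetical table argument type: physical columns plus the METADATA VIRTUAL column.
  val tableArgType: RowType = RowType(Vector("key", "payload", "headers"))
  // Hypothetical PTF return type: what the function emits, with no 'headers' field.
  val ptfReturnType: RowType = RowType(Vector("key", "result"))

  // Wraps the lookup so the failure can be inspected instead of aborting.
  def lookup(rowType: RowType, name: String): Either[String, Int] =
    try Right(rowType.positionOf(name))
    catch { case e: IllegalArgumentException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    println(lookup(ptfReturnType, "headers")) // analogous to using call.getType (bug)
    println(lookup(tableArgType, "headers"))  // analogous to using tableArgCall.getType (fix)
  }
}
```

Running main prints Left(Unknown field name 'headers' for mapping to a position.) followed by Right(2).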
The upstream pipeline handles metadata columns correctly:
- CatalogSchemaTable.getRowType() includes metadata columns (as required by
  the Calcite contract)
- DynamicSourceUtils.convertSourceToRel() adds pushMetadataProjection() for
  the table scan input
- FlinkConvertletTable.convertTableArgs() stores the full table type
  (including metadata columns) in RexTableArgCall
- PTF extraction stores a ROW<> placeholder for TABLE arguments; the actual
  schema is resolved at planning time
Steps to Reproduce
{{ -- Table with METADATA VIRTUAL column (e.g., Kafka-backed table with headers)}}
{{ DESCRIBE EXTENDED my_table;}}
{{ -- shows: headers | MAP<STRING, STRING> | NULL | METADATA VIRTUAL}}
{{ -- PTF that accesses 'headers' from input row}}
{{ SELECT * FROM my_ptf(}}
{{ input => TABLE my_table PARTITION BY key}}
{{ );}}
{{ -- Error: Unknown field name 'headers' for mapping to a position.}}
{{ -- Compare with regular query (works fine):}}
{{ SELECT headers FROM my_table;}}
{{ -- Succeeds: DynamicSourceUtils adds the metadata projection}}
Workaround
Materialize the metadata column into a physical column via a subquery:
{{ SELECT * FROM my_ptf(}}
{{ input => TABLE (SELECT *, headers FROM my_table) PARTITION BY key}}
{{ );}}