Natea Eshetu Beshada created FLINK-39411:
--------------------------------------------

             Summary: PTF code generation uses wrong type for table arguments — 
METADATA VIRTUAL columns inaccessible
                 Key: FLINK-39411
                 URL: https://issues.apache.org/jira/browse/FLINK-39411
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.2.0, 2.1.0, 2.0.0
            Reporter: Natea Eshetu Beshada


When a table with METADATA VIRTUAL columns (e.g., Kafka headers MAP<STRING, 
STRING> METADATA VIRTUAL) is passed as a PTF table argument, accessing the 
metadata column by name fails at runtime:

Unknown field name 'headers' for mapping to a position.

 

Root Cause

When generating code for RexTableArgCall operands, 
ProcessTableRunnerGenerator.generateEvalOperands() (line 350) uses 
call.getType(), which is the PTF's return type, instead of 
tableArgCall.getType(), which is the table argument's type.

{{  // Bug:}}
{{  val tableType = FlinkTypeFactory.toLogicalType(call.getType).copy(true)}}

{{  // Fix:}}
{{  val tableType = FlinkTypeFactory.toLogicalType(tableArgCall.getType).copy(true)}}

The tableType is used for the GeneratedExpression result type, which flows into 
RowRowConverter field mapping. Using the PTF output type instead of the table 
input type causes field name/count mismatches at runtime.
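
The mismatch can be illustrated without Flink. The following is a minimal sketch, not the planner's actual code: RowTypeSketch is a hypothetical stand-in for a row type, and the column names payload and result are assumptions made for illustration. It only shows why resolving 'headers' against the PTF return type fails while resolving it against the table argument type succeeds.

```java
import java.util.Arrays;
import java.util.List;

class TableArgTypeSketch {

    /** Hypothetical stand-in for a row type: an ordered list of field names. */
    static final class RowTypeSketch {
        private final List<String> fieldNames;

        RowTypeSketch(String... fieldNames) {
            this.fieldNames = Arrays.asList(fieldNames);
        }

        /** Maps a field name to its position; mimics the reported error text. */
        int positionOf(String name) {
            int pos = fieldNames.indexOf(name);
            if (pos < 0) {
                throw new IllegalArgumentException(
                        "Unknown field name '" + name + "' for mapping to a position.");
            }
            return pos;
        }
    }

    /** Returns the resolved position, or the error message if the lookup fails. */
    static String resolve(RowTypeSketch type, String name) {
        try {
            return "position " + type.positionOf(name);
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Table argument type: physical columns plus the metadata column (assumed names).
        RowTypeSketch tableArgType = new RowTypeSketch("key", "payload", "headers");
        // PTF return type: what the function emits; it has no 'headers' field (assumed names).
        RowTypeSketch ptfReturnType = new RowTypeSketch("key", "result");

        // Analogous to the bug: the lookup runs against the PTF return type.
        System.out.println(resolve(ptfReturnType, "headers"));
        // Analogous to the fix: the lookup runs against the table argument type.
        System.out.println(resolve(tableArgType, "headers"));
    }
}
```

Run as written, the first lookup prints the "Unknown field name" message and the second prints "position 2".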

 

The upstream pipeline is correct:
  - CatalogSchemaTable.getRowType() correctly includes metadata columns 
(required by Calcite contract)
  - DynamicSourceUtils.convertSourceToRel() correctly adds 
pushMetadataProjection() for the table scan input
  - FlinkConvertletTable.convertTableArgs() correctly stores the full table 
type (including metadata) in RexTableArgCall
  - PTF extraction stores a ROW<> placeholder for TABLE arguments; the actual 
schema is resolved at planning time

 

Steps to Reproduce

{{  -- Table with METADATA VIRTUAL column (e.g., Kafka-backed table with headers)}}
{{  DESCRIBE EXTENDED my_table;}}
{{  -- shows: headers | MAP<STRING, STRING> | NULL | METADATA VIRTUAL}}

{{  -- PTF that accesses 'headers' from input row}}
{{  SELECT * FROM my_ptf(}}
{{    input => TABLE my_table PARTITION BY key}}
{{  );}}
{{  -- Error: Unknown field name 'headers' for mapping to a position.}}

{{  -- Compare with regular query (works fine):}}
{{  SELECT headers FROM my_table;}}
{{  -- Succeeds — DynamicSourceUtils adds metadata projection}}

 

Workaround

Materialize metadata into a physical column via subquery:

{{  SELECT * FROM my_ptf(}}
{{    input => TABLE (SELECT *, headers FROM my_table) PARTITION BY key}}
{{  )}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
