Vishal Kamlapure created FLINK-38871:
----------------------------------------
Summary: PyFlink Planner incorrectly propagates constants into
downstream UDF inputs when filtering on materialized ROW fields
Key: FLINK-38871
URL: https://issues.apache.org/jira/browse/FLINK-38871
Project: Flink
Issue Type: Bug
Components: API / Python, Table SQL / Planner
Affects Versions: 2.2.0, 1.19.1
Environment: *Python:* 3.11.14
*Flink:* 1.19.1 & 2.2.0
*Java:* 11
*Tested on:* macOS (locally) & YARN Flink cluster (1.19.1, EMR)
Reporter: Vishal Kamlapure
Attachments: Screenshot 2026-01-08 at 2.50.40 AM.png
I have identified a correctness bug in the Flink Table Planner related to the
optimization of {{ROW}} fields returned by Python UDFs.
When a Python UDF returns a {{ROW}} type, and a field from that {{ROW}} is
materialized into a top-level column (using {{add_or_replace_columns}}) and
subsequently filtered, the planner aggressively applies *constant
propagation*.
If this column is used as input for a _second_ downstream Python UDF, the
planner replaces the actual column reference with the constant literal derived
from the filter (e.g., rewriting {{amountId}} to {{'0'}}). This causes the
downstream UDF to execute on rows that *do not satisfy the filter
predicate*, receiving the constant value instead of the actual data.
This behavior breaks filter semantics and corrupts intermediate data observed
by UDFs. Notably, this issue *does not occur* when the upstream UDF returns a
{{MAP}} type, suggesting the issue is specific to {{ROW}} field optimization
rules.
h3. Reproduction Script (PyFlink)
The following script reproduces the issue. It generates a batch with
{{amountId}} values {{"0"}} and {{"1"}}. We filter for {{"0"}}, but the
downstream UDF prints {{"0"}} for *all* rows, showing that the value {{"1"}}
was overwritten by the planner.
{code:python}
from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf
from pyflink.table.expressions import col
import pandas as pd


def reproducer():
    env_settings = EnvironmentSettings.in_batch_mode()
    t_env = TableEnvironment.create(env_settings)

    # 1. Source Data: Row 'B' has amountId="1" (Should be filtered out)
    t = t_env.from_elements(
        [
            ("A", "0"),
            ("B", "1"),
            ("C", "0"),
        ],
        DataTypes.ROW([
            DataTypes.FIELD("transactionText", DataTypes.STRING()),
            DataTypes.FIELD("amountId", DataTypes.STRING())
        ])
    )

    # 2. First UDF: Returns a ROW
    @udf(result_type=DataTypes.ROW([
        DataTypes.FIELD("out_text", DataTypes.STRING()),
        DataTypes.FIELD("out_amountId", DataTypes.STRING())
    ]), func_type="pandas")
    def validate_udf(text: pd.Series, amountId: pd.Series) -> pd.DataFrame:
        return pd.DataFrame({
            "out_text": text,
            "out_amountId": amountId
        })

    # 3. Second UDF: Inspects the value it receives
    @udf(result_type=DataTypes.ROW([
        DataTypes.FIELD("seen_amountId", DataTypes.STRING())
    ]), func_type="pandas")
    def second_udf(amountId: pd.Series) -> pd.DataFrame:
        # DEBUG: Print exactly what the UDF sees
        print("\n[second_udf] received batch:", amountId.tolist())
        return pd.DataFrame({
            "seen_amountId": amountId
        })

    # 4. Pipeline Construction
    validated = t.add_columns(
        validate_udf(t.transactionText, t.amountId).alias("v"))

    # Materialize ROW fields to top-level columns
    materialized = validated.add_or_replace_columns(
        col("v").out_text.alias("transactionText"),
        col("v").out_amountId.alias("amountId")
    )

    # Filter: We only want amountId == "0"
    filtered = materialized.filter(col("amountId") == "0")

    # Apply downstream UDF on the filtered data
    final = filtered.add_columns(
        second_udf(filtered.amountId).alias("s")
    ).select(
        col("transactionText"),
        col("amountId"),
        col("s").seen_amountId
    )

    print("=== PLAN ===")
    print(final.explain())
    print("=== EXECUTION ===")
    final.execute().print()


if __name__ == '__main__':
    reproducer()
{code}
h3. Observed Behavior
The logs from {{second_udf}} show that it receives the value {{'0'}} for the
second row, even though that row actually contains {{'1'}}:
{noformat}
[second_udf] received batch: ['0', '0', '0']
{noformat}
_(Note: The batch size is 3, meaning the row with ID '1' was not dropped, but
its value was overwritten to '0'.)_
The execution plan shows that the planner created a {{Calc}} node that
replaces the column with a constant (cast to {{VARCHAR}}) before calling the
Python UDF:
{noformat}
Calc(select=[..., CAST('0' AS VARCHAR) AS amountId, ...])
{noformat}
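The following plain-Python sketch (no PyFlink required) models what {{second_udf}} would receive under different plan shapes. It assumes, based on the plan above, that constant propagation replaces {{amountId}} with the literal {{'0'}} pinned by the predicate, and that the only other variable is whether the filter runs before the UDF. {{second_udf_inputs}} and {{passes_filter}} are hypothetical helper names, not planner internals.

{code:python}
from typing import List, Sequence, Tuple

# Source rows from the reproduction script: (transactionText, amountId)
SOURCE: List[Tuple[str, str]] = [("A", "0"), ("B", "1"), ("C", "0")]


def passes_filter(amount_id: str) -> bool:
    """The user's predicate: amountId == "0"."""
    return amount_id == "0"


def second_udf_inputs(
    rows: Sequence[Tuple[str, str]],
    substitute_constant: bool,
    filter_first: bool,
) -> List[str]:
    """Return the amountId values second_udf would see for one plan shape.

    substitute_constant: the column reference is replaced by the literal '0'
        that the predicate pins it to (constant propagation).
    filter_first: the filter is enforced before second_udf runs.
    """
    if filter_first:
        rows = [row for row in rows if passes_filter(row[1])]
    return ["0" if substitute_constant else amount_id for _, amount_id in rows]


# Filter runs first, so substituting the constant changes nothing.
print(second_udf_inputs(SOURCE, substitute_constant=True, filter_first=True))    # ['0', '0']
print(second_udf_inputs(SOURCE, substitute_constant=False, filter_first=True))   # ['0', '0']
# UDF runs before the filter but still sees the original values.
print(second_udf_inputs(SOURCE, substitute_constant=False, filter_first=False))  # ['0', '1', '0']
# Constant substituted AND UDF runs on unfiltered rows: the reported batch.
print(second_udf_inputs(SOURCE, substitute_constant=True, filter_first=False))   # ['0', '0', '0']
{code}

In this model the rewrite {{amountId -> '0'}} is only sound on rows that have already passed the filter; combining it with an evaluation order in which {{second_udf}} sees unfiltered rows yields exactly the {{['0', '0', '0']}} batch from the logs.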
h3. Expected Behavior
# The downstream UDF should only process rows that satisfy the filter. If the
execution pipeline does hand it other rows, it must still see their *original* values.
# The planner should not rewrite the input columns of a Python UDF into
constants derived from a filter predicate on those columns.
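
The two expectations above can be expressed as a simple acceptance check on the batch a downstream UDF receives. This is a hypothetical test helper ({{udf_inputs_are_valid}}, {{is_zero}}), not part of Flink, and it assumes all rows arrive in one batch and in source order, as in the three-row reproduction.

{code:python}
from typing import Callable, Sequence


def udf_inputs_are_valid(
    source: Sequence[str],
    predicate: Callable[[str], bool],
    observed: Sequence[str],
) -> bool:
    """Accept a batch only if it matches one of the two expected behaviours.

    1. Only rows that satisfy the predicate, with their original values.
    2. Every row (no filtering before the UDF), but with original values.
    Assumes a single batch in source order (hypothetical simplification).
    """
    filtered = [value for value in source if predicate(value)]
    return list(observed) in (filtered, list(source))


def is_zero(value: str) -> bool:
    return value == "0"


source_amount_ids = ["0", "1", "0"]
print(udf_inputs_are_valid(source_amount_ids, is_zero, ["0", "0"]))       # True
print(udf_inputs_are_valid(source_amount_ids, is_zero, ["0", "1", "0"]))  # True
print(udf_inputs_are_valid(source_amount_ids, is_zero, ["0", "0", "0"]))  # False: the reported batch
{code}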
h3. Analysis & Workaround
The issue appears to be an unsafe application of constant folding on {{ROW}}
fields materialized from {{PythonCalc}} outputs.
* *ROW Type (Buggy):* The planner views the {{ROW}} fields as transparent and
applies constant propagation ({{amountId = '0'}}) _before_ the filter is
physically enforced, and passes this constant to the next {{PythonCalc}}.
* *MAP Type (Workaround):* Changing {{validate_udf}} to return {{MAP<STRING,
STRING>}} avoids the issue. The planner treats the map access {{ITEM(map,
'key')}} as opaque, which prevents constant propagation and preserves the correct
execution order (UDF -> Filter -> UDF).
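
The hypothesis in the two bullets above can be illustrated with a toy rewrite rule. This is not Flink or Calcite optimizer code: {{FieldAccess}}, {{Item}}, {{Const}}, {{pinned_constant}} and {{propagate}} are hypothetical names. The sketch assumes the planner infers "column = constant" from an equality predicate only when it can see through the left-hand side (a {{ROW}} field access), and infers nothing from an opaque call such as {{ITEM(map, 'key')}}.

{code:python}
from dataclasses import dataclass
from typing import Dict, Optional, Tuple, Union


@dataclass(frozen=True)
class FieldAccess:
    """ROW field reference such as v.out_amountId (transparent to the toy rule)."""
    row: str
    field: str


@dataclass(frozen=True)
class Item:
    """Map lookup such as ITEM(v, 'amountId') (opaque to the toy rule)."""
    map_col: str
    key: str


@dataclass(frozen=True)
class Const:
    value: str


Expr = Union[FieldAccess, Item, Const]


def pinned_constant(lhs: Expr, rhs: Const) -> Optional[Tuple[Expr, Const]]:
    """From the predicate lhs = rhs, infer that lhs is always rhs if lhs is transparent."""
    if isinstance(lhs, FieldAccess):
        return lhs, rhs
    return None  # opaque expression: nothing is inferred


def propagate(projection: Dict[str, Expr], lhs: Expr, rhs: Const) -> Dict[str, Expr]:
    """Replace every projected expression that the predicate pins to a constant."""
    pinned = pinned_constant(lhs, rhs)
    if pinned is None:
        return dict(projection)
    target, constant = pinned
    return {name: (constant if expr == target else expr) for name, expr in projection.items()}


row_field = FieldAccess("v", "out_amountId")
print(propagate({"amountId": row_field}, row_field, Const("0")))
# {'amountId': Const(value='0')}

map_item = Item("v", "amountId")
print(propagate({"amountId": map_item}, map_item, Const("0")))
# {'amountId': Item(map_col='v', key='amountId')}
{code}

In the toy model the rewritten projection is only correct if it is evaluated after the filter, which is the ordering the report observes being violated for {{ROW}} inputs.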