[
https://issues.apache.org/jira/browse/FLINK-38871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18056162#comment-18056162
]
Natea Eshetu Beshada commented on FLINK-38871:
----------------------------------------------
Hi [~vishal24], are you still working on this? If not, I'd be interested in
taking a stab at it.
> PyFlink Planner incorrectly propagates constants into downstream UDF inputs
> when filtering on materialized ROW fields
> ---------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38871
> URL: https://issues.apache.org/jira/browse/FLINK-38871
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Table SQL / Planner
> Affects Versions: 1.19.1, 2.2.0
> Environment: *Python :-* 3.11.14
> *PyFlink :-* 1.19.1 & 2.2.0
> *Java :-* 11
> Reporter: Vishal Kamlapure
> Priority: Major
> Attachments: Screenshot 2026-01-08 at 2.50.40 AM.png
>
>
> I have identified a correctness/performance bug in the Flink Table Planner
> regarding the optimization of {{ROW}} fields returned by Python UDFs.
> When a Python UDF returns a {{ROW}} type and a field from that {{ROW}} is
> materialized into a top-level column (using {{add_or_replace_columns}})
> and subsequently filtered, the planner aggressively applies *Constant
> Propagation*.
> If this column is used as input for a _second_ downstream Python UDF, the
> planner replaces the actual column reference with the constant literal
> derived from the filter (e.g., rewriting {{amountId}} to {{'0'}}). This
> causes the downstream UDF to execute on rows that *do not satisfy the
> filter predicate*, receiving the constant value instead of the actual data.
> This behavior breaks filter semantics and corrupts intermediate data observed
> by UDFs. Notably, this issue *does not occur* when the upstream UDF returns a
> {{MAP}} type, suggesting the issue is specific to {{ROW}} field optimization
> rules.
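
To make the failure mode described above concrete, here is a toy model in plain Python. It is not Flink code and says nothing about how the planner is actually implemented. The names `correct_plan`, `buggy_plan`, and the `log` list are made up for illustration, and the rows mirror the data used in this ticket. The sketch assumes the buggy plan hands the literal '0' to the downstream UDF before the filter runs, which is what the report suggests.

```python
from typing import List, Tuple

Row = Tuple[str, str]  # (Text, amount)

ROWS: List[Row] = [("A", "0"), ("B", "1"), ("C", "0")]


def second_udf(amounts: List[str], log: List[List[str]]) -> List[str]:
    """Echo UDF that records the batch it was handed."""
    log.append(list(amounts))
    return list(amounts)


def correct_plan(rows: List[Row], log: List[List[str]]) -> List[Tuple[str, str, str]]:
    # Filter first, then call the UDF on the surviving rows only.
    kept = [r for r in rows if r[1] == "0"]
    seen = second_udf([amount for _, amount in kept], log)
    return [(text, amount, s) for (text, amount), s in zip(kept, seen)]


def buggy_plan(rows: List[Row], log: List[List[str]]) -> List[Tuple[str, str, str]]:
    # Assumed buggy ordering: the predicate amount == '0' is treated as already
    # true, so the UDF input is replaced by the literal '0' for every row,
    # and the filter is only applied afterwards.
    seen = second_udf(["0" for _ in rows], log)
    with_udf = [(text, amount, s) for (text, amount), s in zip(rows, seen)]
    return [r for r in with_udf if r[1] == "0"]


log_ok: List[List[str]] = []
log_bad: List[List[str]] = []
out_ok = correct_plan(ROWS, log_ok)
out_bad = buggy_plan(ROWS, log_bad)

print(log_ok)   # [['0', '0']]
print(log_bad)  # [['0', '0', '0']]
print(out_ok == out_bad)  # True
```

In this model the final rows are identical for both orderings, so the difference only shows up in what the UDF observes: three values instead of two, with the '1' replaced by '0'. Whether the final output of the real job is also unaffected is not something this sketch can establish.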
> h3. Reproduction Script (PyFlink)
> The following script reproduces the issue. It builds a batch whose
> {{amount}} column holds the values {{"0"}} and {{"1"}}. We filter for
> {{"0"}}, but the downstream UDF prints {{"0"}} for *all* rows, showing that
> the value {{"1"}} was replaced by the planner's constant.
> {code:python}
> from pyflink.table import (
>     EnvironmentSettings,
>     TableEnvironment,
>     DataTypes
> )
> from pyflink.table.udf import udf
> from pyflink.table.expressions import col
> import pandas as pd
> # ------------------------------------------------------------------------------
> # 1. Table environment
> # ------------------------------------------------------------------------------
> env_settings = EnvironmentSettings.in_batch_mode()
> t_env = TableEnvironment.create(env_settings)
> # ------------------------------------------------------------------------------
> # 2. Input table (NO connector, pure Python)
> # ------------------------------------------------------------------------------
> t = t_env.from_elements(
>     [
>         ("A", "0"),
>         ("B", "1"),  # <-- should be filtered out
>         ("C", "0"),
>     ],
>     DataTypes.ROW([
>         DataTypes.FIELD("Text", DataTypes.STRING()),
>         DataTypes.FIELD("amount", DataTypes.STRING())
>     ])
> )
> # ------------------------------------------------------------------------------
> # 3. FIRST Python UDF (returns ROW)
> # ------------------------------------------------------------------------------
> @udf(
>     result_type=DataTypes.ROW([
>         DataTypes.FIELD("out_text", DataTypes.STRING()),
>         DataTypes.FIELD("out_amount", DataTypes.STRING())
>     ]),
>     func_type="pandas"
> )
> def validate_udf(text: pd.Series, amount: pd.Series) -> pd.DataFrame:
>     return pd.DataFrame({
>         "out_text": text,
>         "out_amount": amount
>     })
> # ------------------------------------------------------------------------------
> # 4. SECOND Python UDF (just echoes input, used to show leak)
> # ------------------------------------------------------------------------------
> @udf(
>     result_type=DataTypes.ROW([
>         DataTypes.FIELD("seen_amount", DataTypes.STRING())
>     ]),
>     func_type="pandas"
> )
> def second_udf(amount: pd.Series) -> pd.DataFrame:
>     print("\n[second_udf] received values:")
>     print(amount.tolist())
>     return pd.DataFrame({
>         "seen_amount": amount
>     })
> # ------------------------------------------------------------------------------
> # 5. Pipeline that TRIGGERS the bug
> # ------------------------------------------------------------------------------
> # Step 1: apply first Python UDF (ROW output)
> validated = t.add_columns(
>     validate_udf(t.Text, t.amount).alias("v")
> )
> # Step 2: materialize ROW fields back into base columns <<< CRITICAL
> materialized = validated.add_or_replace_columns(
>     col("v").out_text.alias("Text"),
>     col("v").out_amount.alias("amount")
> )
> # Step 3: filter (should keep only amount == "0")
> filtered = materialized.filter(col("amount") == "0")
> # Step 4: pass filtered column into SECOND Python UDF
> final = filtered.add_columns(
>     second_udf(filtered.amount).alias("s")
> ).select(
>     col("Text"),
>     col("amount"),
>     col("s").seen_amount
> )
> # ------------------------------------------------------------------------------
> # 6. Observe execution plan and output
> # ------------------------------------------------------------------------------
> print("\n===== EXECUTION PLAN =====")
> print(final.explain())
> print("\n===== FINAL OUTPUT =====")
> final.execute().print()
> {code}
> h3. Observed Behavior
> The logs from {{second_udf}} show that it receives the value {{'0'}} for the
> second row, even though that row actually contains {{'1'}}.
>
> [second_udf] received batch: ['0', '0', '0']
> _(Note: The batch size is 3, meaning the row with ID '1' was not dropped, but
> its value was overwritten to '0'.)_
> The execution plan reveals that the planner created a {{Calc}} node that
> forcefully casts the column to a constant before calling the Python UDF:
>
> Calc(select=[..., CAST('0' AS VARCHAR) AS amountId, ...])
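
One way to look for this symptom without reading the plan by eye is to scan the text returned by `explain()` for string literals that are cast and then aliased as a column. The helper below is a rough sketch: `find_constant_columns` is a hypothetical name, and the regular expression is written against the `Calc(...)` excerpt quoted above, so it is an assumption that real plans from other Flink versions print the cast in the same shape.

```python
import re
from typing import List, Tuple

# Matches a string literal that is cast and aliased in a projection, e.g.
#   CAST('0' AS VARCHAR) AS amountId
# The type part is matched loosely so that a length such as
# VARCHAR(2147483647) is also accepted.
_CONST_CAST = re.compile(r"CAST\('([^']*)' AS [^,\]]*?\) AS (\w+)")


def find_constant_columns(plan_text: str) -> List[Tuple[str, str]]:
    """Return (column_alias, literal) pairs for literal casts found in a plan."""
    return [(m.group(2), m.group(1)) for m in _CONST_CAST.finditer(plan_text)]


sample = "Calc(select=[..., CAST('0' AS VARCHAR) AS amountId, ...])"
print(find_constant_columns(sample))  # [('amountId', '0')]
```

Against the reproduction script, this could be called as `find_constant_columns(final.explain())`. A non-empty result is only a hint, since a literal cast can also appear in a plan for legitimate reasons.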
> h3. Expected Behavior
> # The downstream UDF should only process rows that satisfy the filter (or if
> execution pipelining allows processing, it must see the *original* values).
> # The planner should not rewrite input columns to constants based on
> downstream filters if those columns are inputs to Python UDFs.
> h3. Analysis & Workaround
> The issue appears to be an unsafe application of constant propagation on
> {{ROW}} fields materialized from {{PythonCalc}} outputs.
> * *ROW Type (Buggy):* The planner views the {{ROW}} fields as transparent
> and applies constant propagation ({{amountId = '0'}}) _before_ the filter
> is physically enforced, and passes this constant to the next
> {{PythonCalc}}.
> * *MAP Type (Workaround):* Changing {{validate_udf}} to return {{MAP<STRING,
> STRING>}} fixes the issue. The planner treats the map access {{ITEM(map,
> 'key')}} as opaque, preventing constant propagation and forcing the correct
> execution order (UDF -> Filter -> UDF).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)