[ 
https://issues.apache.org/jira/browse/FLINK-39735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083026#comment-18083026
 ] 

Jubin Soni commented on FLINK-39735:
------------------------------------

For the approach, I’ll expose upsert keys end-to-end for PTFs by adding
{{TableSemantics#upsertKeyColumns()}} with a default empty array for
compatibility. I’ll wire it through {{RuntimeTableSemantics}}, derive it in
the planner via {{FlinkRelMetadataQuery.getUpsertKeys()}}, and collapse the
candidates to a single key per input using {{UpsertKeyUtil.smallestKey(...)}}.
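
A minimal sketch of the default-method approach, using trimmed-down hypothetical stand-ins ({{SemanticsSketch}}, {{LegacySemantics}}, {{RuntimeSemanticsSketch}}) rather than Flink's actual {{TableSemantics}} and {{RuntimeTableSemantics}}:

```java
import java.io.Serializable;

/** Hypothetical, trimmed-down stand-in for TableSemantics; not Flink's real interface. */
interface SemanticsSketch {
    int[] partitionByColumns();

    /** New accessor; the default keeps existing implementors source-compatible. */
    default int[] upsertKeyColumns() {
        return new int[0];
    }
}

/** An implementor written before the new method existed compiles without changes. */
class LegacySemantics implements SemanticsSketch {
    @Override
    public int[] partitionByColumns() {
        return new int[] {0};
    }
}

/** Hypothetical runtime variant carrying the planner-derived key as a serializable field. */
class RuntimeSemanticsSketch implements SemanticsSketch, Serializable {
    private static final long serialVersionUID = 1L;

    private final int[] partitionByColumns;
    private final int[] upsertKeyColumns;

    RuntimeSemanticsSketch(int[] partitionByColumns, int[] upsertKeyColumns) {
        this.partitionByColumns = partitionByColumns.clone();
        // A missing key is normalized to "no upsert key" instead of propagating null.
        this.upsertKeyColumns = upsertKeyColumns == null ? new int[0] : upsertKeyColumns.clone();
    }

    @Override
    public int[] partitionByColumns() {
        return partitionByColumns.clone();
    }

    @Override
    public int[] upsertKeyColumns() {
        return upsertKeyColumns.clone();
    }
}
```

The default means an implementor written before this change keeps compiling; only the planner-backed runtime class needs to override the method.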

I’ll pass it through {{StreamPhysicalProcessTableFunction}} and
{{StreamExecProcessTableFunction}} (with JSON back-compat), and extend the
specialization/type-inference path so it’s available at construction time
(before metadata is available, it returns {{new int[0]}}).
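
The "absent means empty" convention for both older compiled plans and the type-inference phase could look roughly like this. {{UpsertKeyDefaults}} and its methods are hypothetical, JSON deserialization itself is not shown, and a missing field or not-yet-computed metadata is modeled as a {{null}} list:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helpers for the "absent upsert key means empty array" convention. */
final class UpsertKeyDefaults {
    private UpsertKeyDefaults() {}

    /**
     * Normalizes per-input keys read from a compiled plan. A plan compiled before the field
     * existed is modeled as a null list and yields one empty array per table input.
     */
    static List<int[]> normalizePersisted(List<int[]> persisted, int tableInputCount) {
        List<int[]> result = new ArrayList<>(tableInputCount);
        for (int i = 0; i < tableInputCount; i++) {
            int[] key = (persisted != null && i < persisted.size()) ? persisted.get(i) : null;
            result.add(key == null ? new int[0] : key.clone());
        }
        return Collections.unmodifiableList(result);
    }

    /**
     * Key visible for one table argument. Before metadata exists (e.g. during type inference,
     * modeled as a null list), the answer is an empty array.
     */
    static int[] keyForInput(List<int[]> inputUpsertKeys, int inputIndex) {
        if (inputUpsertKeys == null || inputIndex < 0 || inputIndex >= inputUpsertKeys.size()) {
            return new int[0];
        }
        int[] key = inputUpsertKeys.get(inputIndex);
        return key == null ? new int[0] : key.clone();
    }
}
```

Normalizing at the edges means codegen and the function see the same empty-array sentinel whether the key is unknown because the plan is old or because metadata has not been computed yet.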

Finally, I’ll update codegen and {{TO_CHANGELOG}} to consume it, with fallback 
to existing {{PARTITION BY}} behavior when the upsert key is absent.
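
A rough sketch of the fallback, assuming rows are plain {{Object[]}} values and that a partial DELETE keeps only the identity columns and sets the rest to {{null}}. {{IdentityColumns}}, {{resolve}}, and {{partialDelete}} are hypothetical names, not code from {{ToChangelogFunction}}:

```java
/** Hypothetical helper: which columns identify a row, and what a partial DELETE retains. */
final class IdentityColumns {
    private IdentityColumns() {}

    /** Prefer the planner-derived upsert key; fall back to the PARTITION BY columns. */
    static int[] resolve(int[] upsertKeyColumns, int[] partitionByColumns) {
        if (upsertKeyColumns != null && upsertKeyColumns.length > 0) {
            return upsertKeyColumns.clone();
        }
        return partitionByColumns == null ? new int[0] : partitionByColumns.clone();
    }

    /** Copies only the identity columns into an otherwise all-null row of the same arity. */
    static Object[] partialDelete(Object[] row, int[] identityColumns) {
        Object[] out = new Object[row.length];
        for (int column : identityColumns) {
            out[column] = row[column];
        }
        return out;
    }
}
```

Keeping the preference order in one helper makes the fallback explicit: an input with a declared primary key no longer needs {{PARTITION BY}}, while inputs without a derivable key keep today's behavior.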

Can someone assign this ticket to me?

> Expose input upsert key on TableSemantics for ProcessTableFunctions
> -------------------------------------------------------------------
>
>                 Key: FLINK-39735
>                 URL: https://issues.apache.org/jira/browse/FLINK-39735
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Ramin Gharib
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Problem
> {{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed
> argument. Today, {{TableSemantics}} exposes:
>  * {{dataType()}}
>  * {{partitionByColumns()}} – populated only when the caller writes
> {{PARTITION BY}} in SQL.
>  * {{orderByColumns()}}
>  * {{timeColumn()}}
>  * {{changelogMode()}} – planner-derived and lifecycle-aware (empty during
> type inference).
> It does *not* expose the input table's upsert key. The planner already
> derives it via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata
> handler chain in {{FlinkRelMdUpsertKeys}}), but the result is invisible to
> the PTF.
> This forces PTF authors to either:
>  * Require the caller to repeat the key via {{PARTITION BY}}, even when the
> planner already knows it from primary key constraints, or
>  * Re-derive upsert keys inside the function, which is impossible: at
> construction time, the function has only {{TableSemantics}}, not a
> {{RelNode}} or {{RelMetadataQuery}}.
> Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the
> input upsert key, the function cannot emit partial DELETE rows that preserve
> the identity columns in row semantics. The current workaround is to add
> {{PARTITION BY <pk>}}, which is unergonomic for users whose input table
> already declares a primary key.
> h2. Proposal
> Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by the
> planner via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and collapsed to a
> single candidate via {{UpsertKeyUtil.smallestKey(...)}}. It returns an empty
> array when no upsert key is derivable (pure append-only sources, post-Sort
> streams that destroyed the key, etc.) or during type inference (metadata not
> yet computed).
> Plumbing:
>  # *{{TableSemantics}}* ({{flink-table-common}}): add a default
> {{upsertKeyColumns()}} method returning an empty array. The default avoids
> breaking source compatibility for the rare external implementor.
>  # *{{RuntimeTableSemantics}}* ({{flink-table-runtime}}): add a
> serializable {{int[] upsertKeyColumns}} field, a constructor parameter, and
> an accessor.
>  # *{{StreamExecProcessTableFunction}}* ({{flink-table-planner}}):
> persist {{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one
> entry per table input). For older compiled plans without this field, default
> to an empty array per input (backward compatibility).
>  # *{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive the
> upsert key for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}}
> plus {{UpsertKeyUtil.smallestKey(...).orElse(...)}}, and pass the resulting
> list to the new ExecNode field.
>  # *{{OperatorBindingCallContext}}* / *{{OperatorBindingTableSemantics}}*:
> extend both to accept {{inputUpsertKeys}} so that the value is visible at
> specialization time (the function constructor sees the operator-binding
> context, not the runtime context).
>  # *{{BridgingSqlFunction.toCallContext}}* / *{{ProcessTableRunnerGenerator.generate}}*:
> thread {{inputUpsertKeys}} through the codegen path.
>  # *{{TableSemanticsMock}}* and *{{TestHarnessTableSemantics}}*: accept the
> value via an optional setter for unit testing.
>  # *{{TO_CHANGELOG}} runtime* ({{ToChangelogFunction}}): consume
> {{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on
> DELETE when {{produces_full_deletes=false}}. This solves the row-semantics
> case from FLINK-39636 without requiring {{PARTITION BY}}.
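
The per-input collapse described in the proposal can be approximated with JDK types only. This is not {{UpsertKeyUtil}}: keys are modeled as {{int[]}} rather than Calcite's {{ImmutableBitSet}}, {{PerInputUpsertKeys}} is a hypothetical name, and breaking ties between equally wide keys by comparing their sorted indices is an assumption made to keep the result deterministic:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for collapsing candidate upsert keys to one key per table input. */
final class PerInputUpsertKeys {
    private PerInputUpsertKeys() {}

    /** Fewest columns wins; ties go to the lexicographically smaller sorted index array. */
    private static final Comparator<int[]> SMALLEST_FIRST =
            Comparator.<int[]>comparingInt(k -> k.length).thenComparing(Arrays::compare);

    /** Empty Optional when the input has no derivable upsert key (e.g. append-only input). */
    static Optional<int[]> smallestKey(Collection<int[]> candidates) {
        if (candidates == null) {
            return Optional.empty();
        }
        return candidates.stream()
                .filter(Objects::nonNull)
                .map(k -> {
                    // Normalize to sorted order, as a bit set would already be.
                    int[] sorted = k.clone();
                    Arrays.sort(sorted);
                    return sorted;
                })
                .min(SMALLEST_FIRST);
    }

    /** One entry per table input, in argument order; "no key" becomes an empty array. */
    static List<int[]> collapse(List<? extends Collection<int[]>> candidatesPerInput) {
        List<int[]> result = new ArrayList<>(candidatesPerInput.size());
        for (Collection<int[]> candidates : candidatesPerInput) {
            result.add(smallestKey(candidates).orElse(new int[0]));
        }
        return result;
    }
}
```

Resolving a single key per input in the planner, rather than exposing the full candidate set, matches the proposal's single {{int[]}} accessor.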



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
