[ https://issues.apache.org/jira/browse/FLINK-39735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39735:
-----------------------------------
    Labels: pull-request-available  (was: )

> Expose input upsert key on TableSemantics for ProcessTableFunctions
> -------------------------------------------------------------------
>
>                 Key: FLINK-39735
>                 URL: https://issues.apache.org/jira/browse/FLINK-39735
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Ramin Gharib
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Problem
> {{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed 
> argument. Its surface today exposes:
>  * {{dataType()}}
>  * {{partitionByColumns()}}: only populated when the caller writes 
> {{PARTITION BY}} in SQL.
>  * {{orderByColumns()}}
>  * {{timeColumn()}}
>  * {{changelogMode()}}: planner-derived and lifecycle-aware (empty during type 
> inference).
> It does *not* expose the input table's upsert key. The planner already derives it via 
> {{FlinkRelMetadataQuery.getUpsertKeys(input)}} (the metadata handler chain in 
> {{FlinkRelMdUpsertKeys}}), but the result is invisible to the PTF.
> This forces PTF authors to either:
>  * require the caller to repeat the key via {{PARTITION BY}}, even when the planner 
> already knows it from primary key constraints, or
>  * re-derive upsert keys inside the function, which is impossible: at constructor 
> time the function has only a {{TableSemantics}}, not a {{RelNode}} or a 
> {{RelMetadataQuery}}.
> Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the input 
> upsert key, the function cannot emit partial DELETE rows that preserve the identity 
> columns in row semantics. The current workaround is to add {{PARTITION BY <pk>}}, 
> which is unergonomic for users whose input table already declares a primary key.
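To illustrate what "partial DELETE rows that preserve identity columns" means, here is a minimal sketch, assuming rows are modeled as plain {{Object[]}} rather than Flink's {{Row}}/{{RowData}}. {{PartialDeletes}} and {{toPartialDelete}} are hypothetical names, not part of Flink or of {{ToChangelogFunction}}. With an empty key array (the situation today when no {{PARTITION BY}} is given), no identity column survives in the DELETE row.

```java
/** Hypothetical helper (not a Flink API); rows are modeled as Object[]. */
final class PartialDeletes {
    private PartialDeletes() {}

    /**
     * Builds a partial DELETE row: key columns are copied from the input row,
     * every other column is left null.
     */
    static Object[] toPartialDelete(Object[] row, int[] keyColumns) {
        Object[] result = new Object[row.length]; // all entries start as null
        for (int col : keyColumns) {
            if (col < 0 || col >= row.length) {
                throw new IllegalArgumentException("Key column out of range: " + col);
            }
            result[col] = row[col]; // preserve the identity column
        }
        return result;
    }
}
```

For a row {{(42, "alice", 3.5)}} with key column 0, the sketch yields {{(42, null, null)}}; with no key it yields an all-null row that a downstream consumer cannot match to anything.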
> h2. Proposal
> Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by the 
> planner via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and collapsed to one 
> candidate via {{UpsertKeyUtil.smallestKey(...)}}. The method returns an empty array 
> when no upsert key is derivable (pure append-only sources, post-Sort streams 
> that destroyed the key, etc.) or during type inference (metadata not yet 
> computed).
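The "collapse to one candidate" step can be sketched as follows, assuming upsert keys are represented as {{java.util.BitSet}}s of column indices (the planner itself works with Calcite's {{ImmutableBitSet}}). "Smallest" is taken here to mean fewest columns, with ties broken by comparing the sorted column indices; that tie-break rule is an assumption and not necessarily what {{UpsertKeyUtil.smallestKey(...)}} does. {{UpsertKeySelection}} is a hypothetical class.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical illustration of collapsing several upsert keys to one (not a Flink API). */
final class UpsertKeySelection {
    private UpsertKeySelection() {}

    /** Orders keys by column count, then lexicographically by ascending column indices (assumed tie-break). */
    private static int compareKeys(int[] a, int[] b) {
        int bySize = Integer.compare(a.length, b.length);
        return bySize != 0 ? bySize : Arrays.compare(a, b);
    }

    /** Returns the smallest candidate key, or empty if none is known. */
    static Optional<int[]> smallestKey(Set<BitSet> upsertKeys) {
        if (upsertKeys == null) {
            return Optional.empty(); // e.g. metadata not computed yet
        }
        return upsertKeys.stream()
                .map(bits -> bits.stream().toArray()) // set bits in ascending order
                .min(UpsertKeySelection::compareKeys);
    }

    /** Mirrors the proposed contract: an empty array when no key is derivable. */
    static int[] upsertKeyColumns(Set<BitSet> upsertKeys) {
        return smallestKey(upsertKeys).orElse(new int[0]);
    }
}
```

Returning an empty array instead of {{null}} keeps callers free of null checks, which matches the proposal's "empty array when no upsert key is derivable" contract.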
> Plumbing:
>  # *{{TableSemantics}}* ({{flink-table-common}}): add a default 
> {{upsertKeyColumns()}} method returning an empty array. The default avoids 
> breaking source compatibility for the rare external implementor.
>  # *{{RuntimeTableSemantics}}* ({{flink-table-runtime}}): add a 
> serializable {{int[] upsertKeyColumns}} field, a constructor parameter, and an 
> accessor.
>  # *{{StreamExecProcessTableFunction}}* ({{flink-table-planner}}): 
> persist {{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one 
> entry per table input). Older compiled plans that lack the field default to one 
> empty array per input (backward compatibility).
>  # *{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive the 
> upsert key for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} 
> combined with {{UpsertKeyUtil.smallestKey(...).orElse(...)}}, and pass the 
> resulting list to the new ExecNode field.
>  # *{{OperatorBindingCallContext}}* / 
> *{{OperatorBindingTableSemantics}}*: extend to accept {{inputUpsertKeys}} 
> so the value is visible at specialization time (the function constructor sees the 
> operator-binding context, not the runtime context).
>  # *{{BridgingSqlFunction.toCallContext}}* / 
> *{{ProcessTableRunnerGenerator.generate}}*: thread {{inputUpsertKeys}} 
> through the codegen path.
>  # *{{TableSemanticsMock}}* and *{{TestHarnessTableSemantics}}*: add an 
> optional setter for unit testing.
>  # *{{TO_CHANGELOG}} runtime* ({{ToChangelogFunction}}): consume 
> {{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on 
> DELETE when {{produces_full_deletes=false}}. This solves the row-semantics case 
> from FLINK-39636 without requiring {{PARTITION BY}}.
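Plumbing steps 1 to 3 can be condensed into a self-contained sketch: an interface default method that keeps existing implementors compiling, a serializable runtime holder for the planner-derived key, and the per-input fallback for compiled plans that predate the new field. The types below ({{TableSemanticsSketch}}, {{RuntimeTableSemanticsSketch}}, {{InputUpsertKeys}}) are hypothetical stand-ins, not Flink's real classes, and the real {{TableSemantics}} has further methods omitted here.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Step 1 (simplified stand-in for TableSemantics): new accessor with a default. */
interface TableSemanticsSketch {
    int[] partitionByColumns();

    /** Upsert key of the input table; empty when none is derivable. */
    default int[] upsertKeyColumns() {
        return new int[0]; // pre-existing implementors inherit this and keep compiling
    }
}

/** Step 2 (simplified stand-in for RuntimeTableSemantics): serializable key holder. */
final class RuntimeTableSemanticsSketch implements TableSemanticsSketch, Serializable {
    private static final long serialVersionUID = 1L;

    private final int[] partitionByColumns;
    private final int[] upsertKeyColumns;

    RuntimeTableSemanticsSketch(int[] partitionByColumns, int[] upsertKeyColumns) {
        // Defensive copies so callers cannot mutate the stored keys.
        this.partitionByColumns = partitionByColumns.clone();
        this.upsertKeyColumns = upsertKeyColumns.clone();
    }

    @Override
    public int[] partitionByColumns() {
        return partitionByColumns.clone();
    }

    @Override
    public int[] upsertKeyColumns() {
        return upsertKeyColumns.clone();
    }
}

/** Step 3 (simplified): fallback when an older compiled plan lacks inputUpsertKeys. */
final class InputUpsertKeys {
    private InputUpsertKeys() {}

    static List<int[]> orDefault(List<int[]> persisted, int tableInputCount) {
        if (persisted != null) {
            return persisted; // field was present in the compiled plan
        }
        List<int[]> empty = new ArrayList<>(tableInputCount);
        for (int i = 0; i < tableInputCount; i++) {
            empty.add(new int[0]); // one empty key per table input
        }
        return Collections.unmodifiableList(empty);
    }
}
```

Because the default returns an empty array rather than throwing, a function that reads {{upsertKeyColumns()}} behaves the same against an old implementor as against an input with no derivable key.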



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
