Ramin Gharib created FLINK-39735:
------------------------------------

             Summary: Expose input upsert key on TableSemantics for ProcessTableFunctions
                 Key: FLINK-39735
                 URL: https://issues.apache.org/jira/browse/FLINK-39735
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / API
            Reporter: Ramin Gharib


{code:title=Jira wiki markup}
h3. Problem

{{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed argument. The semantics surface today exposes:

* {{dataType()}}
* {{partitionByColumns()}}: only populated when the caller wrote {{PARTITION BY}} in SQL.
* {{orderByColumns()}} / {{orderByDirections()}}
* {{timeColumn()}}
* {{changelogMode()}}: planner-derived and lifecycle-aware (empty during type inference).

It does not expose the input table's upsert key. The planner already derives it via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata handler chain in {{FlinkRelMdUpsertKeys}}), but the result is invisible to the PTF.

This forces PTF authors to either:

# require the caller to repeat the key via {{PARTITION BY}}, even when the planner already knows it from primary-key constraints, or
# re-derive upsert keys inside the function, which is impossible: at constructor time the function has only {{TableSemantics}}, not a {{RelNode}} or {{RelMetadataQuery}}.

Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the input upsert key, the function cannot emit partial DELETE rows that preserve the identity columns in row semantics. The current workaround is to add {{PARTITION BY <pk>}}, which is unergonomic for users whose input table already declares a primary key.
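To make the row-semantics gap concrete, here is a minimal sketch in plain Java (no Flink dependencies) of what a partial DELETE could look like once the key columns are known: key columns keep their values and every other column is nulled. The class and method names ({{PartialDeleteSketch}}, {{toPartialDelete}}) are hypothetical and not part of {{ToChangelogFunction}}; rows are modelled as {{Object[]}} rather than Flink's {{Row}}, and falling back to the full row when no key is known is an assumption.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink. Rows are modelled as plain Object[].
public final class PartialDeleteSketch {

    /**
     * Builds a partial DELETE image: key columns keep their values, all other
     * columns become null. An empty key array means "no upsert key known", in
     * which case this sketch (by assumption) keeps the full row.
     */
    public static Object[] toPartialDelete(Object[] row, int[] upsertKeyColumns) {
        if (upsertKeyColumns.length == 0) {
            // Without a key we cannot safely drop columns: emit a full delete.
            return row.clone();
        }
        Object[] partial = new Object[row.length];
        for (int col : upsertKeyColumns) {
            partial[col] = row[col];
        }
        return partial;
    }

    public static void main(String[] args) {
        Object[] row = {42L, "alice", 3.5};
        int[] key = {0}; // assume column 0 is the declared primary key
        System.out.println(Arrays.toString(toPartialDelete(row, key)));        // [42, null, null]
        System.out.println(Arrays.toString(toPartialDelete(row, new int[0]))); // [42, alice, 3.5]
    }
}
```

The point of the proposal is that the {{key}} array here would come from the planner instead of from a user-written {{PARTITION BY}}.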

h3. Proposal

Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by the planner via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and collapsed to a single candidate via {{UpsertKeyUtil.smallestKey(...)}}. It returns an empty array when no upsert key is derivable (pure append-only sources, post-Sort streams that destroyed the key, etc.) or during type inference (metadata not yet computed).
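The collapse-to-one-candidate step can be sketched as follows: among all candidate upsert keys, pick the one with the fewest columns, and fall back to an empty array when no candidate exists. This is a standalone sketch with a hypothetical {{smallestKey}} over {{List<int[]>}}; the planner's keys are bit sets rather than {{int[]}}, and the tie-breaking rule used here (lexicographic on the sorted column indices) is an assumption, not documented planner behavior.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for UpsertKeyUtil.smallestKey(...); keys are modelled as int[].
public final class SmallestKeySketch {

    /** Returns the candidate key with the fewest columns, if any candidate exists. */
    public static Optional<int[]> smallestKey(List<int[]> candidateKeys) {
        int[] best = null;
        for (int[] key : candidateKeys) {
            int[] sorted = key.clone();
            Arrays.sort(sorted);
            // Fewer columns wins; ties are broken lexicographically (assumption).
            if (best == null
                    || sorted.length < best.length
                    || (sorted.length == best.length && Arrays.compare(sorted, best) < 0)) {
                best = sorted;
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<int[]> keys = List.of(new int[] {2, 0}, new int[] {1}, new int[] {3});
        // Mirrors the proposal's fallback: no derivable key -> empty array.
        int[] chosen = smallestKey(keys).orElse(new int[0]);
        int[] none = smallestKey(List.of()).orElse(new int[0]);
        System.out.println(Arrays.toString(chosen)); // [1]
        System.out.println(Arrays.toString(none));   // []
    }
}
```

Preferring the narrowest key keeps the preserved DELETE columns minimal; any deterministic tie-break works as long as planning is reproducible.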

Plumbing:

* {{TableSemantics}} ({{flink-table-common}}): add a {{default int[] upsertKeyColumns() { return new int[0]; }}} method. The default avoids breaking source compatibility for the rare external implementor.
* {{RuntimeTableSemantics}} ({{flink-table-runtime}}): add a serializable {{int[] upsertKeyColumns}} field, constructor parameter, and accessor.
* {{StreamExecProcessTableFunction}} ({{flink-table-planner}}): persist {{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one entry per table input). Default to per-input empty arrays for older compiled plans (back-compat).
* {{StreamPhysicalProcessTableFunction.translateToExecNode}}: derive upsert keys for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} + {{UpsertKeyUtil.smallestKey(...).orElse(new int[0])}}, and pass the list to the new ExecNode field.
* {{OperatorBindingCallContext}} / {{OperatorBindingTableSemantics}}: extend to accept {{inputUpsertKeys}} so the value is visible at specialization time (the function constructor sees the operator-binding context, not the runtime context).
* {{BridgingSqlFunction.toCallContext}} / {{ProcessTableRunnerGenerator.generate}}: thread {{inputUpsertKeys}} through the codegen path.
* {{TableSemanticsMock}} and {{TestHarnessTableSemantics}}: add an optional setter for unit testing.
* {{TO_CHANGELOG}} runtime ({{ToChangelogFunction}}): consume {{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on DELETE when {{produces_full_deletes=false}}. This solves the row-semantics case from FLINK-39636 without requiring {{PARTITION BY}}.
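A compressed model of the plumbing above, in plain Java: a {{TableSemantics}}-like interface gains a default {{upsertKeyColumns()}}, a runtime implementation carries the array as a serializable field, and a plan-side helper turns a missing {{inputUpsertKeys}} list (an older compiled plan) into one empty array per table input. All names here ({{SemanticsSketch}}, {{LegacySemantics}}, {{RuntimeSemanticsSketch}}, {{normalizeInputUpsertKeys}}) are hypothetical stand-ins, not the Flink classes listed; JSON deserialization is only simulated by passing {{null}}.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the proposed plumbing; none of these types exist in Flink.
public final class UpsertKeyPlumbingSketch {

    /** Stand-in for TableSemantics: the default keeps existing implementors compiling. */
    public interface SemanticsSketch {
        default int[] upsertKeyColumns() {
            return new int[0];
        }
    }

    /** An "external" implementor written before the method existed; it still compiles. */
    public static final class LegacySemantics implements SemanticsSketch {}

    /** Stand-in for RuntimeTableSemantics: the key travels with the serialized operator. */
    public static final class RuntimeSemanticsSketch implements SemanticsSketch, Serializable {
        private static final long serialVersionUID = 1L;
        private final int[] upsertKeyColumns;

        public RuntimeSemanticsSketch(int[] upsertKeyColumns) {
            this.upsertKeyColumns = upsertKeyColumns.clone(); // defensive copy
        }

        @Override
        public int[] upsertKeyColumns() {
            return upsertKeyColumns.clone();
        }
    }

    /**
     * Back-compat for compiled plans: a missing list (plan written before the field
     * existed) becomes one empty key per table input.
     */
    public static List<int[]> normalizeInputUpsertKeys(List<int[]> fromPlan, int numTableInputs) {
        if (fromPlan != null) {
            return fromPlan;
        }
        List<int[]> defaults = new ArrayList<>(numTableInputs);
        for (int i = 0; i < numTableInputs; i++) {
            defaults.add(new int[0]);
        }
        return Collections.unmodifiableList(defaults);
    }

    public static void main(String[] args) {
        List<int[]> keys = normalizeInputUpsertKeys(null, 2); // older plan, two table inputs
        SemanticsSketch runtime = new RuntimeSemanticsSketch(keys.get(0));
        System.out.println(keys.size());                                              // 2
        System.out.println(Arrays.toString(runtime.upsertKeyColumns()));              // []
        System.out.println(Arrays.toString(new LegacySemantics().upsertKeyColumns())); // []
        System.out.println(Arrays.toString(
                new RuntimeSemanticsSketch(new int[] {0}).upsertKeyColumns()));       // [0]
    }
}
```

Returning an empty array rather than {{null}} everywhere lets consumers treat "no key" and "old plan" identically, which matches the proposal's fallback semantics.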
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
