[ https://issues.apache.org/jira/browse/FLINK-39735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ramin Gharib updated FLINK-39735:
---------------------------------
    Description: 
h2. Problem

{{ProcessTableFunction}}s receive a {{TableSemantics}} instance for each table-typed argument. Today, this semantics surface exposes:

* {{dataType()}}
* {{partitionByColumns()}} -- populated only when the caller writes {{PARTITION BY}} in SQL.
* {{orderByColumns()}} / {{orderByDirections()}}
* {{timeColumn()}}
* {{changelogMode()}} -- planner-derived and lifecycle-aware (empty during type inference).

It does *not* expose the input table's upsert key. The planner already derives it via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} (the metadata handler chain in {{FlinkRelMdUpsertKeys}}), but the result is invisible to the PTF.

This forces PTF authors to either:

* require the caller to repeat the key via {{PARTITION BY}}, even when the planner already knows it from primary key constraints, or
* re-derive the upsert keys inside the function (impossible -- at constructor time, the function has only its {{TableSemantics}}, not a {{RelNode}} or {{RelMetadataQuery}}).

Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the input upsert key, the function cannot emit partial DELETE rows that preserve the identity columns in row semantics. The current workaround is to add {{PARTITION BY <pk>}}, which is unergonomic for users whose input table already declares a primary key.
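A minimal, hypothetical sketch of what a "partial DELETE row" means here: only the upsert-key (identity) columns are kept and every other column is nulled. Rows are modeled as plain {{Object[]}}. The class and method names ({{PartialDeleteSketch}}, {{toPartialDelete}}) are illustrative only. They are not Flink's {{RowData}} API or the actual {{TO_CHANGELOG}} implementation.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration (not Flink code): builds a "partial" DELETE row
 * that keeps only the upsert-key (identity) columns and nulls out the rest.
 */
public final class PartialDeleteSketch {

    /** Returns a copy of {@code row} in which every non-key column is null. */
    static Object[] toPartialDelete(Object[] row, int[] upsertKeyColumns) {
        if (upsertKeyColumns.length == 0) {
            // Without a known key there are no identity columns to preserve.
            throw new IllegalArgumentException("no upsert key available");
        }
        Object[] result = new Object[row.length]; // every field starts as null
        for (int col : upsertKeyColumns) {
            result[col] = row[col]; // copy identity columns only
        }
        return result;
    }

    public static void main(String[] args) {
        Object[] row = {42L, "alice", 3.5};
        int[] key = {0}; // assume the input declares its primary key on column 0
        System.out.println(Arrays.toString(toPartialDelete(row, key)));
        // prints [42, null, null]
    }
}
```

The exception for an empty key mirrors the problem above: if the function cannot see a key, it has nothing to preserve.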

h2. Proposal

Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}. The planner populates it from {{FlinkRelMetadataQuery.getUpsertKeys(input)}}, collapsed to a single candidate via {{UpsertKeyUtil.smallestKey(...)}}. The method returns an empty array when no upsert key is derivable (pure append-only sources, post-Sort streams that destroyed the key, etc.) or during type inference (metadata not yet computed).

Plumbing:

# *{{TableSemantics}}* ({{flink-table-common}}): add a default {{upsertKeyColumns()}} method returning an empty array. The default avoids breaking source compatibility for the rare external implementor.
# *{{RuntimeTableSemantics}}* ({{flink-table-runtime}}): add a serializable {{int[] upsertKeyColumns}} field, constructor parameter, and accessor.
# *{{StreamExecProcessTableFunction}}* ({{flink-table-planner}}): persist {{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one entry per table input). Default to an empty array per input for older compiled plans (backward compatibility).
# *{{StreamPhysicalProcessTableFunction.translateToExecNode}}*: derive the upsert key for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} + {{UpsertKeyUtil.smallestKey(...).orElse(...)}} and pass the list to the new ExecNode field.
# *{{OperatorBindingCallContext}}* / *{{OperatorBindingTableSemantics}}*: extend them to accept {{inputUpsertKeys}} so that the value is visible at specialization time (the function constructor sees the operator-binding context, not the runtime context).
# *{{BridgingSqlFunction.toCallContext}}* / *{{ProcessTableRunnerGenerator.generate}}*: thread {{inputUpsertKeys}} through the codegen path.
# *{{TableSemanticsMock}}* and *{{TestHarnessTableSemantics}}*: add an optional setter so unit tests can supply upsert keys.
# *{{TO_CHANGELOG}} runtime* ({{ToChangelogFunction}}): consume {{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on DELETE when {{produces_full_deletes=false}}. This solves the row-semantics case from FLINK-39636 without requiring {{PARTITION BY}}.
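The first three plumbing steps can be sketched with simplified, hypothetical stand-ins:

* a default interface method that keeps an older implementor compiling;
* a serializable runtime holder for the key;
* a backward-compatibility fallback for compiled plans that lack {{inputUpsertKeys}}.

The interface is reduced to one other method, and none of these types are the real Flink classes. The fallback assumes that a missing JSON property arrives as {{null}} after deserialization.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-ins for the classes named in plumbing steps 1-3. */
public final class UpsertKeyPlumbingSketch {

    /** Step 1 (sketch): a default method keeps existing implementors source-compatible. */
    interface TableSemanticsSketch {
        int[] partitionByColumns();

        default int[] upsertKeyColumns() {
            return new int[0]; // "no upsert key known"
        }
    }

    /** An implementor written before upsertKeyColumns() existed still compiles unchanged. */
    static final class LegacySemantics implements TableSemanticsSketch {
        @Override
        public int[] partitionByColumns() {
            return new int[0];
        }
    }

    /** Step 2 (sketch): the runtime variant carries the key as a serializable field. */
    static final class RuntimeSemanticsSketch implements TableSemanticsSketch, Serializable {
        private static final long serialVersionUID = 1L;
        private final int[] partitionByColumns;
        private final int[] upsertKeyColumns;

        RuntimeSemanticsSketch(int[] partitionByColumns, int[] upsertKeyColumns) {
            this.partitionByColumns = partitionByColumns;
            this.upsertKeyColumns = upsertKeyColumns;
        }

        @Override
        public int[] partitionByColumns() {
            return partitionByColumns;
        }

        @Override
        public int[] upsertKeyColumns() {
            return upsertKeyColumns;
        }
    }

    /** Step 3 (sketch): assume older plans deserialize the missing field as null. */
    static List<int[]> normalizeInputUpsertKeys(List<int[]> persisted, int tableInputCount) {
        if (persisted != null) {
            return persisted;
        }
        List<int[]> defaults = new ArrayList<>(tableInputCount);
        for (int i = 0; i < tableInputCount; i++) {
            defaults.add(new int[0]); // one empty key per table input
        }
        return defaults;
    }

    public static void main(String[] args) {
        TableSemanticsSketch legacy = new LegacySemantics();
        System.out.println(legacy.upsertKeyColumns().length); // 0
        TableSemanticsSketch runtime = new RuntimeSemanticsSketch(new int[0], new int[] {0});
        System.out.println(Arrays.toString(runtime.upsertKeyColumns())); // [0]
        System.out.println(normalizeInputUpsertKeys(null, 2).size()); // 2
    }
}
```

Defaulting to empty arrays means an old plan behaves exactly as if no key were derivable. No plan migration is needed.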

  was:
## Problem

`ProcessTableFunction`s receive a `TableSemantics` instance for each table-typed argument. Today, this semantics surface exposes:

- `dataType()`
- `partitionByColumns()` (populated only when the caller writes `PARTITION BY` in SQL)
- `orderByColumns()` / `orderByDirections()`
- `timeColumn()`
- `changelogMode()` (planner-derived and lifecycle-aware; empty during type inference)

It does **not** expose the input table's upsert key. The planner already derives it via `FlinkRelMetadataQuery.getUpsertKeys(input)` (the metadata handler chain in `FlinkRelMdUpsertKeys`), but the result is invisible to the PTF.

This forces PTF authors to either:

- require the caller to repeat the key via `PARTITION BY`, even when the planner already knows it from primary key constraints, or
- re-derive the upsert keys inside the function, which is impossible: at constructor time, the function has only its `TableSemantics`, not a `RelNode` or `RelMetadataQuery`.

Concrete impact for `TO_CHANGELOG` (FLINK-39636): without access to the input upsert key, the function cannot emit partial DELETE rows that preserve the identity columns in row semantics. The current workaround is to add `PARTITION BY <pk>`, which is unergonomic for users whose input table already declares a primary key.

## Proposal

Add `int[] upsertKeyColumns()` to `TableSemantics`. The planner populates it from `FlinkRelMetadataQuery.getUpsertKeys(input)`, collapsed to a single candidate via `UpsertKeyUtil.smallestKey(...)`. The method returns an empty array when no upsert key is derivable (pure append-only sources, post-Sort streams that destroyed the key, etc.) or during type inference (metadata not yet computed).

Plumbing:

1. **`TableSemantics`** (`flink-table-common`): add a `default int[] upsertKeyColumns()` method returning an empty array. The default avoids breaking source compatibility for the rare external implementor.
2. **`RuntimeTableSemantics`** (`flink-table-runtime`): add a serializable `int[] upsertKeyColumns` field, constructor parameter, and accessor.
3. **`StreamExecProcessTableFunction`** (`flink-table-planner`): persist `List<int[]> inputUpsertKeys` as a new `@JsonProperty` field (one entry per table input). Default to an empty array per input for older compiled plans (backward compatibility).
4. **`StreamPhysicalProcessTableFunction.translateToExecNode`**: derive the upsert key for each input via `FlinkRelMetadataQuery.getUpsertKeys(input)` + `UpsertKeyUtil.smallestKey(...).orElse(new int[0])` and pass the list to the new ExecNode field.
5. **`OperatorBindingCallContext`** / **`OperatorBindingTableSemantics`**: extend them to accept `inputUpsertKeys` so that the value is visible at specialization time (the function constructor sees the operator-binding context, not the runtime context).
6. **`BridgingSqlFunction.toCallContext`** / **`ProcessTableRunnerGenerator.generate`**: thread `inputUpsertKeys` through the codegen path.
7. **`TableSemanticsMock`** and **`TestHarnessTableSemantics`**: add an optional setter so unit tests can supply upsert keys.
8. **`TO_CHANGELOG` runtime** (`ToChangelogFunction`): consume `tableSemantics.upsertKeyColumns()` to decide which columns to preserve on DELETE when `produces_full_deletes=false`. This solves the row-semantics case from FLINK-39636 without requiring `PARTITION BY`.


> Expose input upsert key on TableSemantics for ProcessTableFunctions
> -------------------------------------------------------------------
>
>                 Key: FLINK-39735
>                 URL: https://issues.apache.org/jira/browse/FLINK-39735
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Ramin Gharib
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)