[
https://issues.apache.org/jira/browse/FLINK-39735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ramin Gharib updated FLINK-39735:
---------------------------------
Description:
h3. Problem
{{ProcessTableFunction}}s receive a {{TableSemantics}} for each table-typed
argument. The semantics surface currently exposes:
* {{dataType()}}
* {{partitionByColumns()}}: only populated when the caller wrote {{PARTITION BY}} in SQL.
* {{orderByColumns()}} / {{orderByDirections()}}
* {{timeColumn()}}
* {{changelogMode()}}: planner-derived and lifecycle-aware (empty during type inference).
It does not expose the input table's upsert key. The planner already derives it
via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} (metadata handler chain in
{{FlinkRelMdUpsertKeys}}), but the result is invisible to the PTF.
This forces PTF authors to either:
* Require the caller to repeat the key via {{PARTITION BY}} even when the planner already knows it from PK constraints, or
* Re-derive upsert keys inside the function, which is impossible: at constructor time the function has only {{TableSemantics}}, not a {{RelNode}} or {{RelMetadataQuery}}.
Concrete impact for {{TO_CHANGELOG}} (FLINK-39636): without access to the input
upsert key, the function cannot emit partial DELETE rows that preserve identity
columns in row semantics. The current workaround is "add {{PARTITION BY <pk>}}",
which is unergonomic for users whose input table already declares a primary key.
h3. Proposal
Add {{int[] upsertKeyColumns()}} to {{TableSemantics}}, populated by the planner
via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} and collapsed to one candidate
via {{UpsertKeyUtil.smallestKey(...)}}. It returns an empty array when no upsert
key is derivable (pure append-only sources, post-Sort streams that destroyed the
key, etc.) or during type inference (metadata not yet computed).
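
The "collapse to one candidate, else empty array" rule can be illustrated with a minimal, self-contained sketch. It is not Flink's {{UpsertKeyUtil}}: the class {{UpsertKeySketch}}, the {{bits}} helper, and the use of {{java.util.BitSet}} in place of the planner's key representation are assumptions made for illustration only.

```java
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for illustration; not Flink's UpsertKeyUtil. */
final class UpsertKeySketch {

    /** Builds a candidate key from column indices (illustrative helper). */
    static BitSet bits(int... columns) {
        BitSet set = new BitSet();
        for (int column : columns) {
            set.set(column);
        }
        return set;
    }

    /** Picks the candidate with the fewest columns, if any candidate exists. */
    static Optional<int[]> smallestKey(List<BitSet> candidates) {
        if (candidates == null) {
            return Optional.empty();
        }
        return candidates.stream()
                .min(Comparator.comparingInt(BitSet::cardinality))
                // BitSet.stream() yields the set indices in ascending order.
                .map(key -> key.stream().toArray());
    }

    /** Mirrors the proposed contract: an empty array when no key is derivable. */
    static int[] upsertKeyColumns(List<BitSet> candidates) {
        return smallestKey(candidates).orElse(new int[0]);
    }
}
```

For example, {{UpsertKeySketch.upsertKeyColumns(List.of(bits(0, 2), bits(1)))}} selects the single-column candidate, and an empty or missing candidate set yields a zero-length array rather than {{null}}, so callers can check {{length == 0}} without a null guard.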
Plumbing:
* {{TableSemantics}} ({{flink-table-common}}): add a {{default int[] upsertKeyColumns() { return new int[0]; }}} method. The default avoids breaking source compatibility for the rare external implementor.
* {{RuntimeTableSemantics}} ({{flink-table-runtime}}): add a serializable {{int[] upsertKeyColumns}} field, constructor parameter, and accessor.
* {{StreamExecProcessTableFunction}} ({{flink-table-planner}}): persist {{List<int[]> inputUpsertKeys}} as a new {{@JsonProperty}} field (one entry per table input). Default to per-input empty arrays for older compiled plans (back-compat).
* {{StreamPhysicalProcessTableFunction.translateToExecNode}}: derive upsert keys for each input via {{FlinkRelMetadataQuery.getUpsertKeys(input)}} + {{UpsertKeyUtil.smallestKey(...).orElse(new int[0])}}. Pass the list to the new ExecNode field.
* {{OperatorBindingCallContext}} / {{OperatorBindingTableSemantics}}: extend to accept {{inputUpsertKeys}} so the value is visible at specialization time (the function constructor sees the operator-binding context, not the runtime context).
* {{BridgingSqlFunction.toCallContext}} / {{ProcessTableRunnerGenerator.generate}}: thread {{inputUpsertKeys}} through the codegen path.
* {{TableSemanticsMock}} and {{TestHarnessTableSemantics}}: accept an optional setter for unit testing.
* {{TO_CHANGELOG}} runtime ({{ToChangelogFunction}}): consume {{tableSemantics.upsertKeyColumns()}} to decide which columns to preserve on DELETE when {{produces_full_deletes=false}}. This solves the row-semantics case from FLINK-39636 without requiring {{PARTITION BY}}.
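
As a rough illustration of the first and last items above, the sketch below models the default accessor and one way a consumer might use it. {{SemanticsSketch}} and {{partialDelete}} are hypothetical names, not Flink classes. Nulling the non-key columns and falling back to the full row when no key is known are both assumptions of this sketch, since the issue does not specify either behavior.

```java
/** Hypothetical sketch; names and behavior are illustrative, not Flink's API. */
final class PartialDeleteSketch {

    /** Simplified stand-in for TableSemantics; only the proposed accessor is modeled. */
    interface SemanticsSketch {
        // The default keeps existing implementors source-compatible.
        default int[] upsertKeyColumns() {
            return new int[0];
        }
    }

    /**
     * Builds a DELETE row that keeps only the upsert key columns.
     * Assumption: non-key columns become null; with no key, the full row is kept.
     */
    static Object[] partialDelete(Object[] row, int[] keyColumns) {
        if (keyColumns.length == 0) {
            return row.clone();
        }
        Object[] out = new Object[row.length];
        for (int index : keyColumns) {
            out[index] = row[index];
        }
        return out;
    }
}
```

An implementor that predates the change, such as {{new SemanticsSketch() {}}}, compiles unchanged and reports an empty key, which is the source-compatibility property the default method is meant to provide.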
> Expose input upsert key on TableSemantics for ProcessTableFunctions
> -------------------------------------------------------------------
>
> Key: FLINK-39735
> URL: https://issues.apache.org/jira/browse/FLINK-39735
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Ramin Gharib
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)