[
https://issues.apache.org/jira/browse/FLINK-39604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Natea Eshetu Beshada updated FLINK-39604:
-----------------------------------------
Description:
DESCRIBE FUNCTION EXTENDED was introduced in FLINK-35822 before Process Table
Functions (PTFs) landed under FLIP-440 (FLINK-36705 and follow-ups). As a
result, none of the metadata that makes a PTF distinctive — and several pieces
of metadata that matter for user-defined aggregates as well — is shown today.
Currently, DescribeFunctionOperation#execute emits, under the EXTENDED
branch, only:
- kind
- requirements
- is deterministic
- supports constant folding
- signature
It calls FunctionDefinition#getTypeInference(...) solely to render the
signature, ignoring the other class-level facts available on TypeInference
and on the FunctionDefinition itself:
- TypeInference#getStateTypeStrategies() — named state entries with their
type and TTL (from @StateHint(ttl = ...)). Applies to PTFs and to user-defined
AggregateFunction / TableAggregateFunction (where the accumulator surfaces
under DEFAULT_ACCUMULATOR_NAME).
- TypeInference#disableSystemArguments() — whether the framework
auto-injects uid / on_time system arguments into a PTF call.
- definition instanceof ChangelogFunction — whether a PTF may emit +U / -U
/ -D messages.
- Presence of an onTimer method on the function class — whether a PTF
schedules timers via TimeContext.
This makes it hard for users to introspect PTFs and stateful aggregates from
SQL — e.g. to confirm a function carries state, what its TTL is, whether the
function may emit updates, or whether it relies on timers.
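The last two checks in the list above need no call-time context. A minimal sketch in plain Java, using stand-in types rather than Flink's real classes: ChangelogCapable is a hypothetical marker standing in for ChangelogFunction, TimerPtf and PlainPtf are hypothetical function classes, and looking up onTimer by name through reflection is an assumption about how its presence could be detected, not necessarily what the PR does.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

/** Hypothetical stand-in for Flink's ChangelogFunction; not the real interface. */
interface ChangelogCapable {}

/** Hypothetical function class that declares onTimer and may emit updates. */
class TimerPtf implements ChangelogCapable {
    public void onTimer(Object ctx) {}
}

/** Hypothetical function class with neither capability. */
class PlainPtf {
    public void eval(Object row) {}
}

final class CapabilityFlags {
    /** True if the class declares or inherits a public method named onTimer. */
    static boolean usesTimers(Class<?> functionClass) {
        return Arrays.stream(functionClass.getMethods())
                .map(Method::getName)
                .anyMatch("onTimer"::equals);
    }

    /** True if the instance implements the stand-in changelog marker. */
    static boolean emitsUpdates(Object definition) {
        return definition instanceof ChangelogCapable;
    }

    public static void main(String[] args) {
        System.out.println("uses timers: " + usesTimers(TimerPtf.class));
        System.out.println("emits updates: " + emitsUpdates(new PlainPtf()));
    }
}
```

Both checks inspect only the class or instance, which is why they can be reported by DESCRIBE without a concrete call.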
h3. Proposed Changes
Append additional rows to the existing (info name, info value) result. The
output schema is unchanged; only new rows are added, and only when the
underlying definition carries that metadata. No new SQL syntax.
For PTFs:
{noformat}
+---------------------------+---------------------------------------------------------------------+
| info name                 | info value                                                          |
+---------------------------+---------------------------------------------------------------------+
| ...                       | ...                                                                 |
| signature                 | my_ptf(input => {TABLE, SET SEMANTIC TABLE, OPTIONAL PARTITION BY}) |
| state: state              | type=ROW<`count` BIGINT>, ttl=PT24H                                 |
| accepts system arguments  | true                                                                |
| emits updates             | true                                                                |
| uses timers               | true                                                                |
+---------------------------+---------------------------------------------------------------------+
{noformat}
For user-defined aggregates (accumulator surfaces via the same state:* row
mechanism):
{noformat}
| signature  | my_agg(value => BIGINT)                                                                    |
| state: acc | type=STRUCTURED<'...DescribeFunctionTestAgg$Acc', `count` BIGINT, `sum` BIGINT>, ttl=PT48H |
{noformat}
For non-PTF / non-stateful functions (most scalar UDFs, SUM, etc.) the output
is unchanged from today.
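The append-only behavior can be sketched as follows. This is an illustration under stated assumptions, not the code in the PR: StateEntry and DescribeRows are hypothetical names, rows are modeled as two-element string arrays, and omitting the ttl part when no TTL is set is an assumption. The ttl values shown above (PT24H, PT48H) match the ISO-8601 form that java.time.Duration#toString produces.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of one state entry: a type string plus an optional TTL. */
final class StateEntry {
    final String type;
    final Duration ttl; // null means no TTL declared (assumption)

    StateEntry(String type, Duration ttl) {
        this.type = type;
        this.ttl = ttl;
    }
}

final class DescribeRows {
    /** Returns the base rows plus one "state: <name>" row per state entry. */
    static List<String[]> appendStateRows(List<String[]> rows, Map<String, StateEntry> state) {
        List<String[]> out = new ArrayList<>(rows);
        state.forEach((name, entry) -> {
            String value = "type=" + entry.type;
            if (entry.ttl != null) {
                // Duration#toString renders ISO-8601, e.g. 24 hours as PT24H.
                value += ", ttl=" + entry.ttl;
            }
            out.add(new String[] {"state: " + name, value});
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, StateEntry> state = new LinkedHashMap<>();
        state.put("state", new StateEntry("ROW<`count` BIGINT>", Duration.ofHours(24)));
        for (String[] row : appendStateRows(new ArrayList<>(), state)) {
            System.out.println(row[0] + " | " + row[1]);
        }
    }
}
```

With an empty state map the input rows come back unchanged, which corresponds to the "output is unchanged" case for functions without state.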
h3. Out of Scope
- Per-argument rows ("argument: <name>") — redundant with the signature
row, which already encodes name, type, and traits via f(arg => TYPE {TRAITS}).
Considered and rejected.
- New SQL syntax (e.g. DESCRIBE FUNCTION ... SHOW STATE) — would require a
FLIP.
- Changes to the result schema — output remains (info name, info value).
- Resolved changelog mode —
ChangelogFunction#getChangelogMode(ChangelogContext) and
ChangelogModeStrategy#inferChangelogMode(...) both require call-time context
(input modes + downstream requirements), so only the instanceof boolean is
exposed here.
- Time / late-record / ordering behavior — all per-call.
h3. Acceptance Criteria
- state:* rows produced for PTFs and for user-defined AggregateFunction /
TableAggregateFunction whose TypeInference exposes state entries.
- "accepts system arguments", "emits updates", "uses timers" rows produced
for PTFs (kind == PROCESS_TABLE).
- No change in output for scalar/aggregate/table functions that don't
expose this metadata.
- .q-style golden test in
flink-sql-client/src/test/resources/sql/function.q covers a PTF (with state +
capability flags) and an aggregate (with typed accumulator + TTL).
h3. PR
[github.com/apache/flink/pull/28114|https://github.com/apache/flink/pull/28114]
was:
DESCRIBE FUNCTION EXTENDED was introduced in FLINK-35822 before Process Table
Functions (PTFs) landed under FLIP-440 (FLINK-36705 and follow-ups). As a
result, none of the metadata that makes a PTF distinctive is shown today.
Currently, DescribeFunctionOperation#execute emits, under the EXTENDED branch,
only:
- kind
- requirements
- is deterministic
- supports constant folding
- signature
It calls FunctionDefinition#getTypeInference(...) solely to render the
signature, ignoring the PTF-specific data already available on the same
TypeInference instance:
- TypeInference#getStaticArguments() — per-argument traits such as
ROW_SEMANTIC_TABLE / SET_SEMANTIC_TABLE, OPTIONAL_PARTITION_BY,
PASS_COLUMNS_THROUGH, SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE,
REQUIRE_FULL_DELETE, REQUIRE_ON_TIME.
- TypeInference#getStateTypeStrategies() — named state entries with their
declared types and TTL (from @StateHint(ttl = ...)).
This makes it hard for users to introspect PTFs from SQL — e.g. to confirm a
function carries state, what its TTL is, or whether an argument requires ON
TIME.
Proposed Changes
When DESCRIBE FUNCTION EXTENDED targets a function whose TypeInference exposes
static arguments and/or state entries, append additional rows to the existing
(info name, info value) result, e.g.:
{{+-----------------------------+-------------------------------------------------+}}
{{| info name                   | info value                                      |}}
{{+-----------------------------+-------------------------------------------------+}}
{{| kind                        | PROCESS_TABLE                                   |}}
{{| requirements                | []                                              |}}
{{| is deterministic            | true                                            |}}
{{| supports constant folding   | false                                           |}}
{{| signature                   | f(input => <ROW...>, on_time => <TIMESTAMP>)    |}}
{{| argument: input             | type=ROW<...>, traits=[SET_SEMANTIC_TABLE,      |}}
{{|                             | OPTIONAL_PARTITION_BY, SUPPORT_UPDATES]         |}}
{{| argument: on_time           | type=TIMESTAMP_LTZ(3), traits=[REQUIRE_ON_TIME] |}}
{{| state: counter              | type=BIGINT, ttl=1 d                            |}}
{{+-----------------------------+-------------------------------------------------+}}
The two-column output schema is unchanged; only new rows are added, and only
when the underlying TypeInference carries that metadata. No new SQL syntax.
Out of Scope
- New columns or a new SQL keyword (e.g. DESCRIBE FUNCTION ... STATE). Can be
considered separately if needed; would require a FLIP.
- Changes to non-EXTENDED DESCRIBE FUNCTION output.
Acceptance Criteria
- New rows produced for PTFs and any other functions whose TypeInference
exposes static arguments / state.
- No change in output for scalar/aggregate/table functions that don't expose
this metadata.
- Tests in DescribeFunctionOperationTest (or equivalent) cover a PTF with
state + traited arguments.
> Extend DESCRIBE FUNCTION EXTENDED to support PTF fields
> -------------------------------------------------------
>
> Key: FLINK-39604
> URL: https://issues.apache.org/jira/browse/FLINK-39604
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API
> Affects Versions: 2.2.0
> Reporter: Natea Eshetu Beshada
> Assignee: Natea Eshetu Beshada
> Priority: Minor
> Labels: pull-request-available
>