[
https://issues.apache.org/jira/browse/FLINK-39899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18087848#comment-18087848
]
Chen Zhang commented on FLINK-39899:
------------------------------------
h3. Root Cause
The bug is in the {{inferRowType()}} method of {{SqlWindowTableFunction.java}}:
{code}
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
{code}
This method constructs the output schema of all window TVFs
(TUMBLE/HOP/CUMULATE) via {{ARG0_TABLE_FUNCTION_WINDOWING}}. It uses
{{.addAll(inputRowType.getFieldList())}} to copy all input fields into the
output, which blindly preserves the {{TimeIndicatorRelDataType}}
({{*ROWTIME*}}) on the original time column instead of materializing it to a
regular {{TIMESTAMP}}.
h3. Proposed Fix
In {{inferRowType()}}, replace the {{.addAll(inputRowType.getFieldList())}}
call with a loop that checks each field via
{{FlinkTypeFactory.isTimeIndicatorType()}}. For any field that is a time
indicator, materialize it to a regular {{TIMESTAMP}} (same SQL type name and
precision, but without the time indicator wrapper). Leave all other fields
unchanged.
This ensures that only {{window_time}} retains {{*ROWTIME*}} in the output, per
the FLIP-145 specification.
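The loop described above can be sketched roughly as follows. This is a self-contained model, not the planner code: {{Field}}, {{materializeInputFields()}} and the modeled {{inferRowType()}} are hypothetical stand-ins for Calcite's {{RelDataTypeField}} and the real method, and the {{timeIndicator}} flag stands in for the {{FlinkTypeFactory.isTimeIndicatorType()}} check. The nullability of {{window_start}} and {{window_end}} is an assumption.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class MaterializeTimeIndicatorSketch {

    // Hypothetical stand-in for a planner field; NOT Flink's or Calcite's type.
    static final class Field {
        final String name;
        final String sqlTypeName;
        final int precision;
        final boolean nullable;
        final boolean timeIndicator; // true models the *ROWTIME* wrapper

        Field(String name, String sqlTypeName, int precision,
              boolean nullable, boolean timeIndicator) {
            this.name = name;
            this.sqlTypeName = sqlTypeName;
            this.precision = precision;
            this.nullable = nullable;
            this.timeIndicator = timeIndicator;
        }

        @Override
        public String toString() {
            return name + " " + sqlTypeName + "(" + precision + ")"
                    + (nullable ? "" : " NOT NULL")
                    + (timeIndicator ? " *ROWTIME*" : "");
        }
    }

    // Replaces the bulk copy of input fields: time indicators are rebuilt as
    // regular timestamps (same type name, precision, nullability); all other
    // fields pass through unchanged.
    static List<Field> materializeInputFields(List<Field> inputFields) {
        List<Field> out = new ArrayList<>();
        for (Field f : inputFields) {
            if (f.timeIndicator) {
                out.add(new Field(f.name, f.sqlTypeName, f.precision, f.nullable, false));
            } else {
                out.add(f);
            }
        }
        return out;
    }

    // Models the output schema: materialized input fields plus window columns.
    static List<Field> inferRowType(List<Field> inputFields) {
        List<Field> out = materializeInputFields(inputFields);
        // Assumption: window_start/window_end are modeled as NOT NULL here.
        out.add(new Field("window_start", "TIMESTAMP", 3, false, false));
        out.add(new Field("window_end", "TIMESTAMP", 3, false, false));
        out.add(new Field("window_time", "TIMESTAMP", 3, false, true));
        return out;
    }

    public static void main(String[] args) {
        List<Field> input = new ArrayList<>();
        input.add(new Field("id", "VARCHAR", 2147483647, true, false));
        input.add(new Field("ts", "TIMESTAMP", 3, true, true));
        for (Field f : inferRowType(input)) {
            System.out.println(f);
        }
    }
}
{code}

In the real method the materialized type would presumably be created through the planner's type factory rather than a constructor call; the sketch only shows which fields keep the indicator.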
h3. Before/After
||Column||Before||After||
|{{ts}}|{{TIMESTAMP(3) \*ROWTIME\*}}|{{TIMESTAMP(3)}}|
|{{window_time}}|{{TIMESTAMP(3) NOT NULL \*ROWTIME\*}}|{{TIMESTAMP(3) NOT NULL \*ROWTIME\*}} (unchanged)|
h3. Effects
* {{ORDER BY ts}} in a downstream streaming OVER aggregation would be
*correctly rejected*
* The "Found more than one rowtime field" error (FLINK-24186) would be fixed at
its root cause instead of being worked around per sink
* No impact on {{window_time}} — it remains the sole time attribute as designed
in FLIP-145
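To illustrate why rejecting {{ORDER BY ts}} matters, the late-record drop from the issue description can be modeled with a deliberately simplified rule: a record counts as late when its event time is not ahead of the watermark already seen downstream. This is an assumption for illustration, not the actual OVER operator logic. {{LateRecordSketch}} and its methods are hypothetical names, timestamps are millisecond offsets from {{12:00:00}}, and the record at {{12:00:09}} is an invented example of one "near the end of the window".

{code:java}
import java.util.Arrays;
import java.util.List;

public class LateRecordSketch {

    // Simplified lateness rule (assumption): late if not ahead of the watermark.
    static boolean isLate(long eventTimeMs, long watermarkMs) {
        return eventTimeMs <= watermarkMs;
    }

    // Counts how many records the simplified rule would drop.
    static long countDropped(List<Long> eventTimesMs, long watermarkMs) {
        long dropped = 0;
        for (long t : eventTimesMs) {
            if (isLate(t, watermarkMs)) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Watermark 12:00:08 was forwarded before the window [12:00:00, 12:00:10) fired.
        long watermarkMs = 8_000L;

        // Ordering by the original ts: each record keeps its own event time.
        List<Long> byTs = Arrays.asList(1_000L, 3_000L, 6_000L, 9_000L);

        // Ordering by window_time: every record carries window_end - 1 ms.
        List<Long> byWindowTime = Arrays.asList(9_999L, 9_999L, 9_999L, 9_999L);

        System.out.println("dropped (ORDER BY ts): "
                + countDropped(byTs, watermarkMs));          // prints 3
        System.out.println("dropped (ORDER BY window_time): "
                + countDropped(byWindowTime, watermarkMs));  // prints 0
    }
}
{code}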
> Flink SQL Window TVF didn't remove rowtime attribute from original rowtime
> field
> --------------------------------------------------------------------------------
>
> Key: FLINK-39899
> URL: https://issues.apache.org/jira/browse/FLINK-39899
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.19.3
> Reporter: Chen Zhang
> Priority: Major
>
> h2. Summary
> Window Table-Valued Functions (TUMBLE/HOP/CUMULATE) do not materialize the
> original rowtime attribute column to a regular {{TIMESTAMP}} type in the
> output schema. Per the FLIP-145 specification, the original time attribute should
> become a regular timestamp after applying the window TVF, with only
> {{window_time}} remaining as the rowtime attribute. Instead, both the
> original column and {{window_time}} retain the {{*ROWTIME*}} indicator.
> h2. Description
> *FLIP-145 states:*
> {quote}
> The original row time attribute "timecol" will be a regular timestamp column
> after applying window TVF.
> {quote}
> *Actual behavior:*
> After applying {{TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '10' SECOND)}}, the
> output schema shows:
> {code}
> `window_time` TIMESTAMP(3) NOT NULL *ROWTIME*
> `ts` TIMESTAMP(3) *ROWTIME* <-- should be regular TIMESTAMP(3)
> {code}
> Both {{ts}} and {{window_time}} are marked as {{*ROWTIME*}}, violating the
> FLIP-145 design.
> *Evidence from plan AST:*
> {code}
> LogicalTableFunctionScan(
> invocation=[TUMBLE(DESCRIPTOR($2), 10000:INTERVAL SECOND)],
> rowType=[RecordType(
> VARCHAR entity_id,
> VARCHAR payload,
> TIMESTAMP(3) *ROWTIME* ts, <-- STILL ROWTIME
> TIMESTAMP(3) window_start,
> TIMESTAMP(3) window_end,
> TIMESTAMP(3) *ROWTIME* window_time
> )]
> )
> {code}
> h2. Impact
> h3. 1. Silent data loss in OVER aggregation after window TVF
> Because {{ts}} retains {{*ROWTIME*}}, it passes the time-attribute validation
> in {{StreamExecOverAggregate.translateToPlanInternal()}}. The planner accepts
> queries like:
> {code:sql}
> SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING)
> FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))
> {code}
> At runtime, the window aggregate operator ({{SlicingWindowOperator}})
> registers timers at {{window_end - 1}} and forwards intermediate watermarks.
> When the window fires and emits records, records with {{ts}} values early in
> the window are *behind the downstream OVER operator's watermark* and are
> *silently dropped as late*.
> Example: For a window {{[12:00:00, 12:00:10)}}:
> * Intermediate watermarks (e.g., {{12:00:05}}, {{12:00:08}}) are forwarded to
> the OVER operator
> * When the window fires, records with {{ts = 12:00:01}}, {{12:00:03}},
> {{12:00:06}} are late (behind watermark {{12:00:08}})
> * Only records near the end of the window survive
> * If the OVER aggregation orders by {{window_time}} instead
> ({{12:00:09.999}} for this window), all records share the same timestamp
> and none are dropped
> h3. 2. Multiple rowtime columns in output
> Having two rowtime columns causes errors when writing to sinks:
> {code}
> TableException: The query contains more than one rowtime attribute column
> [window_time, ts] for writing into table '*anonymous_datastream_sink*'.
> {code}
> This was partially worked around in FLINK-24186 by relaxing the check for
> collect/print sinks, but the root cause was never fixed.
> h2. Root Cause
> The window TVF's output type derivation (in the planner's type inference for
> {{LogicalTableFunctionScan}}) preserves the {{TimeIndicatorRelDataType}} on
> the original time column. It should materialize the original time column to a
> regular {{TIMESTAMP}} type, keeping only {{window_time}} as {{*ROWTIME*}}.
> h2. Steps to Reproduce
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
>     env, EnvironmentSettings.newInstance().inStreamingMode().build());
> // Create source with rowtime
> tableEnv.executeSql(
>     "CREATE TABLE source (" +
>     "  id STRING, ts TIMESTAMP(3)," +
>     "  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
>     ") WITH ('connector' = 'datagen')");
> // Query: TUMBLE + OVER using original ts
> Table result = tableEnv.sqlQuery(
>     "SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING) AS cnt " +
>     "FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))");
> // Inspect schema - ts should NOT be *ROWTIME*
> result.printSchema();
> // Actual: `ts` TIMESTAMP(3) *ROWTIME*
> // Expected: `ts` TIMESTAMP(3)
> {code}
> h2. Expected Behavior
> After window TVF, the output schema should be:
> {code}
> `ts` TIMESTAMP(3) <-- regular timestamp, NOT rowtime
> `window_time` TIMESTAMP(3) NOT NULL *ROWTIME* <-- sole rowtime attribute
> {code}
> The OVER aggregation {{ORDER BY ts}} should be *rejected* by the planner
> because {{ts}} is no longer a time attribute.
> h2. Related Issues
> * FLINK-24186 - Worked around the "multiple rowtime columns" symptom for
> collect/print sinks
> * FLINK-38162 - Time attribute propagation issues with SQL functions after
> window TVF
> * FLINK-10211 - Broader issue with time indicator materialization
> * [FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145:+Support+SQL+windowing+table-valued+function] - Original design specification for window TVFs