[ 
https://issues.apache.org/jira/browse/FLINK-39899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Zhang updated FLINK-39899:
-------------------------------
    Affects Version/s: 1.20.4

> Flink SQL Window TVF didn't remove rowtime attribute from original rowtime 
> field
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-39899
>                 URL: https://issues.apache.org/jira/browse/FLINK-39899
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.19.3, 1.20.4
>            Reporter: Chen Zhang
>            Priority: Major
>
> h2. Summary
> Window Table-Valued Functions (TUMBLE/HOP/CUMULATE) do not materialize the 
> original rowtime attribute column to a regular {{TIMESTAMP}} type in the 
> output schema. Per the FLIP-145 specification, the original time attribute should 
> become a regular timestamp after applying the window TVF, with only 
> {{window_time}} remaining as the rowtime attribute. Instead, both the 
> original column and {{window_time}} retain the {{*ROWTIME*}} indicator.
> h2. Description
> *FLIP-145 states:*
> {quote}
> The original row time attribute "timecol" will be a regular timestamp column 
> after applying window TVF.
> {quote}
> *Actual behavior:*
> After applying {{TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '10' SECOND)}}, the 
> output schema shows:
> {code}
> `window_time` TIMESTAMP(3) NOT NULL *ROWTIME*
> `ts`          TIMESTAMP(3) *ROWTIME*            <-- should be regular TIMESTAMP(3)
> {code}
> Both {{ts}} and {{window_time}} are marked as {{*ROWTIME*}}, violating the 
> FLIP-145 design.
> *Evidence from plan AST:*
> {code}
> LogicalTableFunctionScan(
>   invocation=[TUMBLE(DESCRIPTOR($2), 10000:INTERVAL SECOND)],
>   rowType=[RecordType(
>     VARCHAR entity_id,
>     VARCHAR payload,
>     TIMESTAMP(3) *ROWTIME* ts,           <-- STILL ROWTIME
>     TIMESTAMP(3) window_start,
>     TIMESTAMP(3) window_end,
>     TIMESTAMP(3) *ROWTIME* window_time
>   )]
> )
> {code}
> h2. Impact
> h3. 1. Silent data loss in OVER aggregation after window TVF
> Because {{ts}} retains {{*ROWTIME*}}, it passes the time-attribute validation 
> in {{StreamExecOverAggregate.translateToPlanInternal()}}. The planner accepts 
> queries like:
> {code:sql}
> SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING)
> FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))
> {code}
> At runtime, the window aggregate operator ({{SlicingWindowOperator}}) 
> registers timers at {{window_end - 1}} and forwards intermediate watermarks. 
> When the window fires and emits records, records with {{ts}} values early in 
> the window are *behind the downstream OVER operator's watermark* and are 
> *silently dropped as late*.
> Example: For a window {{[12:00:00, 12:00:10)}}:
> * Intermediate watermarks (e.g., {{12:00:05}}, {{12:00:08}}) are forwarded to 
> the OVER operator
> * When the window fires, records with {{ts = 12:00:01}}, {{12:00:03}}, 
> {{12:00:06}} are late (behind watermark {{12:00:08}})
> * Only records near the end of the window survive
> * If the OVER aggregation ordered by {{window_time}} instead, every record in 
> the window would carry the same timestamp ({{12:00:09.999}}) and none would be 
> dropped
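> The lateness arithmetic in the example above can be sketched in plain Java. 
> This is a simplified model, not Flink code: the class {{LateRecordSketch}}, the 
> method {{survivors}}, the rule "a record is late when its timestamp is not 
> after the current watermark", and the extra record at {{12:00:09}} are 
> assumptions made for illustration.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LateRecordSketch {

    /**
     * Returns the timestamps that are strictly after the watermark.
     * Assumption: anything at or before the watermark counts as late and is dropped.
     */
    static List<LocalTime> survivors(List<LocalTime> rowtimes, LocalTime watermark) {
        List<LocalTime> kept = new ArrayList<>();
        for (LocalTime t : rowtimes) {
            if (t.isAfter(watermark)) {
                kept.add(t);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Last intermediate watermark forwarded before the window [12:00:00, 12:00:10) fires.
        LocalTime watermark = LocalTime.parse("12:00:08");

        // Records ordered by the original ts column (12:00:09 is a hypothetical late-window record).
        List<LocalTime> byTs = List.of(
                LocalTime.parse("12:00:01"),
                LocalTime.parse("12:00:03"),
                LocalTime.parse("12:00:06"),
                LocalTime.parse("12:00:09"));

        // The same four records ordered by window_time: all share window_end - 1 ms.
        List<LocalTime> byWindowTime =
                Collections.nCopies(4, LocalTime.parse("12:00:09.999"));

        System.out.println("kept when ordering by ts:          " + survivors(byTs, watermark).size());
        System.out.println("kept when ordering by window_time: " + survivors(byWindowTime, watermark).size());
    }
}
```

> In this model, ordering by {{ts}} keeps one of the four records, while 
> ordering by {{window_time}} keeps all four.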
> h3. 2. Multiple rowtime columns in output
> Having two rowtime columns causes errors when writing to sinks:
> {code}
> TableException: The query contains more than one rowtime attribute column
> [window_time, ts] for writing into table '*anonymous_datastream_sink*'.
> {code}
> This was partially worked around in FLINK-24186 by relaxing the check for 
> collect/print sinks, but the root cause was never fixed.
> h2. Root Cause
> The window TVF's output type derivation (in the planner's type inference for 
> {{LogicalTableFunctionScan}}) preserves the {{TimeIndicatorRelDataType}} on 
> the original time column. It should materialize the original time column to a 
> regular {{TIMESTAMP}} type, keeping only {{window_time}} as {{*ROWTIME*}}.
> h2. Steps to Reproduce
> {code:java}
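> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();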
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
>     EnvironmentSettings.newInstance().inStreamingMode().build());
> // Create source with rowtime
> tableEnv.executeSql(
>     "CREATE TABLE source (" +
>     "  id STRING, ts TIMESTAMP(3)," +
>     "  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
>     ") WITH ('connector' = 'datagen')");
> // Query: TUMBLE + OVER using original ts
> Table result = tableEnv.sqlQuery(
>     "SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts " +
>     "ROWS UNBOUNDED PRECEDING) AS cnt " +
>     "FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))");
> // Inspect schema - ts should NOT be *ROWTIME*
> result.printSchema();
> // Actual:   `ts` TIMESTAMP(3) *ROWTIME*
> // Expected: `ts` TIMESTAMP(3)
> {code}
> h2. Expected Behavior
> After window TVF, the output schema should be:
> {code}
> `ts`          TIMESTAMP(3)                      <-- regular timestamp, NOT rowtime
> `window_time` TIMESTAMP(3) NOT NULL *ROWTIME*   <-- sole rowtime attribute
> {code}
> The OVER aggregation {{ORDER BY ts}} should be *rejected* by the planner 
> because {{ts}} is no longer a time attribute.
> h2. Related Issues
> * FLINK-24186 - Worked around the "multiple rowtime columns" symptom for 
> collect/print sinks
> * FLINK-38162 - Time attribute propagation issues with SQL functions after 
> window TVF
> * FLINK-10211 - Broader issue with time indicator materialization
> * [FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145:+Support+SQL+windowing+table-valued+function] - Original design specification for window TVFs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
