[
https://issues.apache.org/jira/browse/SPARK-40821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17622892#comment-17622892
]
Jungtaek Lim commented on SPARK-40821:
--------------------------------------
[~alex-balikov]
I thought you created multiple JIRA tickets... My bad. Could you please clone
this ticket to have a separate ticket number, and update this ticket to only
contain the window_time change? Thanks in advance.
> Fix late record filtering to support chaining of stateful operators
> -------------------------------------------------------------------
>
> Key: SPARK-40821
> URL: https://issues.apache.org/jira/browse/SPARK-40821
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.3.0
> Reporter: Alex Balikov
> Assignee: Alex Balikov
> Priority: Major
> Fix For: 3.4.0
>
>
> Currently chaining of stateful operators in Spark Structured Streaming is not
> supported for various reasons and is blocked by the unsupported operations
> check (spark.sql.streaming.unsupportedOperationCheck flag). We propose to fix
> this as chaining of stateful operators is a common streaming scenario - e.g.
> stream-stream join -> windowed aggregation
> window aggregation -> window aggregation
> etc
> What is broken:
> # every stateful operator performs late record filtering against the global
> watermark. When chaining stateful operators (e.g. window aggregations) the
> output produced by the first stateful operator is effectively late against
> the watermark and thus filtered out by the next operator's late record
> filtering (technically the next operator should not do late record filtering,
> but it can be changed to assert for correctness detection, etc.)
> # when chaining window aggregations, the first window aggregating operator
> produces records with schema { window: { start: Timestamp, end: Timestamp },
> agg: Long } - there is no explicit event time in the schema to be used by
> the next stateful operator (the correct event time should be window.end - 1)
> # stream-stream time-interval join can produce late records by semantics,
> e.g. if the join condition is:
> left.eventTime BETWEEN right.eventTime - INTERVAL 1 HOUR AND right.eventTime +
> INTERVAL 1 HOUR
> the produced records can be delayed by 1 hr relative to the
> watermark.
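
To make items 1 and 2 above concrete, here is a minimal sketch in plain Python (not Spark code). The helper names, the timestamps, the microsecond unit for "window.end - 1", and the rule "late means event time strictly below the watermark" are all assumptions made for illustration; the engine's exact comparison may differ.

```python
from datetime import datetime, timedelta


def window_event_time(window_end: datetime) -> datetime:
    """Hypothetical helper: event time of a half-open window [start, end).

    Assumes "window.end - 1" means one microsecond before the window end.
    """
    return window_end - timedelta(microseconds=1)


def is_late(event_time: datetime, watermark: datetime) -> bool:
    """Hypothetical late-record check: late if strictly below the watermark."""
    return event_time < watermark


# Illustrative values only: a 5-minute window ending at 12:05.
window_end = datetime(2022, 10, 1, 12, 5)

# The first aggregation can only emit this window once the global watermark
# has passed the window end, so assume the watermark is already at 12:06.
global_watermark = datetime(2022, 10, 1, 12, 6)

emitted_event_time = window_event_time(window_end)

# The emitted row is immediately late against the same global watermark,
# so a second stateful operator would filter it out.
dropped_downstream = is_late(emitted_event_time, global_watermark)
```

Under these assumptions the emitted row carries event time 12:04:59.999999, which is already behind the watermark that triggered its emission.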
> Proposed fixes:
> 1. Issue 1 can be fixed by performing late record filtering against the previous
> microbatch watermark instead of the current microbatch watermark.
> 2. Issue 2 can be fixed by allowing the window and session_window functions to
> work on the window column directly and compute the correct event time
> transparently to the user. Also, introduce a window_time SQL function to
> compute the correct event time from the window column.
> 3. Issue 3 can be fixed by adding support for per-operator watermarks instead
> of a single global watermark. In the example of a stream-stream time-interval
> join followed by a stateful operator, the join operator will 'delay' the
> downstream operator watermarks by the correct value to handle the delayed
> records. Only stream-stream time-interval joins will delay the watermark;
> other operators will not delay downstream watermarks.
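
A rough sketch of how fixes 1 and 3 would change the outcome, again in plain Python rather than Spark code. The function names, the timestamps, and the "strictly below the watermark" lateness rule are assumptions for illustration only, not the actual implementation.

```python
from datetime import datetime, timedelta


def is_late(event_time: datetime, watermark: datetime) -> bool:
    """Hypothetical late-record check: late if strictly below the watermark."""
    return event_time < watermark


def downstream_watermark(input_watermark: datetime, delay: timedelta) -> datetime:
    """Hypothetical per-operator watermark: the operator holds back the
    watermark it passes downstream by the delay it can introduce."""
    return input_watermark - delay


# Illustrative watermarks for two consecutive microbatches.
previous_watermark = datetime(2022, 10, 1, 12, 0)
current_watermark = datetime(2022, 10, 1, 12, 6)

# Fix 1: a window ending at 12:05 is emitted in the current microbatch.
emitted_event_time = datetime(2022, 10, 1, 12, 5) - timedelta(microseconds=1)
late_vs_current = is_late(emitted_event_time, current_watermark)    # dropped today
late_vs_previous = is_late(emitted_event_time, previous_watermark)  # kept with fix 1

# Fix 3: a time-interval join with a +/- 1 hour condition may output rows
# up to 1 hour behind its input watermark.
join_delay = timedelta(hours=1)
joined_event_time = current_watermark - timedelta(minutes=30)
watermark_after_join = downstream_watermark(current_watermark, join_delay)

late_vs_global = is_late(joined_event_time, current_watermark)         # dropped today
late_vs_per_operator = is_late(joined_event_time, watermark_after_join)  # kept with fix 3
```

In this sketch an operator without a time-interval condition would pass a delay of timedelta(0), leaving the downstream watermark unchanged.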
>
>