[ 
https://issues.apache.org/jira/browse/SPARK-40821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-40821:
---------------------------------
    Description: 
Currently, chaining of stateful operators in Spark Structured Streaming is not 
supported for various reasons and is blocked by the unsupported operations 
check (spark.sql.streaming.unsupportedOperationCheck flag). We propose to fix 
this, as chaining of stateful operators is a common streaming scenario, e.g.

stream-stream join -> windowed aggregation

window aggregation -> window aggregation

etc

What is broken:
 # Every stateful operator performs late record filtering against the global 
watermark. When chaining stateful operators (e.g. window aggregations), the 
output produced by the first stateful operator is effectively late against the 
watermark and is thus filtered out by the next operator's late record filtering 
(technically the next operator should not do late record filtering at all, but 
the check can be changed to an assertion for correctness detection, etc.)
 # When chaining window aggregations, the first window aggregation operator 
produces records with schema { window: { start: Timestamp, end: Timestamp }, 
agg: Long }. There is no explicit event time in the schema to be used by the 
next stateful operator (the correct event time should be window.end - 1).

 # A stream-stream time-interval join can produce late records by semantics, e.g. 
if the join condition is:

left.eventTime BETWEEN right.eventTime - INTERVAL 1 HOUR AND right.eventTime + 
INTERVAL 1 HOUR

the produced records can be delayed by 1 hour relative to the watermark.
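
To make the first problem concrete, here is a minimal toy model in plain Python (not Spark code). The `Window` class, the `is_late` helper, and the integer time units are all hypothetical and exist only for illustration; the comparison used for lateness is an assumption, not Spark's actual predicate.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    start: int  # inclusive, arbitrary integer time units
    end: int    # exclusive


def is_late(event_time: int, watermark: int) -> bool:
    # Toy late-record check (assumption): a row is late once the
    # watermark has moved past its event time.
    return event_time < watermark


# The first aggregation finalizes window [0, 10) only after the global
# watermark has reached the window end.
global_watermark = 10
emitted = Window(start=0, end=10)

# Event time of the emitted row, taken as window.end - 1 per the description.
emitted_event_time = emitted.end - 1

# The second operator checks the same global watermark, so the row it
# has just received already counts as late and is dropped.
dropped_downstream = is_late(emitted_event_time, global_watermark)
print(emitted_event_time, dropped_downstream)
```

In this model any window emitted by the first operator is late by construction, because the watermark that allows the window to be finalized is the same one the next operator filters against.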

Proposed fixes:

1. Issue 1 can be fixed by performing late record filtering against the previous 
microbatch's watermark instead of the current microbatch's watermark.

2. Issue 2 can be fixed by allowing the window and session_window functions to work 
on the window column directly and compute the correct event time transparently 
to the user. Also, introduce a window_time SQL function to compute the correct 
event time from the window column.

3. Issue 3 can be fixed by adding support for per-operator watermarks instead of a 
single global watermark. In the example of a stream-stream time-interval join 
followed by a stateful operator, the join operator will 'delay' the downstream 
operator's watermark by the correct amount to handle the delayed records. Only 
stream-stream time-interval joins will delay the watermark; other operators 
will not delay downstream watermarks.
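
Fixes 1 and 3 can be sketched in the same kind of toy model. Everything here is plain Python with hypothetical names (`is_late`, `downstream_watermark`, times in seconds); the subtraction used to 'delay' the watermark is an assumption about what "a correct value" means, not the actual Spark implementation.

```python
HOUR = 3600  # seconds


def is_late(event_time: int, watermark: int) -> bool:
    # Toy late-record check (assumption): late once the watermark has
    # moved past the row's event time.
    return event_time < watermark


# Fix 1: filter late records against the previous microbatch's watermark.
previous_batch_watermark = 0
current_batch_watermark = 10
row_event_time = 9  # window.end - 1 for window [0, 10)

late_vs_current = is_late(row_event_time, current_batch_watermark)
late_vs_previous = is_late(row_event_time, previous_batch_watermark)


# Fix 3: a time-interval join hands a delayed watermark to the
# operator downstream of it.
def downstream_watermark(input_watermark: int, delay: int) -> int:
    # Hypothetical rule: shift the watermark back by the join's maximum delay.
    return input_watermark - delay


join_input_watermark = 10 * HOUR
# With a +/- 1 hour join condition, an output row can trail the
# watermark by up to one hour.
joined_row_event_time = join_input_watermark - HOUR

late_vs_global = is_late(joined_row_event_time, join_input_watermark)
late_vs_delayed = is_late(
    joined_row_event_time,
    downstream_watermark(join_input_watermark, HOUR),
)
print(late_vs_current, late_vs_previous, late_vs_global, late_vs_delayed)
```

In both cases the row is dropped under the single current global watermark and kept once the downstream operator filters against the earlier (previous-batch or delayed) watermark.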

 

*This ticket handles no. 2 of the proposal.* The others will be handled in 
separate tickets.
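
As an illustration of why the event time should be window.end - 1 rather than window.end, the following plain-Python sketch re-windows the output of a 5-minute window into 1-hour windows. The helper names `window_time` and `tumbling_window` are hypothetical stand-ins written for this example, and the choice of one microsecond as the unit of "- 1" is an assumption based on Spark timestamps having microsecond resolution.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def window_time(window_end: datetime) -> datetime:
    # Last instant inside the half-open window [start, end).
    # The one-microsecond step is an assumption (see lead-in).
    return window_end - timedelta(microseconds=1)


def tumbling_window(event_time: datetime, size: timedelta):
    # Assign an event time to its tumbling window [start, start + size).
    offset = (event_time - EPOCH) % size
    start = event_time - offset
    return start, start + size


# Output row of a first aggregation over the 5-minute window [12:55, 13:00).
inner_end = datetime(2022, 10, 17, 13, 0)

# Using window.end directly puts the row into the next hour.
wrong_outer = tumbling_window(inner_end, timedelta(hours=1))

# Using window.end - 1 keeps it in the hour that contains the inner window.
right_outer = tumbling_window(window_time(inner_end), timedelta(hours=1))

print(wrong_outer[0], right_outer[0])
```

Because window bounds are exclusive at the end, the row for [12:55, 13:00) belongs to the hour starting at 12:00, which is what the end - 1 rule produces here.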

 


> Fix late record filtering to support chaining of stateful operators
> -------------------------------------------------------------------
>
>                 Key: SPARK-40821
>                 URL: https://issues.apache.org/jira/browse/SPARK-40821
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.4.0
>            Reporter: Alex Balikov
>            Assignee: Alex Balikov
>            Priority: Major
>             Fix For: 3.4.0
>



