[
https://issues.apache.org/jira/browse/SPARK-40925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17679003#comment-17679003
]
Apache Spark commented on SPARK-40925:
--------------------------------------
User 'HeartSaVioR' has created a pull request for this issue:
https://github.com/apache/spark/pull/39662
> Fix late record filtering to support chaining of stateful operators
> -------------------------------------------------------------------
>
> Key: SPARK-40925
> URL: https://issues.apache.org/jira/browse/SPARK-40925
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.4.0
> Reporter: Alex Balikov
> Assignee: Alex Balikov
> Priority: Major
> Labels: release-notes
> Fix For: 3.4.0
>
>
> This is a follow-up ticket to https://issues.apache.org/jira/browse/SPARK-40821.
> Here we propose fixing the late record filtering in stateful operators to
> allow chaining of stateful operators which do not produce delayed records
> (as opposed to operators that can, such as a time-interval join or,
> potentially, flatMapGroupsWithState) - e.g. a time-equality streaming join
> followed by aggregations, or chained window aggregations.
>
> There are two issues that need to be addressed:
> # Stateful operators filter late input records based on the current
> watermark. When chaining window aggregations, for example, the records
> produced by the first window aggregation are by semantics behind the current
> watermark (the watermark closes all past windows and emits the corresponding
> aggregates), and therefore these records will by definition appear late
> relative to the current watermark in the second stateful operator. The
> proposed fix is to use the previous batch's watermark for late record
> filtering and the current batch's watermark for state eviction -
> effectively, each stateful operator should be initialized with two watermark
> values instead of one (a minimal sketch of this follows the example below).
> # The second issue with chaining window aggregations is that the records
> produced by the first aggregation do not have an explicit event-time column
> and thus cannot be fed directly into a subsequent stateful operator, which
> requires that column. This is partially handled by
> [https://github.com/apache/spark/pull/38288], which lets the user explicitly
> introduce a new event-time column by extracting the event time from the
> window column; a sketch of that explicit approach also follows the example
> below. This is slightly cumbersome. We propose changing the window function
> to handle the window column transparently - e.g.
> input
>   .withWatermark("eventTime", "1 seconds")
>   .groupBy(window($"eventTime", "5 seconds") as 'window)
>   .agg(count("*") as 'count)
>   .groupBy(window($"window", "10 seconds"))
>   .agg(count("*") as 'count, sum("count") as 'sum)
>   .select($"count".as[Long], $"sum".as[Long])
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)