[
https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-47840.
---------------------------------
> Remove foldable propagation across Streaming Aggregate/Join nodes
> -----------------------------------------------------------------
>
> Key: SPARK-47840
> URL: https://issues.apache.org/jira/browse/SPARK-47840
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.1, 4.0.0
> Reporter: Bhuwan Sahni
> Assignee: Bhuwan Sahni
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.5.2, 4.0.0
>
>
> Streaming queries with a union of two data streams followed by an aggregate
> (groupBy) can produce incorrect results if the grouping key is a constant
> literal for the duration of a micro-batch.
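>
> A minimal repro sketch follows (assuming MemoryStream test sources; the
> stream contents and sink name here are illustrative, not taken from the
> original report):
> {code:scala}
> import org.apache.spark.sql.{SQLContext, SparkSession}
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.functions.lit
>
> val spark = SparkSession.builder().master("local[2]").getOrCreate()
> import spark.implicits._
> implicit val sqlContext: SQLContext = spark.sqlContext
>
> // Two in-memory streams; each branch tags its rows with a constant
> // literal, so the grouping key is foldable within each branch.
> val stream1 = MemoryStream[Int]
> val stream2 = MemoryStream[Int]
> val df1 = stream1.toDF().select(lit("ds1").as("name"))
> val df2 = stream2.toDF().select(lit("ds2").as("name"))
>
> // Union the two streams and aggregate on the literal-valued key.
> val counts = df1.union(df2).groupBy("name").count()
>
> val query = counts.writeStream
>   .format("memory")
>   .queryName("counts")
>   .outputMode("complete")
>   .start()
> {code}
>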
> The query produces incorrect results because the query optimizer recognizes
> the literal value in the grouping key as foldable and replaces the grouping
> key expression with the actual literal value. This optimization is correct
> for batch queries. However, streaming queries also read information from the
> StateStore, and the output contains both the results from the StateStore
> (computed in previous micro-batches) and the data from the input sources
> (computed in this micro-batch). The HashAggregate node after StateStore
> always reads the grouping key value as the folded literal (because the
> grouping key expression has been rewritten into a literal by the optimizer).
> This ends up replacing keys in the StateStore with the literal value,
> resulting in incorrect output.
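>
> As a point of reference only (not the fix), Spark's existing
> spark.sql.optimizer.excludedRules configuration can be used to skip the
> FoldablePropagation rule entirely while debugging; whether the rule is
> excludable should be verified against the Spark version in use:
> {code:scala}
> // Stopgap for experimentation, not the actual fix: exclude the
> // FoldablePropagation optimizer rule so grouping keys keep their
> // attribute references. Verify the rule is excludable in your version.
> spark.conf.set(
>   "spark.sql.optimizer.excludedRules",
>   "org.apache.spark.sql.catalyst.optimizer.FoldablePropagation")
> {code}
>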
> See the example logical and physical plans below for a query that unions
> two data streams and then performs a groupBy. Note that the name#4
> expression has been folded to the literal ds1. The streaming aggregate adds
> a StateStoreSave node as a child of HashAggregate; however, any grouping
> key restored from the StateStore will still be read as ds1 because of the
> optimization.
>
> *Optimized Logical Plan*
> {quote}=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===
> === Old Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
> +- Aggregate [name#4], [name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> === New Plan ===
> WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
> +- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
>    +- Project [ds1 AS name#4]
>       +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
> ====
> {quote}
> *Corresponding Physical Plan*
> {quote}WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
> +- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
>    +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
>       +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
>          +- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
>             +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
>                +- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
>                   +- Project
>                      +- MicroBatchScan[value#1] MemoryStreamDataSource
> {quote}
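>
> Continuing the repro sketch above, driving a couple of micro-batches makes
> the issue observable (the expected counts below are derived from the
> illustrative inputs, not from the original report):
> {code:scala}
> // Feed each stream and run the pending micro-batch to completion.
> stream1.addData(1, 2)
> stream2.addData(3)
> query.processAllAvailable()
>
> // A second micro-batch forces keys to be restored from the StateStore.
> stream1.addData(4)
> query.processAllAvailable()
>
> // Expected: ds1 -> 3, ds2 -> 1. On affected versions, keys restored
> // from the StateStore are read back as the folded literal, so the
> // counts come out wrong.
> spark.sql("SELECT * FROM counts").show()
> {code}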
>