[ https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bhuwan Sahni updated SPARK-47840:
---------------------------------
    Description: 
Streaming queries with a Union of two data streams followed by an Aggregate (groupBy) can produce incorrect results if the grouping key is a constant literal for the duration of a micro-batch.

The query produces incorrect results because the query optimizer recognizes the literal value in the grouping key as foldable and replaces the grouping key expression with the actual literal value. This optimization is correct for batch queries. However, streaming queries also read information from the StateStore, and the output contains both results from the StateStore (computed in previous micro-batches) and data from the input sources (computed in the current micro-batch). The HashAggregate node after the StateStore always reads the grouping key value as the optimized literal, because the grouping key expression has been folded into a literal by the optimizer. This ends up replacing keys in the StateStore with the literal value, resulting in incorrect output.
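
A minimal, hypothetical reproduction of the query shape that triggers this (the column name and literals mirror the plans below; the MemoryStream setup is illustrative, not the original test):

{code:scala}
// Hypothetical repro sketch: two memory-stream sources, each tagged
// with a constant literal grouping key, unioned and then aggregated.
implicit val sqlCtx = spark.sqlContext
import spark.implicits._
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.lit

val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]

val df1 = input1.toDF().withColumn("name", lit("ds1"))
val df2 = input2.toDF().withColumn("name", lit("ds2"))

// groupBy on the constant-literal key: in a micro-batch where only one
// source has data, FoldablePropagation folds the key into its literal,
// and keys restored from the StateStore are rewritten to that literal.
val counts = df1.union(df2).groupBy("name").count()
{code}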

See the example logical and physical plans below for a query performing a union of two data streams followed by a groupBy. Note that the name#4 expression has been optimized to the literal ds1. Planning for the streaming aggregate adds a StateStoreSave node as a child of HashAggregate; however, any grouping key read from the StateStore will still be reported as ds1 due to the optimization.

 

*Optimized Logical Plan*
{quote}=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===

=== Old Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [name#4], [name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource

=== New Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource

====
{quote}
*Corresponding Physical Plan*
{quote}WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
+- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
   +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
      +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
         +- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
            +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
               +- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
                  +- Project
                     +- MicroBatchScan[value#1] MemoryStreamDataSource
{quote}
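
To make the failure mode concrete, here is a small plain-Scala illustration of the state-key corruption; the numbers and the merge step are invented for illustration and are not Spark internals:

{code:scala}
// Illustration only: counts restored from the state store are re-emitted
// under the folded literal key instead of the keys they were stored with.
object FoldedKeyDemo extends App {
  // State after an earlier micro-batch that saw rows from both sources.
  val restored = Map("ds1" -> 3L, "ds2" -> 5L)

  // This micro-batch has only "ds1" rows, so the grouping key has been
  // folded to the literal "ds1" -- including for rows read back from state.
  val foldedKey = "ds1"
  val newRowsForDs1 = 2L

  // Buggy merge: every restored count lands under the folded key.
  val merged = Map(foldedKey -> (restored.values.sum + newRowsForDs1))

  println(merged) // Map(ds1 -> 10): the ds2 count was absorbed into ds1
}
{code}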
 


> Remove foldable propagation across Streaming Aggregate/Join nodes
> -----------------------------------------------------------------
>
>                 Key: SPARK-47840
>                 URL: https://issues.apache.org/jira/browse/SPARK-47840
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.0.0, 3.5.1
>            Reporter: Bhuwan Sahni
>            Priority: Major


