[ 
https://issues.apache.org/jira/browse/SPARK-55038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Zhang updated SPARK-55038:
-------------------------------
    Description: 
h4. Summary

When `spark.sql.adaptive.enabled=true` and
`spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1`, queries using
`array_agg(DISTINCT)` in correlated subqueries produce incorrect results.

 
h4. Reproducer
{code:java}
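// Enable AQE and force ObjectHashAggregate to fall back to sort-based
// aggregation almost immediately (after a single key in its hash map),
// the two conditions under which the wrong results appear.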
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "1")
spark.sql("""
  WITH t AS (SELECT explode(array(15, 16, 17)) AS v)
  SELECT
      (SELECT COUNT(*) FROM t WHERE array_contains(arr1, v)) AS array_agg_cnt,
      (SELECT COUNT(*) FROM t WHERE array_contains(arr2, v)) AS collect_set_cnt,
      arr1,
      arr2
  FROM (
      SELECT
          array_agg(DISTINCT v) AS arr1,
          collect_set(v) AS arr2
      FROM t
  )
""").show()

// Expected: both counts = 3
// Actual:   array_agg_cnt = 0, collect_set_cnt = 3
{code}
 
h4. Root Cause Analysis

 

The correlated subquery is decorrelated into a hash join that uses the
aggregated array as the join key.

When `array_agg(DISTINCT)` (an alias for `collect_list` with DISTINCT) is
computed more than once under AQE with the sort-based aggregation fallback, the
computations produce arrays containing the same elements in different orders.
Array equality is element-by-element and therefore order-sensitive, so the hash
join fails to match and the correlated count comes back as 0.
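
As a quick illustration of the order sensitivity (plain Spark SQL, not specific
to this plan), two arrays with the same elements in a different order do not
compare equal and so never match as join keys:
{code:java}
// Same elements, same order: equal
spark.sql("SELECT array(15, 16, 17) = array(15, 16, 17) AS eq").show()  // true

// Same elements, different order: NOT equal, so a hash join on the array misses
spark.sql("SELECT array(15, 16, 17) = array(17, 16, 15) AS eq").show()  // false
{code}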

 
h4. Workarounds

 

  - Set spark.sql.adaptive.enabled=false, OR
  - Set spark.sql.objectHashAggregate.sortBased.fallbackThreshold back to its
default (128), as in the snippet below
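
A minimal sketch of applying either workaround at session level (assumes an
active SparkSession named spark, as in the reproducer above):
{code:java}
// Workaround 1: disable adaptive query execution for the session
spark.conf.set("spark.sql.adaptive.enabled", "false")

// Workaround 2: keep AQE on, but restore the default sort-based fallback threshold
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", "128")
{code}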

 
h4. Test Results

 

  - AQE=true, sortBased.fallbackThreshold=1: FAIL (cnt=0, expected 3)
  - AQE=false, sortBased.fallbackThreshold=1: PASS (cnt=3)
  - AQE=true, sortBased.fallbackThreshold=128 (default): PASS (cnt=3)
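
For reference, the three configurations above can be re-run with a loop along
these lines (an illustrative sketch only, checking just the array_agg(DISTINCT)
count from the reproducer):
{code:java}
for ((aqe, threshold) <- Seq(("true", "1"), ("false", "1"), ("true", "128"))) {
  spark.conf.set("spark.sql.adaptive.enabled", aqe)
  spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", threshold)
  val cnt = spark.sql("""
    WITH t AS (SELECT explode(array(15, 16, 17)) AS v)
    SELECT (SELECT COUNT(*) FROM t WHERE array_contains(arr, v)) AS cnt
    FROM (SELECT array_agg(DISTINCT v) AS arr FROM t)
  """).collect().head.getLong(0)
  println(s"AQE=$aqe, threshold=$threshold -> cnt=$cnt (expected 3)")
}
{code}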

 
h4. Conclusion

 

The bug only occurs when both AQE is enabled and sortBased.fallbackThreshold=1.

 

  was:
h4. Summary


When `spark.sql.adaptive.enabled=true` and
`spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1`, queries using
`array_agg(DISTINCT)` in correlated subqueries produce incorrect results.

 
h4. Reproducer
{code:java}
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.objectHashAggregate.sortBased.fallbackThreshold", 
"1")  spark.sql("""
    WITH t AS (SELECT explode(array(15, 16, 17)) AS v)
    SELECT (SELECT COUNT(*) FROM t WHERE array_contains(arr, v)) AS cnt, arr
    FROM (SELECT collect_set(v) AS arr FROM t)
""").show() 

-- Expected: cnt = 3
-- Actual: cnt = 0
  {code}
 
h4. Root Cause Analysis

 

The correlated subquery is decorrelated into a hash join using the array as the 
join key.

When array_agg(DISTINCT) (which uses collect_list(distinct)) is computed 
multiple times with sort-based aggregation + AQE, the computations produce 
arrays with the same elements but different orderings. Since array equality is 
element-by-element, the hash join fails to match.

 
h4. Workarounds

 

  - Set spark.sql.adaptive.enabled=false, OR
  - Set spark.sql.objectHashAggregate.sortBased.fallbackThreshold to default 
(128)

 
h4. Test Results

 

  - AQE=true, sortBased.fallbackThreshold=1: FAIL (cnt=0, expected 3)
  - AQE=false, sortBased.fallbackThreshold=1: PASS (cnt=3)
  - AQE=true, sortBased.fallbackThreshold=128 (default): PASS (cnt=3)

 
h4. Conclusion

 

The bug only occurs when both AQE is enabled and sortBased.fallbackThreshold=1.

 


> AQE + sortBased aggregation produces wrong results for array_agg(DISTINCT) in 
> correlated subqueries
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55038
>                 URL: https://issues.apache.org/jira/browse/SPARK-55038
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0, 4.0.0
>         Environment:   - Spark 4.0.1 (also reproducible on 3.5.x)
>   - Reproduced in both local[4] and distributed mode
>  
>            Reporter: Feng Zhang
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
