[ https://issues.apache.org/jira/browse/SPARK-44156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Emanuel Velzi updated SPARK-44156:
----------------------------------
    Summary: SortAggregation slows down dropDuplicates().  (was: Should 
HashAggregation improve dropDuplicates()?)

> SortAggregation slows down dropDuplicates().
> --------------------------------------------
>
>                 Key: SPARK-44156
>                 URL: https://issues.apache.org/jira/browse/SPARK-44156
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.2
>            Reporter: Emanuel Velzi
>            Priority: Major
>
> TL;DR: SortAggregate makes dropDuplicates() slower than HashAggregate.
> How can we make Spark use HashAggregate instead of SortAggregate?
> ----------------------
> We have a Spark cluster running on Kubernetes with the following 
> configurations:
>  * Spark v3.3.2
>  * Hadoop 3.3.4
>  * Java 17
> We are running a simple job on a dataset (~6 GiB) with almost 600 columns, 
> many of which contain null values. The job involves the following steps (a 
> minimal sketch follows the list):
>  # Load data from S3.
>  # Apply dropDuplicates().
>  # Save the deduplicated data back to S3 using magic committers.
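> A minimal sketch of the job (the SparkSession name {{spark}}, the S3 paths, and 
> the committer setting shown below are illustrative assumptions, not our exact code):
>  
> {code:java}
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> 
> // 1. Load data from S3 (path is illustrative).
> Dataset<Row> ds = spark.read().parquet("s3a://my-bucket/input/");
> // 2. Deduplicate; this is the step discussed below.
> ds = ds.dropDuplicates();
> // 3. Write the deduplicated data back to S3. The S3A magic committer is assumed
> //    to be enabled via configuration, e.g. spark.hadoop.fs.s3a.committer.name=magic.
> ds.write().mode("overwrite").parquet("s3a://my-bucket/output/");
> {code}
>  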
> One of the columns is of type "map". When we run dropDuplicates() without 
> specifying any parameters (i.e., using all columns), it throws an error:
>  
> {noformat}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot 
> have map type columns in DataFrame which calls set operations(intersect, 
> except, etc.), but the type of column my_column is 
> map<string,array<struct<ci:string,co:string,cur:string,Getaways_Zone:string,Getaways_ID:string>>>;{noformat}
>  
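> A hypothetical minimal reproduction (column name and values are made up; 
> {{spark}} is an existing SparkSession):
>  
> {code:java}
> // dropDuplicates() over all columns is checked like a set operation, and the
> // analyzer rejects map-typed columns, producing the error above.
> Dataset<Row> df = spark.sql("SELECT 1 AS id, map('k', 'v') AS my_column");
> df.dropDuplicates().show(); // AnalysisException: Cannot have map type columns ...
> {code}
>  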
> To overcome this issue, we used {{dropDuplicates("id")}}, specifying an 
> identifier column.
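> That is, the slower variant is simply (the column name {{id}} is illustrative):
>  
> {code:java}
> ds = ds.dropDuplicates("id"); // planned as SortAggregate, see the physical plan below
> {code}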
> However, the performance of this method was {*}much worse than expected{*}, 
> taking around 30 minutes.
> As an alternative approach, we tested converting the "map" column to JSON, 
> applying dropDuplicates() without parameters, and then converting the column 
> back to "map" format:
>  
> {code:java}
> // Remember the original map type of the column.
> DataType t = ds.schema().apply("my_column").dataType();
> // Serialize the map column to JSON so that no map-typed column remains.
> ds = ds.withColumn("my_column", functions.to_json(ds.col("my_column")));
> // Deduplicate over all columns (now allowed).
> ds = ds.dropDuplicates();
> // Restore the original map type from the JSON string.
> ds = ds.withColumn("my_column", functions.from_json(ds.col("my_column"), t));
> {code}
>  
> Surprisingly, this approach {*}significantly improved the performance{*}, 
> reducing the execution time to 7 minutes.
> The only noticeable difference was in the execution plan. In the *slower* 
> case, the execution plan involved {*}SortAggregate{*}, while in the *faster* 
> case, it involved {*}HashAggregate{*}.
>  
> {noformat}
> == Physical Plan [slow case] == 
> Execute InsertIntoHadoopFsRelationCommand (13)
> +- AdaptiveSparkPlan (12)
>    +- == Final Plan ==
>       Coalesce (8)
>       +- SortAggregate (7)
>          +- Sort (6)
>             +- ShuffleQueryStage (5), Statistics(sizeInBytes=141.3 GiB, rowCount=1.25E+7)
>                +- Exchange (4)
>                   +- SortAggregate (3)
>                      +- Sort (2)
>                         +- Scan parquet  (1)
>    +- == Initial Plan ==
>       Coalesce (11)
>       +- SortAggregate (10)
>          +- Sort (9)
>             +- Exchange (4)
>                +- SortAggregate (3)
>                   +- Sort (2)
>                      +- Scan parquet  (1)
> {noformat}
> {noformat}
> == Physical Plan [fast case] ==
> Execute InsertIntoHadoopFsRelationCommand (11)
> +- AdaptiveSparkPlan (10)
>    +- == Final Plan ==
>       Coalesce (7)
>       +- HashAggregate (6)
>          +- ShuffleQueryStage (5), Statistics(sizeInBytes=81.6 GiB, rowCount=1.25E+7)
>             +- Exchange (4)
>                +- HashAggregate (3)
>                   +- Project (2)
>                      +- Scan parquet  (1)
>    +- == Initial Plan ==
>       Coalesce (9)
>       +- HashAggregate (8)
>          +- Exchange (4)
>             +- HashAggregate (3)
>                +- Project (2)
>                   +- Scan parquet  (1)
> {noformat}
>  
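> A plan like the ones above can be printed for any Dataset before the write is 
> triggered, for example:
>  
> {code:java}
> // Print the physical plan to check whether the deduplication is planned as
> // SortAggregate or HashAggregate.
> ds.explain("formatted");
> {code}
>  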
> Based on this observation, we concluded that the difference in performance is 
> related to {*}SortAggregate versus HashAggregate{*}.
> Is this line of thinking correct? How can we enforce the use of HashAggregate 
> instead of SortAggregate?
> *The final result is somewhat counterintuitive* because deduplicating on a 
> single column should, in theory, be faster: it gives the engine a simpler key 
> for comparing rows and detecting duplicates.
>  


