[ https://issues.apache.org/jira/browse/SPARK-44156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Emanuel Velzi updated SPARK-44156:
----------------------------------
    Summary: SortAggregation slows down dropDuplicates()  (was: SortAggregation slows down dropDuplicates().)

> SortAggregation slows down dropDuplicates()
> -------------------------------------------
>
>                 Key: SPARK-44156
>                 URL: https://issues.apache.org/jira/browse/SPARK-44156
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.2
>            Reporter: Emanuel Velzi
>            Priority: Major
>
> TL;DR: SortAggregate makes dropDuplicates() slower than HashAggregate.
> How can we make Spark use HashAggregate instead of SortAggregate?
> ----------------------
> We have a Spark cluster running on Kubernetes with the following configuration:
> * Spark v3.3.2
> * Hadoop 3.3.4
> * Java 17
> We are running a simple job on a dataset (~6 GiB) with almost 600 columns, many of which contain null values. The job involves the following steps:
> # Load data from S3.
> # Apply dropDuplicates().
> # Save the deduplicated data back to S3 using magic committers.
> One of the columns is of type "map". When we run dropDuplicates() without specifying any columns (i.e., using all columns), it throws an error:
>
> {noformat}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column my_column is map<string,array<struct<ci:string,co:string,cur:string,Getaways_Zone:string,Getaways_ID:string>>>;{noformat}
>
> To work around this issue, we used dropDuplicates("id"), specifying an identifier column.
> However, the performance of this approach was {*}much worse than expected{*}, taking around 30 minutes.
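One intuition for the map-column restriction reported above: map equality ignores entry order, but maps have no natural ordering, so a sort-based comparison (which set operations and sort-based deduplication rely on) is ill-defined for them. A minimal plain-Java illustration of that asymmetry (no Spark required; the class name is hypothetical):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapColumnDemo {
    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        a.put("ci", "x");
        a.put("co", "y");

        // Same entries, different insertion order.
        Map<String, String> b = new LinkedHashMap<>();
        b.put("co", "y");
        b.put("ci", "x");

        // Equality is well-defined: it only compares the entry sets.
        System.out.println(a.equals(b)); // true

        // But maps carry no natural ordering, so they cannot be sorted or
        // compared with "<" / ">" the way sort-based operators require.
        System.out.println(a instanceof Comparable); // false
    }
}
```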
> As an alternative approach, we tested converting the "map" column to JSON, applying dropDuplicates() without parameters, and then converting the column back to "map" format:
>
> {code:java}
> DataType t = ds.schema().apply("my_column").dataType();
> ds = ds.withColumn("my_column", functions.to_json(ds.col("my_column")));
> ds = ds.dropDuplicates();
> ds = ds.withColumn("my_column", functions.from_json(ds.col("my_column"), t));
> {code}
>
> Surprisingly, this approach {*}significantly improved the performance{*}, reducing the execution time to 7 minutes.
> The only noticeable difference was in the execution plan. In the *slower* case, the execution plan involved {*}SortAggregate{*}, while in the *faster* case, it involved {*}HashAggregate{*}.
>
> {noformat}
> == Physical Plan [slow case] ==
> Execute InsertIntoHadoopFsRelationCommand (13)
> +- AdaptiveSparkPlan (12)
>    +- == Final Plan ==
>       Coalesce (8)
>       +- SortAggregate (7)
>          +- Sort (6)
>             +- ShuffleQueryStage (5), Statistics(sizeInBytes=141.3 GiB, rowCount=1.25E+7)
>                +- Exchange (4)
>                   +- SortAggregate (3)
>                      +- Sort (2)
>                         +- Scan parquet (1)
>    +- == Initial Plan ==
>       Coalesce (11)
>       +- SortAggregate (10)
>          +- Sort (9)
>             +- Exchange (4)
>                +- SortAggregate (3)
>                   +- Sort (2)
>                      +- Scan parquet (1)
> {noformat}
> {noformat}
> == Physical Plan [fast case] ==
> Execute InsertIntoHadoopFsRelationCommand (11)
> +- AdaptiveSparkPlan (10)
>    +- == Final Plan ==
>       Coalesce (7)
>       +- HashAggregate (6)
>          +- ShuffleQueryStage (5), Statistics(sizeInBytes=81.6 GiB, rowCount=1.25E+7)
>             +- Exchange (4)
>                +- HashAggregate (3)
>                   +- Project (2)
>                      +- Scan parquet (1)
>    +- == Initial Plan ==
>       Coalesce (9)
>       +- HashAggregate (8)
>          +- Exchange (4)
>             +- HashAggregate (3)
>                +- Project (2)
>                   +- Scan parquet (1)
> {noformat}
>
> Based on this observation, we concluded that the difference in performance is related to {*}SortAggregate versus HashAggregate{*}.
> Is this line of thinking correct?
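The SortAggregate-vs-HashAggregate gap is at least consistent with the asymptotics of the two strategies: sort-based deduplication pays an O(n log n) sort before it can drop adjacent duplicates, while hash-based deduplication is a single expected-O(n) pass. A minimal plain-Java analogy (no Spark; the class and method names are illustrative, not Spark internals):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.TreeSet;

public class DedupDemo {
    // Hash-based dedup: one expected-O(n) pass over the rows,
    // roughly analogous to what HashAggregate does per partition.
    static List<Integer> hashDedup(List<Integer> rows) {
        return new ArrayList<>(new LinkedHashSet<>(rows));
    }

    // Sort-based dedup: O(n log n) sort, then drop adjacent duplicates,
    // roughly analogous to Sort + SortAggregate.
    static List<Integer> sortDedup(List<Integer> rows) {
        List<Integer> sorted = new ArrayList<>(rows);
        Collections.sort(sorted);
        List<Integer> out = new ArrayList<>();
        for (Integer r : sorted) {
            if (out.isEmpty() || !out.get(out.size() - 1).equals(r)) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(3, 1, 2, 3, 1, 2, 2);
        // Both strategies keep the same set of distinct rows;
        // they differ only in the work done to find them.
        System.out.println(new TreeSet<>(hashDedup(rows)).equals(new TreeSet<>(sortDedup(rows)))); // true
    }
}
```

Note that the sort-based variant also requires the rows to be orderable, which ties back to why the map-typed column ruled some operations out in the first place.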
> How can we enforce the use of HashAggregate instead of SortAggregate?
> *The final result is somewhat counterintuitive*, because deduplicating on only one column should theoretically be faster, as it gives a simpler way to compare rows and determine duplicates.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org