[ 
https://issues.apache.org/jira/browse/SPARK-44156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emanuel Velzi updated SPARK-44156:
----------------------------------
    Description: 
TL;DR: SortAggregate makes dropDuplicates() slower than HashAggregate.

How can we make Spark use HashAggregate instead of SortAggregate? 

----------------------

We have a Spark cluster running on Kubernetes with the following configurations:
 * Spark v3.3.2
 * Hadoop 3.3.4
 * Java 17

We are running a simple job on a dataset (~6 GiB) with almost 600 columns, many 
of which contain null values. The job involves the following steps (a minimal 
sketch follows the list):
 # Load data from S3.
 # Apply dropDuplicates().
 # Save the deduplicated data back to S3 using magic committers.
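
Roughly, the job looks like this (a sketch only: the bucket paths and the 
committer setting shown here are illustrative placeholders, not our exact 
configuration):

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative session setup; committer configuration is only an example.
SparkSession spark = SparkSession.builder()
    .appName("dedup-job")
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .getOrCreate();

// 1. Load data from S3 (placeholder path).
Dataset<Row> ds = spark.read().parquet("s3a://some-bucket/input/");

// 2. Drop duplicate rows across all ~600 columns.
ds = ds.dropDuplicates();

// 3. Save the deduplicated data back to S3 (placeholder path).
ds.write().mode("overwrite").parquet("s3a://some-bucket/output/");
{code}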

One of the columns is of type "map". When we run dropDuplicates() without 
specifying any parameters (i.e., using all columns), it throws an error:

 
{noformat}
Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot have 
map type columns in DataFrame which calls set operations(intersect, except, 
etc.), but the type of column my_column is 
map<string,array<struct<ci:string,co:string,cur:string,Getaways_Zone:string,Getaways_ID:string>>>;{noformat}
 

To overcome this issue, we used dropDuplicates("id"), specifying a single 
identifier column.
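
In code, the change was simply:

{code:java}
// Deduplicate on the identifier column only, instead of on all ~600 columns.
ds = ds.dropDuplicates("id");
{code}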

However, the performance of this method was {*}much worse than expected{*}, 
taking around 30 minutes.

As an alternative approach, we tested converting the "map" column to JSON, 
applying dropDuplicates() without parameters, and then converting the column 
back to "map" format:

 
{code:java}
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;

// Remember the original map type, serialize the column to JSON,
// deduplicate on all columns, then restore the map type.
DataType t = ds.schema().apply("my_column").dataType();
ds = ds.withColumn("my_column", functions.to_json(ds.col("my_column")));
ds = ds.dropDuplicates();
ds = ds.withColumn("my_column", functions.from_json(ds.col("my_column"), t));
{code}
 

Surprisingly, this approach {*}significantly improved the performance{*}, 
reducing the execution time to 7 minutes.

The only noticeable difference was the execution plan: the *slower* case used 
{*}SortAggregate{*}, while the *faster* case used {*}HashAggregate{*}.

 
{noformat}
== Physical Plan [slow case] == 
Execute InsertIntoHadoopFsRelationCommand (13)
+- AdaptiveSparkPlan (12)
   +- == Final Plan ==
      Coalesce (8)
      +- SortAggregate (7)
         +- Sort (6)
            +- ShuffleQueryStage (5), Statistics(sizeInBytes=141.3 GiB, 
rowCount=1.25E+7)
               +- Exchange (4)
                  +- SortAggregate (3)
                     +- Sort (2)
                        +- Scan parquet  (1)
   +- == Initial Plan ==
      Coalesce (11)
      +- SortAggregate (10)
         +- Sort (9)
            +- Exchange (4)
               +- SortAggregate (3)
                  +- Sort (2)
                     +- Scan parquet  (1)
{noformat}
{noformat}
== Physical Plan [fast case] ==
Execute InsertIntoHadoopFsRelationCommand (11)
+- AdaptiveSparkPlan (10)
   +- == Final Plan ==
      Coalesce (7)
      +- HashAggregate (6)
         +- ShuffleQueryStage (5), Statistics(sizeInBytes=81.6 GiB, 
rowCount=1.25E+7)
            +- Exchange (4)
               +- HashAggregate (3)
                  +- Project (2)
                     +- Scan parquet  (1)
   +- == Initial Plan ==
      Coalesce (9)
      +- HashAggregate (8)
         +- Exchange (4)
            +- HashAggregate (3)
               +- Project (2)
                  +- Scan parquet  (1)
{noformat}
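
For anyone reproducing this, the physical plan can also be checked directly 
before running the full job, for example:

{code:java}
// Print the formatted physical plan to see whether the deduplication is
// planned as HashAggregate or SortAggregate.
ds.dropDuplicates("id").explain("formatted");
{code}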
 

Based on this observation, we concluded that the difference in performance is 
related to {*}SortAggregate vs. HashAggregate{*}.

Is this line of thinking correct? How can we enforce the use of 
HashAggregate instead of SortAggregate, {*}even when deduplicating on a single 
column{*}?

*The final result is somewhat counterintuitive*, because deduplicating on 
only one column should in theory be faster: rows can be compared on a single 
key rather than on all ~600 columns.
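
For completeness, this is roughly the workaround we are using for now, 
generalized to convert every map-typed column to JSON before deduplicating and 
back afterwards. It is a sketch only and assumes top-level map columns; maps 
nested inside structs would need extra handling:

{code:java}
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;

import java.util.HashMap;
import java.util.Map;

// Serialize every top-level map column to JSON, deduplicate on all columns,
// then restore the original map types.
Map<String, DataType> mapColumns = new HashMap<>();
for (StructField f : ds.schema().fields()) {
    if (f.dataType() instanceof MapType) {
        mapColumns.put(f.name(), f.dataType());
        ds = ds.withColumn(f.name(), functions.to_json(ds.col(f.name())));
    }
}

ds = ds.dropDuplicates();

for (Map.Entry<String, DataType> e : mapColumns.entrySet()) {
    ds = ds.withColumn(e.getKey(), functions.from_json(ds.col(e.getKey()), e.getValue()));
}
{code}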

 


> SortAggregation slows down dropDuplicates()
> -------------------------------------------
>
>                 Key: SPARK-44156
>                 URL: https://issues.apache.org/jira/browse/SPARK-44156
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.3.2
>            Reporter: Emanuel Velzi
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to