[
https://issues.apache.org/jira/browse/SPARK-22223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenchen Fan resolved SPARK-22223.
---------------------------------
Resolution: Fixed
Fix Version/s: 2.3.0, 2.2.1
Issue resolved by pull request 19501
[https://github.com/apache/spark/pull/19501]
> ObjectHashAggregate introduces unnecessary shuffle
> --------------------------------------------------
>
> Key: SPARK-22223
> URL: https://issues.apache.org/jira/browse/SPARK-22223
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.2.0
> Environment: Spark 2.2.0 and following.
> {{spark.sql.execution.useObjectHashAggregateExec = true}}
> Reporter: Michele Costantino Soccio
> Fix For: 2.2.1, 2.3.0
>
>
> Since Spark 2.2, {{groupBy}} combined with {{collect_list}} introduces an
> unnecessary shuffle when the input is already partitioned on looser criteria
> than the current {{groupBy}}, for example on a subset of its grouping columns.
> For example:
> {code:java}
> // Sample data from
> // https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
> import org.apache.spark.sql.functions.{col, collect_list, expr}
> // Read the data and repartition it by "Country".
> val retailDF = spark.sql("Select * from online_retail")
>   .repartition(col("Country"))
> // Group the data at three nested levels, collecting a list at each level.
> val aggregatedDF = retailDF
>   .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
>   .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
>   .agg(collect_list("Good").as("Goods"))
>   .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
>   .groupBy("Country", "CustomerID")
>   .agg(collect_list("Invoice").as("Invoices"))
>   .withColumn("Customer", expr("(CustomerID, Invoices)"))
>   .groupBy("Country")
>   .agg(collect_list("Customer").as("Customers"))
> {code}
> Without disabling the {{ObjectHashAggregate}} one gets the following physical
> plan:
> {noformat}
> == Physical Plan ==
> ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
> +- Exchange hashpartitioning(Country#14, 200)
>    +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
>       +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>          +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>             +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
>                +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>                   +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                      +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                         +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
>                            +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                               +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                                  +- Exchange hashpartitioning(Country#14, 200)
>                                     +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> With Spark 2.1.0, or when {{ObjectHashAggregate}} is disabled, one gets a more
> efficient plan:
> {noformat}
> == Physical Plan ==
> SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
> +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
>    +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>       +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>          +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>             +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                   +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                      +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
>                         +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                            +- Exchange hashpartitioning(Country#14, 200)
>                               +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
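> The difference between the two plans can also be checked programmatically.
> The following is a minimal sketch, assuming a running {{SparkSession}} named
> {{spark}}. {{countExchanges}} is a hypothetical helper, not part of Spark,
> and the {{DataFrame}} should be rebuilt after the flag is changed so that it
> is planned with the new setting:
> {code:java}
> import org.apache.spark.sql.DataFrame
>
> // Hypothetical helper: counts the shuffle (Exchange) nodes that appear
> // in the textual form of the executed physical plan.
> def countExchanges(df: DataFrame): Int =
>   df.queryExecution.executedPlan.toString
>     .split("\n")
>     .count(_.contains("Exchange hashpartitioning"))
>
> // Flag named in the Environment field of this issue.
> spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")
>
> // Re-run the definition of aggregatedDF here, then inspect it:
> // aggregatedDF.explain()
> // println(countExchanges(aggregatedDF))
> {code}
> The two plans quoted above contain four and one {{Exchange}} nodes,
> respectively.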
> In this example, a quick run in a Databricks notebook showed that manually
> disabling {{ObjectHashAggregate}} gives an execution time of around 16s,
> versus the 25s needed when {{ObjectHashAggregate}} is enabled.
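> A comparison of this kind could be sketched as follows. This is not
> necessarily how the numbers above were obtained: {{timeWithFlag}} is a
> hypothetical helper, a running {{SparkSession}} named {{spark}} is assumed,
> and the {{DataFrame}} is built inside the helper so that it is planned after
> the flag is set:
> {code:java}
> import org.apache.spark.sql.DataFrame
>
> // Hypothetical helper: sets the flag, builds the DataFrame, and forces
> // every row to be computed while measuring the elapsed time.
> def timeWithFlag(enabled: Boolean)(build: => DataFrame): Unit = {
>   spark.conf.set(
>     "spark.sql.execution.useObjectHashAggregateExec", enabled.toString)
>   // SparkSession.time prints the wall-clock time taken by the block.
>   spark.time(build.rdd.foreach(_ => ()))
> }
>
> // Usage, with the aggregatedDF pipeline from above passed as the block:
> // timeWithFlag(enabled = true) { /* pipeline producing aggregatedDF */ }
> // timeWithFlag(enabled = false) { /* pipeline producing aggregatedDF */ }
> {code}
> Going through {{rdd.foreach}} rather than {{count()}} is a deliberate choice
> in this sketch, so that the collected lists cannot be pruned from the plan.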
> The use of the {{ObjectHashAggregate}} in the {{groupBy}} was introduced with
> SPARK-17949.