[ https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308970#comment-16308970 ]
Liang-Chi Hsieh commented on SPARK-22898:
-----------------------------------------

If there is no problem, I will resolve this as a duplicate. You can re-open it if you have other questions.

> collect_set aggregation on bucketed table causes an exchange stage
> ------------------------------------------------------------------
>
>                 Key: SPARK-22898
>                 URL: https://issues.apache.org/jira/browse/SPARK-22898
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Modi Tamam
>              Labels: bucketing
>
> I'm using Spark 2.2 to do a POC of Spark's bucketing. I've created a bucketed table; here is the desc formatted my_bucketed_tbl output:
> +--------------------+--------------------+-------+
> |            col_name|           data_type|comment|
> +--------------------+--------------------+-------+
> |              bundle|              string|   null|
> |                 ifa|              string|   null|
> |               date_|                date|   null|
> |                hour|                 int|   null|
> |                    |                    |       |
> |# Detailed Table ...|                    |       |
> |            Database|             default|       |
> |               Table|     my_bucketed_tbl|       |
> |               Owner|            zeppelin|       |
> |             Created|Thu Dec 21 13:43:...|       |
> |         Last Access|Thu Jan 01 00:00:...|       |
> |                Type|            EXTERNAL|       |
> |            Provider|                 orc|       |
> |         Num Buckets|                  16|       |
> |      Bucket Columns|             [`ifa`]|       |
> |        Sort Columns|             [`ifa`]|       |
> |    Table Properties|[transient_lastDd...|       |
> |            Location|hdfs:/user/hive/w...|       |
> |       Serde Library|org.apache.hadoop...|       |
> |         InputFormat|org.apache.hadoop...|       |
> |        OutputFormat|org.apache.hadoop...|       |
> |  Storage Properties|[serialization.fo...|       |
> +--------------------+--------------------+-------+
> When I run explain on a group-by query, I can see that the exchange stage is avoided:
> {code:java}
> sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
>
> == Physical Plan ==
> SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
> +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
>    +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
>       +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
> {code}
> But when I replace Spark's max function with collect_set, the execution plan is the same as for a non-bucketed table, i.e. the exchange stage is not avoided:
> {code:java}
> sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain
>
> == Physical Plan ==
> ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
> +- Exchange hashpartitioning(ifa#1010, 200)
>    +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
>       +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
> {code}
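For anyone trying to reproduce this, here is a minimal sketch, assuming a Spark 2.2 spark-shell session (where spark and sql are predefined); the synthetic DataFrame is a hypothetical stand-in for the reporter's data, and only the table name and bucketing layout are taken from the description above:

{code:scala}
// Hypothetical stand-in data containing the bucketing column from the report.
val df = spark.range(0, 1000).selectExpr(
  "cast(id as string) as bundle",
  "cast(id % 16 as string) as ifa")

// Write an ORC table bucketed and sorted on ifa, matching the
// Num Buckets / Bucket Columns / Sort Columns shown in the description.
df.write
  .format("orc")
  .bucketBy(16, "ifa")
  .sortBy("ifa")
  .saveAsTable("my_bucketed_tbl")

// max() can consume the bucketed distribution: no Exchange in the plan.
sql("select ifa, max(bundle) from my_bucketed_tbl group by ifa").explain

// collect_set() is planned as ObjectHashAggregate and, per this report,
// re-introduces an Exchange even though the table is bucketed on ifa.
sql("select ifa, collect_set(bundle) from my_bucketed_tbl group by ifa").explain
{code}

Comparing the two plans should show the Exchange hashpartitioning node only in the collect_set plan, as in the report above.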