[ https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496178#comment-16496178 ]
Liang-Chi Hsieh commented on SPARK-24410:
-----------------------------------------

Yeah, it depends on how we combine the RDDs from Union's children. Currently {{SparkContext.union}} doesn't combine them in the way this issue needs, so I will leave {{UnionExec#outputPartitioning}} untouched for now.

> Missing optimization for Union on bucketed tables
> -------------------------------------------------
>
>                 Key: SPARK-24410
>                 URL: https://issues.apache.org/jira/browse/SPARK-24410
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Ohad Raviv
>            Priority: Major
>
> A common use case for us is a partially aggregated table plus daily increments that we need to aggregate further. We do this by unioning the two tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently the union operator does not seem to take advantage of the bucketing the way the join operator does.
> For example, for two bucketed tables a1 and a2:
> {code}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.col
>
> // N is the number of rows to generate (its value is not given in the report).
> // Both tables are bucketed into 3 buckets on `key`.
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a1")
>
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> For the join query we get a SortMergeJoin with no shuffle:
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> :     +- *(1) Filter isnotnull(key#24L)
> :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>    +- *(2) Project [key#27L, t1#28L, t2#29L]
>       +- *(2) Filter isnotnull(key#27L)
>          +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> {code}
> But for an aggregation after the union we get a shuffle ({{Exchange}}):
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>    +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
>       +- Union
>          :- *(1) Project [key#25L]
>          :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
>          +- *(2) Project [key#28L]
>             +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
> {code}
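A simplified model may help show why the result depends on how Union's child RDDs are combined. The Scala sketch below does not use Spark at all: a dataset is just a list of partitions of keys, and every name in it (`Partitions`, `bucketOf`, `bucketize`, `concatUnion`, `zipUnion`, `isBucketedBy`) is hypothetical. It assumes a plain `key mod numBuckets` bucket function rather than Spark's Murmur3-based bucketing. `concatUnion` appends the children's partitions one after another, which is roughly what a plain `UnionRDD` does (as far as I know, `SparkContext.union` only keeps partitions aligned, via `PartitionerAwareUnionRDD`, when every child reports the same `Partitioner`). `zipUnion` merges partition i of each child into partition i of the result, which is the layout an aggregation on `key` could consume without an `Exchange`.

```scala
// Hypothetical model, not Spark's API: a dataset is a sequence of partitions,
// and each partition is a sequence of Long keys.
type Partitions = Seq[Seq[Long]]

// Assumption: bucket id = non-negative key modulo numBuckets. Spark's bucketing
// uses Murmur3 hashing instead, but any deterministic key -> bucket function
// leads to the same conclusion here.
def bucketOf(key: Long, numBuckets: Int): Int =
  java.lang.Math.floorMod(key, numBuckets.toLong).toInt

// Lay out keys into numBuckets partitions, like the files of a bucketed table.
def bucketize(keys: Seq[Long], numBuckets: Int): Partitions =
  (0 until numBuckets).map(b => keys.filter(k => bucketOf(k, numBuckets) == b))

// Concatenating union: the children's partitions are appended (n1 + n2 partitions).
def concatUnion(left: Partitions, right: Partitions): Partitions = left ++ right

// Partition-wise union: partition i of the result holds partition i of both children.
// Only meaningful when both children have the same number of buckets.
def zipUnion(left: Partitions, right: Partitions): Partitions = {
  require(left.size == right.size, "children must have the same number of partitions")
  left.zip(right).map { case (l, r) => l ++ r }
}

// What an aggregation on `key` needs from its child: exactly numBuckets partitions,
// with every key in the partition of its bucket, so equal keys are never split.
def isBucketedBy(parts: Partitions, numBuckets: Int): Boolean =
  parts.size == numBuckets && parts.zipWithIndex.forall { case (p, i) =>
    p.forall(k => bucketOf(k, numBuckets) == i)
  }

// Two tables with 3 buckets each and overlapping keys (5 to 9 appear in both).
val a1 = bucketize(0L until 10L, 3)
val a2 = bucketize(5L until 15L, 3)

println(isBucketedBy(concatUnion(a1, a2), 3)) // false: 6 partitions; key 5 is in partitions 2 and 5
println(isBucketedBy(zipUnion(a1, a2), 3))    // true: each key stays in its bucket's partition
```

In this model the partition-wise union only works because both children are bucketed on the same key, with the same bucket count and the same bucket function; if any of those differ, a shuffle is still required.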