Allen George created SPARK-19981:
------------------------------------

             Summary: Sort-Merge join inserts shuffles when joining dataframes with aliased columns
                 Key: SPARK-19981
                 URL: https://issues.apache.org/jira/browse/SPARK-19981
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.2
            Reporter: Allen George

Performing a sort-merge join with two dataframes, each of which has the join column aliased, causes Spark to insert an unnecessary shuffle.

Consider the Scala test code below, which should be equivalent to the following SQL.

{code:SQL}
SELECT *
FROM (SELECT number AS aliased FROM df1) t1
LEFT JOIN (SELECT number AS aliased FROM df2) t2
  ON t1.aliased = t2.aliased
{code}

{code:scala}
// Needed for col(...); the enclosing test suite is assumed to provide sqlContext and test(...).
import org.apache.spark.sql.functions.col

private case class OneItem(number: Long)
private case class TwoItem(number: Long, value: String)

test("join with aliases should not trigger shuffle") {
  val df1 = sqlContext.createDataFrame(
    Seq(
      OneItem(0),
      OneItem(2),
      OneItem(4)
    )
  )

  // Hash-partition on the join column and materialize the cached result.
  val partitionedDf1 = df1.repartition(10, col("number"))
  partitionedDf1.createOrReplaceTempView("df1")
  partitionedDf1.cache()
  partitionedDf1.count()

  val df2 = sqlContext.createDataFrame(
    Seq(
      TwoItem(0, "zero"),
      TwoItem(2, "two"),
      TwoItem(4, "four")
    )
  )

  // Same partitioning (10 partitions, hashed on "number") as df1.
  val partitionedDf2 = df2.repartition(10, col("number"))
  partitionedDf2.createOrReplaceTempView("df2")
  partitionedDf2.cache()
  partitionedDf2.count()

  val fromDf1 = sqlContext.sql("SELECT number from df1")
  val fromDf2 = sqlContext.sql("SELECT number from df2")

  // Alias the join column on both sides, then join on the alias.
  val aliasedDf1 = fromDf1.select(col(fromDf1.columns.head) as "aliased")
  val aliasedDf2 = fromDf2.select(col(fromDf2.columns.head) as "aliased")

  aliasedDf1.join(aliasedDf2, Seq("aliased"), "left_outer")
}
{code}

Both the SQL and the Scala code generate a query plan in which an extra exchange is inserted before the sort-merge join. This exchange changes the partitioning from {{HashPartitioning("number", 10)}} for each frame being joined into {{HashPartitioning("aliased", 5)}}. I would have expected that, since this is a simple column aliasing and both frames have exactly the same partitioning, the partitioning of the initial frames would be reused and no exchange would be inserted.

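To make the expectation above concrete, here is a minimal, self-contained sketch of why an alias can hide an existing partitioning from the planner. It does not use Spark's classes: {{Attr}}, {{Alias}}, {{HashPartitioning}} and the two rules in {{AliasAwarePartitioning}} are simplified, hypothetical stand-ins, and the model ignores partition counts and ordering. It only illustrates the idea that rewriting the partitioning keys through the projection's aliases would let the join requirement be recognized as already satisfied. Whether Spark should handle it this way is not established by this report.

```scala
// Simplified stand-ins for planner concepts; these are NOT Spark's actual classes.
final case class Attr(name: String, id: Long)

// An aliasing projection: `child` is the input attribute, `output` the renamed one.
final case class Alias(child: Attr, output: Attr)

final case class HashPartitioning(keys: Seq[Attr], numPartitions: Int) {
  // The requirement is met only if the keys refer to the same attribute ids.
  def satisfies(required: Seq[Attr]): Boolean =
    keys.map(_.id) == required.map(_.id)
}

object AliasAwarePartitioning {
  // Naive rule: the projection reports its child's partitioning unchanged,
  // so the keys still reference the pre-alias attribute ids.
  def naive(child: HashPartitioning, aliases: Seq[Alias]): HashPartitioning =
    child

  // Alias-aware rule (hypothetical): rewrite each key through the alias list
  // so the partitioning is expressed in terms of the projection's output.
  def aliasAware(child: HashPartitioning, aliases: Seq[Alias]): HashPartitioning = {
    val renamed = aliases.map(a => a.child.id -> a.output).toMap
    child.copy(keys = child.keys.map(k => renamed.getOrElse(k.id, k)))
  }

  def main(args: Array[String]): Unit = {
    // Attribute ids borrowed from the plan in this report for readability.
    val number = Attr("number", 198L)
    val aliased = Attr("aliased", 267L)
    val childPartitioning = HashPartitioning(Seq(number), 10)
    val aliases = Seq(Alias(number, aliased))

    // The join requires clustering on the aliased attribute.
    println(naive(childPartitioning, aliases).satisfies(Seq(aliased)))      // false
    println(aliasAware(childPartitioning, aliases).satisfies(Seq(aliased))) // true
  }
}
```

In this model the naive rule fails the check, which corresponds to an exchange being planned, while the alias-aware rule passes it and keeps the original 10 partitions.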
{noformat}
*Project [args=[aliased#267L]][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=List(aliased#267:bigint%NONNULL)]
+- *SortMergeJoin [args=[aliased#267L], [aliased#270L], Inner][outPart=PartitioningCollection(5, hashpartitioning(aliased#267L, 5)%NONNULL,hashpartitioning(aliased#270L, 5)%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL, aliased#270:bigint%NONNULL)]
   :- *Sort [args=[aliased#267L ASC], false, 0][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List(aliased#267L ASC%NONNULL)][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
   :  +- Exchange [args=hashpartitioning(aliased#267L, 5)%NONNULL][outPart=HashPartitioning(5, aliased#267:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
   :     +- *Project [args=[number#198L AS aliased#267L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#267:bigint%NONNULL)]
   :        +- InMemoryTableScan [args=[number#198L]][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#198:bigint%NONNULL)]
   :           :  +- InMemoryRelation [number#198L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), false[Statistics(24,false)][output=List(number#198:bigint%NONNULL)]
   :           :  :  +- Exchange [args=hashpartitioning(number#198L, 10)%NONNULL][outPart=HashPartitioning(10, number#198:bigint%NONNULL)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
   :           :  :     +- LocalTableScan [args=[number#198L]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#198:bigint%NONNULL)]
   +- *Sort [args=[aliased#270L ASC], false, 0][outPart=HashPartitioning(5, aliased#270:bigint%NONNULL)][outOrder=List(aliased#270L ASC%NONNULL)][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
      +- Exchange [args=hashpartitioning(aliased#270L, 5)%NONNULL][outPart=HashPartitioning(5, aliased#270:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
         +- *Project [args=[number#223L AS aliased#270L]][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(aliased#270:bigint%NONNULL)]
            +- InMemoryTableScan [args=[number#223L]][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=ArrayBuffer(number#223:bigint%NONNULL)]
               :  +- InMemoryRelation [number#223L, value#224], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), false[Statistics(47,false)][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
               :  :  +- Exchange [args=hashpartitioning(number#223L, 10)%NONNULL][outPart=HashPartitioning(10, number#223:bigint%NONNULL)][outOrder=List()][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
               :  :     +- LocalTableScan [args=[number#223L, value#224]][outPart=UnknownPartitioning(0)][outOrder=List()][output=List(number#223:bigint%NONNULL, value#224:string%NULL)]
{noformat}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org