[ https://issues.apache.org/jira/browse/SPARK-20073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938810#comment-15938810 ]
Everett Anderson commented on SPARK-20073:
------------------------------------------

With the local master in spark-shell and cross joins enabled, here are the query plans for the cases above:

{noformat}
innerJoinSqlNullSafeEqOp.explain
== Physical Plan ==
CartesianProduct
:- *Filter (variant_count#11L > 1)
:  +- *HashAggregate(keys=[name#3], functions=[count(distinct named_struct(name, name#3, group, group#4, data, data#5)#172)])
:     +- Exchange hashpartitioning(name#3, 200)
:        +- *HashAggregate(keys=[name#3], functions=[partial_count(distinct named_struct(name, name#3, group, group#4, data, data#5)#172)])
:           +- *HashAggregate(keys=[name#3, named_struct(name, name#3, group, group#4, data, data#5)#172], functions=[])
:              +- Exchange hashpartitioning(name#3, named_struct(name, name#3, group, group#4, data, data#5)#172, 200)
:                 +- *HashAggregate(keys=[name#3, named_struct(name, name#3, group, group#4, data, data#5) AS named_struct(name, name#3, group, group#4, data, data#5)#172], functions=[])
:                    +- Scan ExistingRDD[name#3,group#4,data#5]
+- Scan ExistingRDD[name#103,group#104,data#105]
{noformat}

vs

{noformat}
manualVarCountsInnerJoinSqlNullSafeEqOp.explain
== Physical Plan ==
*SortMergeJoin [coalesce(name#139, )], [coalesce(name#3, )], Inner, (name#139 <=> name#3)
:- *Sort [coalesce(name#139, ) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(coalesce(name#139, ), 200)
:     +- Scan ExistingRDD[name#139,variant_count#140]
+- *Sort [coalesce(name#3, ) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(coalesce(name#3, ), 200)
      +- Scan ExistingRDD[name#3,group#4,data#5]
{noformat}

> Unexpected Cartesian product when using eqNullSafe in join with a derived table
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-20073
>                 URL: https://issues.apache.org/jira/browse/SPARK-20073
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: Everett Anderson
>
> It appears that if you
> try to join tables A and B when B is derived from A
> and you use the eqNullSafe / <=> operator for the join condition, Spark
> performs a Cartesian product.
> However, if you perform the join on tables of the same data when they don't
> have a relationship, the expected non-Cartesian product join occurs.
> {noformat}
> // Create some fake data.
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions
> val peopleRowsRDD = sc.parallelize(Seq(
>   Row("Fred", 8, 1),
>   Row("Fred", 8, 2),
>   Row(null, 10, 3),
>   Row(null, 10, 4),
>   Row("Amy", 12, 5),
>   Row("Amy", 12, 6)))
>
> val peopleSchema = StructType(Seq(
>   StructField("name", StringType, nullable = true),
>   StructField("group", IntegerType, nullable = true),
>   StructField("data", IntegerType, nullable = true)))
>
> val people = spark.createDataFrame(peopleRowsRDD, peopleSchema)
> people.createOrReplaceTempView("people")
> scala> people.show
> +----+-----+----+
> |name|group|data|
> +----+-----+----+
> |Fred|    8|   1|
> |Fred|    8|   2|
> |null|   10|   3|
> |null|   10|   4|
> | Amy|   12|   5|
> | Amy|   12|   6|
> +----+-----+----+
> // Now create a derived table from that table. It doesn't matter much what.
> val variantCounts = spark.sql("select name, count(distinct(name, group, data)) as variant_count from people group by name having variant_count > 1")
> variantCounts.show
> +----+-------------+
> |name|variant_count|
> +----+-------------+
> |Fred|            2|
> |null|            2|
> | Amy|            2|
> +----+-------------+
> // Now try an inner join using the regular equalTo that drops nulls. This works fine.
> val innerJoinEqualTo = variantCounts.join(people, variantCounts("name").equalTo(people("name")))
> innerJoinEqualTo.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
> // Okay, now let's switch to the <=> operator.
> //
> // If you haven't set spark.sql.crossJoin.enabled=true, you'll get an error like
> // "Cartesian joins could be prohibitively expensive and are disabled by
> // default. To explicitly enable them, please set spark.sql.crossJoin.enabled =
> // true;"
> //
> // If you have enabled them, you'll get the table below.
> //
> // However, we really don't want or expect a Cartesian product!
> val innerJoinSqlNullSafeEqOp = variantCounts.join(people, variantCounts("name")<=>(people("name")))
> innerJoinSqlNullSafeEqOp.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> |Fred|            2|null|   10|   3|
> |Fred|            2|null|   10|   4|
> |Fred|            2| Amy|   12|   5|
> |Fred|            2| Amy|   12|   6|
> |null|            2|Fred|    8|   1|
> |null|            2|Fred|    8|   2|
> |null|            2|null|   10|   3|
> |null|            2|null|   10|   4|
> |null|            2| Amy|   12|   5|
> |null|            2| Amy|   12|   6|
> | Amy|            2|Fred|    8|   1|
> | Amy|            2|Fred|    8|   2|
> | Amy|            2|null|   10|   3|
> | Amy|            2|null|   10|   4|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> +----+-------------+----+-----+----+
> // Okay, let's try to construct the exact same variantCount table manually
> // so it has no relationship to the original.
> val variantCountRowsRDD = sc.parallelize(Seq(
>   Row("Fred", 2),
>   Row(null, 2),
>   Row("Amy", 2)))
>
> val variantCountSchema = StructType(Seq(
>   StructField("name", StringType, nullable = true),
>   StructField("variant_count", IntegerType, nullable = true)))
>
> val manualVariantCounts = spark.createDataFrame(variantCountRowsRDD, variantCountSchema)
> // Now perform the same join with the null-safe equals operator.
> val manualVarCountsInnerJoinSqlNullSafeEqOp = manualVariantCounts.join(people, manualVariantCounts("name")<=>(people("name")))
> manualVarCountsInnerJoinSqlNullSafeEqOp.show
> +----+-------------+----+-----+----+
> |name|variant_count|name|group|data|
> +----+-------------+----+-----+----+
> |Fred|            2|Fred|    8|   1|
> |Fred|            2|Fred|    8|   2|
> | Amy|            2| Amy|   12|   5|
> | Amy|            2| Amy|   12|   6|
> |null|            2|null|   10|   3|
> |null|            2|null|   10|   4|
> +----+-------------+----+-----+----+
> {noformat}

--
This message was sent by Atlassian JIRA (v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org