[ 
https://issues.apache.org/jira/browse/SPARK-6231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375130#comment-14375130
 ] 

Michael Armbrust commented on SPARK-6231:
-----------------------------------------

After a bunch of investigation I'm going to bump this to 1.4 as fixing it is 
going to require significant changes to analysis.  The query is actually 
running correctly.  The problem is that {{txns("cust_id") === 
spend("cust_id")}} is always {{true}} as there is no way for the optimizer to 
differentiate these columns since they are actually identical.  We would need 
to change the DataFrame API to construct new expression IDs for each operation 
to handle this case correctly.  For now, use the workaround specified above 
with aliases.

Additionally, [SPARK-6459] will give a warning when users hit this case.
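
The alias workaround itself is not quoted in this message, so the following is only a sketch of what it might look like for the query in the description. It assumes a spark-shell session on 1.3 where {{sqlContext}} and {{path}} are already defined, and the alias names {{t}} and {{s}} are illustrative choices, not taken from the issue. The idea is to give each side of the join its own name so that the join condition no longer compares a column with itself.

```scala
// Sketch only: assumes a spark-shell session in which `sqlContext` and
// `path` already exist, as in the issue description.
import org.apache.spark.sql.functions.{countDistinct, sum}
import sqlContext.implicits._

val df = sqlContext.load(path, "parquet")

// Alias each aggregated DataFrame; "t" and "s" are hypothetical names.
val txns = df.groupBy("cust_id")
  .agg($"cust_id", countDistinct($"day_num").as("txns"))
  .as("t")
val spend = df.groupBy("cust_id")
  .agg($"cust_id", sum($"extended_price").as("spend"))
  .as("s")

// Refer to the join columns through the aliases rather than through
// txns("cust_id") and spend("cust_id"), which carry the same expression ID.
val rmJoin = txns.join(spend, $"t.cust_id" === $"s.cust_id", "inner")

// Check the plan: the intent is an equi-join rather than a CartesianProduct.
rmJoin.explain
```

Whether the plan improves should be confirmed with {{explain}} on the affected version, since the behaviour depends on how the analyzer resolves the aliased columns.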

> Join on two tables (generated from same one) is broken
> ------------------------------------------------------
>
>                 Key: SPARK-6231
>                 URL: https://issues.apache.org/jira/browse/SPARK-6231
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.3.0
>            Reporter: Davies Liu
>            Assignee: Michael Armbrust
>            Priority: Blocker
>              Labels: DataFrame
>
> If the two columns used in joinExpr come from the same table, they have the 
> same id, and the joinExpr is then interpreted incorrectly.
> {code}
> val df = sqlContext.load(path, "parquet")
> val txns = df.groupBy("cust_id").agg($"cust_id", countDistinct($"day_num").as("txns"))
> val spend = df.groupBy("cust_id").agg($"cust_id", sum($"extended_price").as("spend"))
> val rmJoin = txns.join(spend, txns("cust_id") === spend("cust_id"), "inner")
> scala> rmJoin.explain
> == Physical Plan ==
> CartesianProduct
>  Filter (cust_id#0 = cust_id#0)
>   Aggregate false, [cust_id#0], [cust_id#0,CombineAndCount(partialSets#25) AS txns#7L]
>    Exchange (HashPartitioning [cust_id#0], 200)
>     Aggregate true, [cust_id#0], [cust_id#0,AddToHashSet(day_num#2L) AS partialSets#25]
>      PhysicalRDD [cust_id#0,day_num#2L], MapPartitionsRDD[1] at map at newParquet.scala:542
>  Aggregate false, [cust_id#17], [cust_id#17,SUM(PartialSum#38) AS spend#8]
>   Exchange (HashPartitioning [cust_id#17], 200)
>    Aggregate true, [cust_id#17], [cust_id#17,SUM(extended_price#20) AS PartialSum#38]
>     PhysicalRDD [cust_id#17,extended_price#20], MapPartitionsRDD[3] at map at newParquet.scala:542
> {code}


