[ https://issues.apache.org/jira/browse/SPARK-17154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529552#comment-15529552 ]
Kousuke Saruta commented on SPARK-17154:
----------------------------------------
[~cloud_fan] Sorry, I forgot to answer your questions.
For the first one, I added a test case for the indirect self-join to
`DataFrameSuite`; the existing `HiveDataframeSuite` already has a test case
for the direct self-join.
For the second one, I ran a job like the one you described and got the following result.
{code}
val df1 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("a", "b")
val df2 = Seq((2, "A"), (3, "B"), (4, "C")).toDF("a", "B")
val joined = df1.join(df2, (df1("a") + 1) === df2("a"))
val dropped = joined.drop(df2("a"))
dropped.show
+---+---+---+
|  a|  b|  B|
+---+---+---+
|  1|  a|  A|
|  2|  b|  B|
|  3|  c|  C|
+---+---+---+
df1.explain
== Physical Plan ==
LocalTableScan [a#5, b#6]
df2.explain
== Physical Plan ==
LocalTableScan [a#15, B#16]
dropped.explain
== Physical Plan ==
*Project [a#5, b#6, B#16]
+- *BroadcastHashJoin [(a#5 + 1)], [a#15], Inner, BuildRight
   :- LocalTableScan [a#5, b#6]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [a#15, B#16]
{code}
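To make the second result concrete, here is a plain-Scala toy model (no Spark involved; `Attr`, `dropByRef` and `dropByName` are hypothetical names, not Spark's API) of dropping a column by reference. It assumes, as the plan above suggests, that the Column returned by df2("a") is bound to one specific exprId (a#15), so dropping it removes only that attribute and leaves df1's a#5 in place, while matching on the name "a" alone would hit both columns.
{code}
// Toy model, not Spark's API: an output attribute is a name plus an exprId.
final case class Attr(name: String, exprId: Long)

def fmt(a: Attr): String = s"${a.name}#${a.exprId}"

// Output of the join before the drop, using the exprIds from the explain output.
val joinedOutput = Seq(Attr("a", 5L), Attr("b", 6L), Attr("a", 15L), Attr("B", 16L))

// Hypothetical drop-by-reference: remove only the attribute with the same exprId.
def dropByRef(output: Seq[Attr], ref: Attr): Seq[Attr] =
  output.filterNot(_.exprId == ref.exprId)

// Hypothetical drop-by-name: removes every attribute whose name matches.
def dropByName(output: Seq[Attr], name: String): Seq[Attr] =
  output.filterNot(_.name == name)

val df2a = Attr("a", 15L) // the attribute df2("a") is bound to
println(dropByRef(joinedOutput, df2a).map(fmt).mkString("[", ", ", "]")) // [a#5, b#6, B#16]
println(dropByName(joinedOutput, "a").map(fmt).mkString("[", ", ", "]")) // [b#6, B#16]
{code}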
For the third one, I also ran a job and got the following result.
{code}
val df = Seq((0, 0, 0),
             (0, 0, 1),
             (0, 1, 0),
             (0, 1, 1),
             (1, 0, 0),
             (1, 0, 1),
             (1, 1, 0),
             (1, 1, 1)).toDF("a", "b", "c")
df.filter(df("a") > 0).filter(df("b") > 0).filter(df("c") === 1).show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  1|
+---+---+---+
{code}
I think this is the expected behavior.
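As a quick cross-check of the third result without Spark, the same three predicates can be applied to the same tuples with plain Scala collections. This only confirms which rows should survive; it does not exercise Spark's analyzer or exprId handling.
{code}
// The same eight (a, b, c) tuples as in the DataFrame above.
val rows = Seq(
  (0, 0, 0), (0, 0, 1), (0, 1, 0), (0, 1, 1),
  (1, 0, 0), (1, 0, 1), (1, 1, 0), (1, 1, 1))

// Chained filters mirroring df("a") > 0, df("b") > 0 and df("c") === 1.
val result = rows.filter(_._1 > 0).filter(_._2 > 0).filter(_._3 == 1)
println(result) // List((1,1,1))
{code}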
> Wrong result can be returned or AnalysisException can be thrown after
> self-join or similar operations
> -----------------------------------------------------------------------------------------------------
>
> Key: SPARK-17154
> URL: https://issues.apache.org/jira/browse/SPARK-17154
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.2, 2.0.0
> Reporter: Kousuke Saruta
> Attachments: Name-conflicts-2.pdf, Solution_Proposal_SPARK-17154.pdf
>
>
> When we join two DataFrames that originate from the same DataFrame,
> operations on the joined DataFrame can fail.
> One reproducible example is as follows.
> {code}
> val df = Seq(
>   (1, "a", "A"),
>   (2, "b", "B"),
>   (3, "c", "C"),
>   (4, "d", "D"),
>   (5, "e", "E")).toDF("col1", "col2", "col3")
> val filtered = df.filter("col1 != 3").select("col1", "col2")
> val joined = filtered.join(df, filtered("col1") === df("col1"), "inner")
> val selected1 = joined.select(df("col3"))
> {code}
> In this case, an AnalysisException is thrown.
> Another example is as follows.
> {code}
> val df = Seq(
>   (1, "a", "A"),
>   (2, "b", "B"),
>   (3, "c", "C"),
>   (4, "d", "D"),
>   (5, "e", "E")).toDF("col1", "col2", "col3")
> val filtered = df.filter("col1 != 3").select("col1", "col2")
> val rightOuterJoined = filtered.join(df, filtered("col1") === df("col1"), "right")
> val selected2 = rightOuterJoined.select(df("col1"))
> selected2.show
> {code}
> In this case, we would expect the following answer.
> {code}
> 1
> 2
> 3
> 4
> 5
> {code}
> But the actual result is as follows.
> {code}
> 1
> 2
> null
> 4
> 5
> {code}
> The cause of the problems in these examples is that the analyzer (in the
> `ResolveReferences` rule) re-creates the logical plan of the right-side
> DataFrame, together with the expressions in its output, whenever the two
> sides of a join have output expressions that share the same exprId.
> The re-created expressions are identical to the original ones except for
> their exprIds.
> This happens when we do a self-join or a similar operation.
> In the first example, df("col3") returns a Column that wraps an expression
> with some exprId (call it id1).
> After the join, the expressions of the right-side DataFrame (df) are
> re-created; the new expressions are identical to the old ones except that
> their exprId is renewed (call the new one id2).
> Because df("col3") still refers to id1, which no longer appears in the
> joined plan, an AnalysisException is thrown.
> In the second example, df("col1") returns a Column whose expression has
> some exprId (call it id3).
> The Column returned by filtered("col1") wraps an expression with the same
> exprId (id3), because filter and select keep the original exprIds.
> After the join, the expressions of the right side are re-created, so id3 is
> no longer present on the right side but is still present on the left side.
> As a result, df("col1") on the joined DataFrame resolves to col1 of the left
> side (filtered), which is null for the row with col1 = 3 because that row was
> filtered out before the right outer join.
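The mechanism described above can be reproduced with a small plain-Scala toy model. This is a simplified sketch, not Spark's code: `Attr`, `dedupRight` and `resolve` are hypothetical stand-ins for Spark's attribute references, the analyzer's de-duplication of the right side, and attribute resolution. It assumes that every right-side attribute is re-created when any exprId collides, that a Column obtained via df("...") is resolved purely by exprId, and it collapses each row of the right outer join to its col1 values.
{code}
// Toy model, not Spark's API: an output attribute is a name plus an exprId.
final case class Attr(name: String, exprId: Long)

// Hypothetical exprId generator; fresh ids start at 100.
val freshIds = Iterator.from(100).map(_.toLong)

// If any exprId appears on both sides, re-create every right-side attribute
// with a fresh exprId (names stay the same).
def dedupRight(left: Seq[Attr], right: Seq[Attr]): Seq[Attr] = {
  val leftIds = left.map(_.exprId).toSet
  if (right.exists(a => leftIds.contains(a.exprId)))
    right.map(_.copy(exprId = freshIds.next()))
  else right
}

// Resolve a reference by exprId only; None is where Spark would fail to resolve.
def resolve(ref: Attr, output: Seq[Attr]): Option[Attr] =
  output.find(_.exprId == ref.exprId)

// df has col1#1, col2#2, col3#3; filtered keeps col1#1 and col2#2.
val df = Seq(Attr("col1", 1L), Attr("col2", 2L), Attr("col3", 3L))
val filtered = df.take(2)
val right = dedupRight(filtered, df) // col1#100, col2#101, col3#102
val joined = filtered ++ right

// Example 1: df("col3") is still bound to col3#3, which no longer exists.
val selected1 = resolve(Attr("col3", 3L), joined) // None

// Example 2: df("col1") is bound to col1#1, which now survives only in filtered.
val selected2Attr = resolve(Attr("col1", 1L), joined) // Some(Attr(col1,1))

// Right outer join on col1, rows collapsed to col1 values: the left-side value
// is None (null) where filtered has no matching row.
val dfCol1 = Seq(1, 2, 3, 4, 5)
val filteredCol1 = dfCol1.filter(_ != 3)
val joinedRows = dfCol1.map(r =>
  Map(right.head -> Some(r), filtered.head -> filteredCol1.find(_ == r)))

val selected2 = selected2Attr.toList.flatMap(a => joinedRows.map(_(a)))
val expected = joinedRows.map(_(right.head)) // the right side's col1: 1 to 5
println(selected2.map(_.getOrElse("null")).mkString(", ")) // 1, 2, null, 4, 5
{code}
In this model the wrong answer comes purely from resolving by exprId after the right side has been renamed; the values the report expected are still present in the joined rows, just under the new exprId.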
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)