[
https://issues.apache.org/jira/browse/SPARK-53143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Milicevic updated SPARK-53143:
------------------------------------
Description:
There is an edge case with self-joins when multiple joins are present.
Example: two joins, where the latter one is a self-join.
The first one is a "using" join. In this case, the analyzer's
`ResolveNaturalAndUsingJoin` rule adds a `Project` as the top node.
The second join is a self-join with an explicit join condition (i.e.
`joinExprs`). If the join condition uses columns that are not part of the
project list of the first join, the `AddMetadataColumns` rule is hit to add
metadata for those columns. As a consequence, a `Project` is added on top of
the joined plan to return the original/expected list of projected columns.
Although a similar outcome (a `Project` node on top) can occur in multiple
other cases, from the `Dataset` perspective the issue is specific to
self-joins, since `resolveSelfJoinCondition` assumes that the analyzed plan is
always of `Join` type.
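The failure mode described above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions: `Relation`, `Join`, `Project`, `strict_self_join_lookup` and `tolerant_self_join_lookup` are hypothetical stand-ins written for illustration, not Spark classes or Spark's actual fix. It shows why assuming a top-level `Join` breaks once a `Project` is stacked on top, and one possible way to look through such nodes.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


# Hypothetical stand-ins for Catalyst logical plan nodes; these are NOT Spark classes.
@dataclass(frozen=True)
class Relation:
    name: str


@dataclass(frozen=True)
class Join:
    left: object
    right: object
    condition: Optional[str] = None


@dataclass(frozen=True)
class Project:
    project_list: Tuple[str, ...]
    child: object


def strict_self_join_lookup(analyzed_plan):
    """Mimics the assumption in the description: the analyzed plan must be a Join."""
    if not isinstance(analyzed_plan, Join):
        raise TypeError(f"{type(analyzed_plan).__name__} cannot be cast to Join")
    return analyzed_plan


def tolerant_self_join_lookup(analyzed_plan):
    """Skips Project nodes stacked on top of the Join (one possible approach)."""
    node = analyzed_plan
    while isinstance(node, Project):
        node = node.child
    if not isinstance(node, Join):
        raise TypeError(f"expected a Join below Project nodes, got {type(node).__name__}")
    return node


# Plan shape loosely modeled on the scenario: a "using" join wrapped in a Project,
# then a self-join that the analyzer wraps in another Project.
using_join = Project(("id",), Join(Relation("t"), Relation("df1")))
self_join = Join(using_join, Relation("df2"), "df1.id = df2.id")
analyzed = Project(("id", "id"), self_join)

try:
    strict_self_join_lookup(analyzed)
except TypeError as exc:
    print("strict lookup failed:", exc)

print(tolerant_self_join_lookup(analyzed) is self_join)
```

In this toy model the strict lookup raises because the top node is a `Project`, while the tolerant lookup reaches the underlying `Join`. Whether unwrapping `Project` nodes is the right approach inside `resolveSelfJoinCondition` is an assumption of this sketch.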
Minimalist PySpark repro:
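{code:python}
from pyspark.sql.functions import col

# Assumes an active SparkSession bound to `spark` (e.g. the pyspark shell).
spark.sql("CREATE TABLE IF NOT EXISTS table_11 (id INT);")
spark.sql("CREATE TABLE IF NOT EXISTS table_12 (id INT, col_1 STRING);")
df = spark.table("table_12").where("col_1 = 'test'").select("id")
# The first join is a "using" join; the second is a self-join on `df` with an explicit condition.
spark.table("table_11").alias("t") \
    .join(df.alias("df1"), on=["id"]) \
    .join(df.alias("df2"), col("df1.id") == col("df2.id"), how="left")
{code}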
Throws an exception:
{code:java}
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.plans.logical.Project cannot be cast to class org.apache.spark.sql.catalyst.plans.logical.Join (org.apache.spark.sql.catalyst.plans.logical.Project and org.apache.spark.sql.catalyst.plans.logical.Join are in unnamed module of loader 'app')
    at org.apache.spark.sql.classic.Dataset.resolveSelfJoinCondition(Dataset.scala:665)
    at org.apache.spark.sql.classic.Dataset.join(Dataset.scala:690)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}
> Fix self join in DataFrame API - Join is not the only expected output from
> analyzer
> -----------------------------------------------------------------------------------
>
> Key: SPARK-53143
> URL: https://issues.apache.org/jira/browse/SPARK-53143
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 4.0.0
> Reporter: David Milicevic
> Priority: Major
>