This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a9987a38d5fd [SPARK-51738][SQL][FOLLOWUP] Fix HashJoin to accept
structurally-equal types
a9987a38d5fd is described below
commit a9987a38d5fd5c97f9f58dae66ca3d68eec10020
Author: Takuya Ueshin <[email protected]>
AuthorDate: Thu Apr 10 10:28:29 2025 +0800
[SPARK-51738][SQL][FOLLOWUP] Fix HashJoin to accept structurally-equal types
### What changes were proposed in this pull request?
This is a follow-up of #50537.
Fixes `HashJoin` to accept structurally-equal types.
### Why are the changes needed?
#50537 relaxed the requirement for binary comparison, so `HashJoin` should be
relaxed in the same way; otherwise, it can fail with `IllegalArgumentException`.
For example, in `SubquerySuite`:
```scala
sql("""
|SELECT foo IN (SELECT struct(c, d) FROM r)
|FROM (SELECT struct(a, b) foo FROM l)
|""".stripMargin).show()
```
fails with:
```
[info] java.lang.IllegalArgumentException: requirement failed: Join keys
from two sides should have same length and types
[info] at scala.Predef$.require(Predef.scala:337)
[info] at
org.apache.spark.sql.execution.joins.HashJoin.org$apache$spark$sql$execution$joins$HashJoin$$x$6(HashJoin.scala:115)
[info] at
org.apache.spark.sql.execution.joins.HashJoin.org$apache$spark$sql$execution$joins$HashJoin$$x$6$(HashJoin.scala:110)
[info] at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$6$lzycompute(BroadcastHashJoinExec.scala:40)
[info] at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$6(BroadcastHashJoinExec.scala:40)
[info] at
org.apache.spark.sql.execution.joins.HashJoin.buildKeys(HashJoin.scala:110)
[info] at
org.apache.spark.sql.execution.joins.HashJoin.buildKeys$(HashJoin.scala:110)
[info] at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys$lzycompute(BroadcastHashJoinExec.scala:40)
[info] at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys(BroadcastHashJoinExec.scala:40)
[info] at
org.apache.spark.sql.execution.joins.HashJoin.buildBoundKeys(HashJoin.scala:130)
[info] at
org.apache.spark.sql.execution.joins.HashJoin.buildBoundKeys$(HashJoin.scala:129)
[info] at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildBoundKeys$lzycompute(BroadcastHashJoinExec.scala:40)
[info] at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildBoundKeys(BroadcastHashJoinExec.scala:40)
[info] at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.requiredChildDistribution(BroadcastHashJoinExec.scala:63)
...
```
### Does this PR introduce _any_ user-facing change?
Yes, `HashJoin` will now work with structurally-equal join key types instead of failing with `IllegalArgumentException`.
### How was this patch tested?
Added a test covering this case.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #50549 from ueshin/issues/SPARK-51738/hashjoin.
Authored-by: Takuya Ueshin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 5 ++---
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 9 +++++++++
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index ce7d48babc91..a1abb64e262d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -25,11 +25,10 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
BuildSide}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{CodegenSupport, ExplainUtils,
RowIterator}
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.{BooleanType, IntegralType, LongType}
+import org.apache.spark.sql.types.{BooleanType, DataType, IntegralType,
LongType}
/**
* @param relationTerm variable name for HashedRelation
@@ -111,7 +110,7 @@ trait HashJoin extends JoinCodegenSupport {
require(leftKeys.length == rightKeys.length &&
leftKeys.map(_.dataType)
.zip(rightKeys.map(_.dataType))
- .forall(types => DataTypeUtils.sameType(types._1, types._2)),
+ .forall(types => DataType.equalsStructurally(types._1, types._2,
ignoreNullability = true)),
"Join keys from two sides should have same length and types")
buildSide match {
case BuildLeft => (leftKeys, rightKeys)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 45cd6c2af301..576f93e94ec1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2836,5 +2836,14 @@ class SubquerySuite extends QueryTest
sql("SELECT foo IN (SELECT struct(1 a)) FROM (SELECT struct(1 b) foo)"),
Row(true)
)
+
+ checkAnswer(
+ sql("""
+ |SELECT foo IN (SELECT struct(c, d) FROM r)
+ |FROM (SELECT struct(a, b) foo FROM l)
+ |""".stripMargin),
+ Row(false) :: Row(false) :: Row(false) :: Row(false) :: Row(false)
+ :: Row(true) :: Row(true) :: Row(true) :: Nil
+ )
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]