This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d92e66643e8d [SPARK-47104][SQL] `TakeOrderedAndProjectExec` should initialize the unsafe projection
d92e66643e8d is described below
commit d92e66643e8dc4e5395a0b8c2f17a72afed1528e
Author: Bruce Robbins <[email protected]>
AuthorDate: Wed Feb 21 12:53:17 2024 -0800
[SPARK-47104][SQL] `TakeOrderedAndProjectExec` should initialize the unsafe projection
### What changes were proposed in this pull request?
Change `TakeOrderedAndProjectExec#executeCollect` and
`TakeOrderedAndProjectExec#doExecute` to initialize the unsafe projection
before using it to produce output rows.
### Why are the changes needed?
Because the unsafe projection is not initialized, non-deterministic
expressions also don't get initialized. This results in errors when the
projection contains non-deterministic expressions. For example:
```
create or replace temp view v1(id, name) as values
(1, "fred"),
(2, "bob");
cache table v1;
select name, uuid() as _iid from (
select * from v1 order by name
)
limit 20;
```
This query produces the following error:
```
java.lang.NullPointerException: Cannot invoke "org.apache.spark.sql.catalyst.util.RandomUUIDGenerator.getNextUUIDUTF8String()" because "this.randomGen_0" is null
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$6(limit.scala:297)
at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:934)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.$anonfun$executeCollect$1(limit.scala:297)
...
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45199 from bersprockets/take_ordered_issue.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/sql/execution/limit.scala | 4 ++-
.../sql/execution/TakeOrderedAndProjectSuite.scala | 36 +++++++++++++++++++++-
2 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 0dc4a69c0758..37fe2565d8f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -294,6 +294,7 @@ case class TakeOrderedAndProjectExec(
val data = if (offset > 0) limited.drop(offset) else limited
if (projectList != child.output) {
val proj = UnsafeProjection.create(projectList, child.output)
+ proj.initialize(0)
data.map(r => proj(r).copy())
} else {
data
@@ -335,11 +336,12 @@ case class TakeOrderedAndProjectExec(
writeMetrics),
readMetrics)
}
- singlePartitionRDD.mapPartitionsInternal { iter =>
+ singlePartitionRDD.mapPartitionsWithIndexInternal { (idx, iter) =>
val limited = Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
val topK = if (offset > 0) limited.drop(offset) else limited
if (projectList != child.output) {
val proj = UnsafeProjection.create(projectList, child.output)
+ proj.initialize(idx)
topK.map(r => proj(r))
} else {
topK
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index 647d46f8fbf9..c0ed9777e4e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -21,7 +21,7 @@ import scala.util.Random
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Rand}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -127,4 +127,38 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSparkSession {
}
}
}
+
+ test("SPARK-47104: Non-deterministic expressions in projection") {
+ val expected = (input: SparkPlan) => {
+ GlobalLimitExec(limit,
+ LocalLimitExec(limit,
+ SortExec(sortOrder, true, input)))
+ }
+ val schema = StructType.fromDDL("a int, b int, c double")
+ val rdd = sparkContext.parallelize(
+ Seq(Row(1, 2, 0.0953472826424725d),
+ Row(2, 3, 0.5234194256885571d),
+ Row(3, 4, 0.7604953758285915d)), 1)
+ val df = spark.createDataFrame(rdd, schema)
+ val projection = df.queryExecution.sparkPlan.output.take(2) :+
+ Alias(Rand(Literal(0, IntegerType)), "_uuid")()
+
+ // test executeCollect
+ checkThatPlansAgree(
+ df,
+ input =>
+ TakeOrderedAndProjectExec(limit, sortOrder, projection,
+ SortExec(sortOrder, false, input)),
+ input => expected(input),
+ sortAnswers = false)
+
+ // test doExecute
+ checkThatPlansAgree(
+ df,
+ input =>
+ noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, projection,
+ SortExec(sortOrder, false, input))),
+ input => expected(input),
+ sortAnswers = false)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]