This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d968986704ad [SPARK-46732][CONNECT] Make Subquery/Broadcast thread
work with Connect's artifact management
d968986704ad is described below
commit d968986704adf2d177cafc9165487c68b1461a95
Author: xieshuaihu <[email protected]>
AuthorDate: Wed Jan 17 08:28:29 2024 +0900
[SPARK-46732][CONNECT] Make Subquery/Broadcast thread work with Connect's
artifact management
### What changes were proposed in this pull request?
Similar with SPARK-44794, propagate JobArtifactState to broadcast/subquery
thread.
This is an example:
```scala
val add1 = udf((i: Long) => i + 1)
val tableA = spark.range(2).alias("a")
val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
tableA.join(tableB).
where(col("a.id")===col("b.id")).
select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
collect().
mkString("[", ", ", "]")
```
Before this pr, this example will throw exception `ClassNotFoundException`.
Subquery and Broadcast execution use a separate ThreadPool which don't have the
`JobArtifactState`.
### Why are the changes needed?
Fix bug. Make Subquery/Broadcast thread work with Connect's artifact
management.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add a new test to `ReplE2ESuite`
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44753 from xieshuaihu/broadcast_artifact.
Authored-by: xieshuaihu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/application/ReplE2ESuite.scala | 16 ++++++++++++++++
.../org/apache/spark/sql/execution/SQLExecution.scala | 5 +++--
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
index 0cce44c6e3d9..76958f055f2e 100644
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala
@@ -397,6 +397,22 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
assertContains("noException: Boolean = true", output)
}
+ test("broadcast works with REPL generated code") {
+ val input =
+ """
+ |val add1 = udf((i: Long) => i + 1)
+ |val tableA = spark.range(2).alias("a")
+ |val tableB = broadcast(spark.range(2).select(add1(col("id")).alias("id"))).alias("b")
+ |tableA.join(tableB).
+ | where(col("a.id")===col("b.id")).
+ | select(col("a.id").alias("a_id"), col("b.id").alias("b_id")).
+ | collect().
+ | mkString("[", ", ", "]")
+ |""".stripMargin
+ val output = runCommandsInShell(input)
+ assertContains("""String = "[[1,1]]"""", output)
+ }
+
test("closure cleaner") {
val input =
"""
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index e839d2c06913..561deacfb72d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.util.control.NonFatal
-import org.apache.spark.{ErrorMessageFormat, SparkException, SparkThrowable, SparkThrowableHelper}
+import org.apache.spark.{ErrorMessageFormat, JobArtifactSet, SparkException, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.SparkContext.{SPARK_JOB_DESCRIPTION, SPARK_JOB_INTERRUPT_ON_CANCEL}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, SPARK_EXECUTOR_PREFIX}
@@ -255,7 +255,8 @@ object SQLExecution extends Logging {
val activeSession = sparkSession
val sc = sparkSession.sparkContext
val localProps = Utils.cloneProperties(sc.getLocalProperties)
- exec.submit(() => {
+ val artifactState = JobArtifactSet.getCurrentJobArtifactState.orNull
+ exec.submit(() => JobArtifactSet.withActiveJobArtifactState(artifactState) {
val originalSession = SparkSession.getActiveSession
val originalLocalProps = sc.getLocalProperties
SparkSession.setActiveSession(activeSession)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]