This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 917bc8cb927 [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE
917bc8cb927 is described below
commit 917bc8cb92728267fb93891f4ef9da13c06e4589
Author: Yihong He <[email protected]>
AuthorDate: Thu Sep 28 12:58:07 2023 -0400
[SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE
### What changes were proposed in this pull request?
- Initialize spark session builder configuration from SPARK_REMOTE
### Why are the changes needed?
- `SparkSession.builder().getOrCreate()` should follow the behavior documented [here](https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/docs/spark-connect-overview.md?plain=1#L241-L244) and support initialization from SPARK_REMOTE
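A minimal sketch of the documented behavior (the connection strings below are hypothetical examples; `remote()` and `getOrCreate()` are the existing builder APIs):

```scala
import org.apache.spark.sql.SparkSession

// Assuming SPARK_REMOTE is set in the environment, e.g.
//   export SPARK_REMOTE="sc://localhost:15002"
// the builder now picks up the connection string by default:
val fromEnv = SparkSession.builder().getOrCreate()

// An explicit remote() still takes precedence over SPARK_REMOTE:
val explicit = SparkSession.builder()
  .remote("sc://localhost:15003") // hypothetical address overriding the env var
  .getOrCreate()
```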
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`
### Was this patch authored or co-authored using generative AI tooling?
Closes #43153 from heyihong/SPARK-45360.
Authored-by: Yihong He <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 183a3d761f36d35572cfb37ab921b6a86f8f28ed)
Signed-off-by: Herman van Hovell <[email protected]>
---
.../scala/org/apache/spark/sql/SparkSession.scala | 5 +-
.../connect/client/SparkConnectClientSuite.scala | 61 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 1 deletion(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7bd8fa59aea..421f37b9e8a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -783,7 +783,10 @@ object SparkSession extends Logging {
   }
 
   class Builder() extends Logging {
-    private val builder = SparkConnectClient.builder()
+    // Initialize the connection string of the Spark Connect client builder from SPARK_REMOTE
+    // by default, if it exists. The connection string can be overridden using
+    // the remote() function, as it takes precedence over the SPARK_REMOTE environment variable.
+    private val builder = SparkConnectClient.builder().loadFromEnvironment()
     private var client: SparkConnectClient = _
     private[this] val options = new scala.collection.mutable.HashMap[String, String]
 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 80e245ec78b..89acc2c60ac 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -86,6 +86,24 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(response.getSessionId === "abc123")
   }
 
+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+    val readonlyEnv = System.getenv()
+    val field = readonlyEnv.getClass.getDeclaredField("m")
+    field.setAccessible(true)
+    val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+    try {
+      for ((k, v) <- pairs) {
+        assert(!modifiableEnv.containsKey(k))
+        modifiableEnv.put(k, v)
+      }
+      f
+    } finally {
+      for ((k, _) <- pairs) {
+        modifiableEnv.remove(k)
+      }
+    }
+  }
+
   test("Test connection") {
     testClientConnection() { testPort => SparkConnectClient.builder().port(testPort).build() }
   }
@@ -112,6 +130,49 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }
 
+  test("SparkSession create with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().create()
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().create()
+      assert(session != session2)
+    }
+  }
+
+  test("SparkSession getOrCreate with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().getOrCreate()
+      assert(session === session2)
+    }
+  }
+
+  test("Builder.remote takes precedence over SPARK_REMOTE") {
+    startDummyServer(0)
+    val incorrectUrl = s"sc://localhost:${server.getPort + 1}"
+
+    withEnvs("SPARK_REMOTE" -> incorrectUrl) {
+      val session =
+        SparkSession.builder().remote(s"sc://localhost:${server.getPort}").getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+    }
+  }
+
   test("SparkSession initialisation with connection string") {
     startDummyServer(0)
     client = SparkConnectClient