This is an automated email from the ASF dual-hosted git repository.
sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 63ea26030f96 [SPARK-55055][PYTHON] Support SparkSession.Builder.create for PySpark Classic
63ea26030f96 is described below
commit 63ea26030f961b34cca11587ebc2dcf8849e2941
Author: Jon Mio <[email protected]>
AuthorDate: Tue Jan 27 15:51:30 2026 -0800
[SPARK-55055][PYTHON] Support SparkSession.Builder.create for PySpark Classic
### What changes were proposed in this pull request?
Allow users to call `SparkSessionBuilder.create` from PySpark in Classic
mode so that they can create a new session with the provided builder configs
without mutating configurations in any existing sessions.
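A minimal sketch of the intended usage (the master URL and config key below
are illustrative examples, not part of this patch):

```python
from pyspark.sql import SparkSession

# create() always builds a brand-new session from the builder's configs;
# it never reuses or reconfigures an existing session.
spark = (
    SparkSession.builder
    .master("local[4]")
    .config("spark.sql.shuffle.partitions", "10")
    .create()
)
```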
### Why are the changes needed?
Currently, `create` is not supported in PySpark Classic. Users can call
`getOrCreate` to obtain a session with the provided configurations, but if a
session already exists, `getOrCreate` will mutate that session's existing
configurations. We would like to provide an API that guarantees creation of a
new session and has no side effects on existing sessions. This change also
unifies the API between Classic and Connect mode, since `create` is already
supported in Connect.
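To illustrate the difference (a hedged sketch; the config key is just an
example of a session-level SQL setting):

```python
from pyspark.sql import SparkSession

s1 = (
    SparkSession.builder.master("local[4]")
    .config("spark.sql.shuffle.partitions", "10")
    .create()
)

# getOrCreate() here would return s1 and apply the new value to it,
# mutating s1's configuration. create() instead returns a separate
# session that shares the SparkContext but leaves s1 untouched.
s2 = (
    SparkSession.builder.master("local[4]")
    .config("spark.sql.shuffle.partitions", "20")
    .create()
)

assert s1.conf.get("spark.sql.shuffle.partitions") == "10"
assert s2.conf.get("spark.sql.shuffle.partitions") == "20"
```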
### Does this PR introduce _any_ user-facing change?
Previously, calling `SparkSessionBuilder.create()` threw an exception
saying that `create` is only supported in Connect mode. After this change,
`SparkSessionBuilder.create()` creates a new SparkSession with the specified
builder configurations.
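Sketched before/after behavior (the error class is taken from the code path
removed in this patch):

```python
# Before: in Classic mode this raised PySparkRuntimeError with
# errorClass "ONLY_SUPPORTED_WITH_SPARK_CONNECT".
# After: it returns a fresh session.
spark = SparkSession.builder.master("local[4]").create()
```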
### How was this patch tested?
Added the new test class `SparkSessionBuilderCreateTests` in
`python/pyspark/sql/tests/test_session.py`
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53820 from jonmio/pyspark_create.
Authored-by: Jon Mio <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
---
python/pyspark/sql/session.py | 28 ++++--
python/pyspark/sql/tests/test_session.py | 155 +++++++++++++++++++++++++++++++
2 files changed, 175 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index cba5b02a9ccf..e11817249be5 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -568,14 +568,14 @@ class SparkSession(SparkConversionMixin):
module.applyModifiableSettings(session._jsparkSession, self._options)
return session
- # Spark Connect-specific API
- @remote_only
def create(self) -> "SparkSession":
- """Creates a new SparkSession. Can only be used in the context of
Spark Connect
- and will throw an exception otherwise.
+ """Creates a new SparkSession.
.. versionadded:: 3.5.0
+ .. versionchanged:: 4.2.0
+ Supports creating a SparkSession in Classic mode.
+
Returns
-------
:class:`SparkSession`
@@ -584,7 +584,10 @@ class SparkSession(SparkConversionMixin):
-----
This method will update the default and/or active session if they are not set.
"""
+ from pyspark.core.context import SparkContext
+
opts = dict(self._options)
+ # Connect mode
if "SPARK_REMOTE" in os.environ or "spark.remote" in opts:
from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
@@ -603,11 +606,20 @@ class SparkSession(SparkConversionMixin):
os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
opts["spark.remote"] = url
return cast(SparkSession, RemoteSparkSession.builder.config(map=opts).create())
+ # Classic mode
else:
- raise PySparkRuntimeError(
- errorClass="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
- messageParameters={"feature": "SparkSession.builder.create"},
- )
+ with self._lock:
+ # Build SparkConf from options
+ sparkConf = SparkConf()
+ for key, value in self._options.items():
+ sparkConf.set(key, str(value))
+
+ sc = SparkContext.getOrCreate(sparkConf)
+ jSparkSessionClass = SparkSession._get_j_spark_session_class(sc._jvm)
+ # Create a new SparkSession in the JVM
+ jSparkSession = jSparkSessionClass.builder().config(self._options).create()
+ # Wrap the JVM SparkSession in a Python SparkSession
+ return SparkSession(sc, jsparkSession=jSparkSession, options=self._options)
# SPARK-47544: Explicitly declaring this as an identifier instead of a method.
# If changing, make sure this bug is not reintroduced.
diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py
index 4da4e355695c..3606056f6793 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -28,6 +28,7 @@ from pyspark.testing.connectutils import (
should_test_connect,
connect_requirement_message,
)
+from pyspark.errors.exceptions.captured import SparkNoSuchElementException
from pyspark.sql.profiler import Profile
from pyspark.testing.sqlutils import ReusedSQLTestCase
from pyspark.testing.utils import PySparkTestCase, PySparkErrorTestUtils
@@ -462,6 +463,160 @@ class SparkSessionBuilderTests(unittest.TestCase, PySparkErrorTestUtils):
)
+class SparkSessionBuilderCreateTests(unittest.TestCase, PySparkErrorTestUtils):
+ """
+ Tests for SparkSession.Builder.create() API.
+ """
+
+ def _get_builder(self):
+ """
+ Helper method to get a SparkSession.builder pre-configured for testing.
+
+ Returns:
+ SparkSession.Builder: A builder with basic configurations
+ """
+ return SparkSession.builder.master("local[4]")
+
+ def setUp(self):
+ """Initialize session variable for tests."""
+ self.session = None
+
+ def tearDown(self):
+ """Clean up SparkSession after each test."""
+ if self.session is not None:
+ self.session.stop()
+
+ def test_create_basic_functionality(self):
+ # Ensure that there is no active session initially
+ self.assertIsNone(SparkSession.getActiveSession())
+ self.session = self._get_builder().create()
+
+ # Verify session was created
+ self.assertIsNotNone(self.session)
+ self.assertIsNotNone(self.session.sparkContext)
+ self.assertIsNotNone(self.session._jsparkSession)
+
+ # Verify we can perform basic operations
+ df = self.session.range(10)
+ self.assertEqual(df.count(), 10)
+
+ # Ensure the active session is updated when it was previously None
+ self.assertEqual(self.session, SparkSession.getActiveSession())
+ # Check that calling create again will create a different session
+ session2 = self._get_builder().create()
+ # Ensure that the active session is not updated since it is already set
+ self.assertNotEqual(session2, SparkSession.getActiveSession())
+ # Ensure that a brand new session was created
+ self.assertNotEqual(self.session, session2)
+ self.assertNotEqual(self.session._jsparkSession, session2._jsparkSession)
+
+ def test_create_works_with_or_without_existing_spark_context(self):
+ """
+ Test create() both without a pre-existing SparkContext and with a pre-existing SparkContext.
+ """
+ sc = None
+ session = None
+ try:
+ # Stop any existing SparkContext first to ensure a clean state
+ existing_sc = SparkContext._active_spark_context
+ if existing_sc is not None:
+ existing_sc.stop()
+
+ # Create session without a pre-existing SparkContext
+ session = SparkSession.builder.master("local[4]").create()
+ sc = session.sparkContext
+ self.assertIsNotNone(sc)
+ # Call create again while the SparkContext is still running
+ session2 = SparkSession.builder.create()
+ # Verify SparkSession attaches to the existing SparkContext
+ self.assertEqual(session2.sparkContext, sc)
+
+ finally:
+ # Stop the SparkContext which also stops all sessions
+ if sc is not None:
+ sc.stop()
+
+ def test_create_respects_spark_configs(self):
+ """
+ Test that Spark configs are properly applied and not leaked between sessions.
+ """
+ # Create a session which also starts the SparkContext
+ self.session = self._get_builder().create()
+
+ # Create a second session with additional custom config
+ session2 = (
+ self._get_builder()
+ .config("spark.sql.shuffle.partitions", "10")
+ .config("spark.test.additional.config", "extra_value")
+ .create()
+ )
+ self.assertEqual(session2.conf.get("spark.sql.shuffle.partitions"), "10")
+ self.assertEqual(session2.conf.get("spark.test.additional.config"), "extra_value")
+
+ session3 = self._get_builder().config("spark.sql.shuffle.partitions", "20").create()
+ self.assertEqual(session3.conf.get("spark.sql.shuffle.partitions"), "20")
+ # Ensure config doesn't leak between sessions
+ with self.assertRaises(SparkNoSuchElementException):
+ session3.conf.get("spark.test.additional.config")
+
+ def test_create_and_getOrCreate_interaction(self):
+ """
+ Test interaction between create() and getOrCreate().
+ """
+ self.session = self._get_builder().create()
+ # getOrCreate() should return the active session (self.session)
+ session2 = SparkSession.builder.getOrCreate()
+ self.assertEqual(self.session, session2)
+
+ def test_create_with_invalid_master(self):
+ """Test create() with invalid master URL."""
+ with self.assertRaises(Exception):
+ self.session = SparkSession.builder.master("invalid://localhost").create()
+
+ def test_create_with_app_name(self):
+ """Test create() with appName() builder method."""
+ app_name = "TestCreateAppName"
+ self.session = self._get_builder().appName(app_name).create()
+
+ self.assertEqual(self.session.sparkContext.appName, app_name)
+ self.assertEqual(self.session.range(5).count(), 5)
+
+ def test_create_default_session_behavior(self):
+ """Test that first create() sets active session, subsequent calls
don't override."""
+ self.assertIsNone(SparkSession.getActiveSession())
+
+ self.session = self._get_builder().appName("DefaultSessionTest1").create()
+ self.assertEqual(self.session, SparkSession.getActiveSession())
+
+ session2 = self._get_builder().appName("DefaultSessionTest2").create()
+ try:
+ self.assertEqual(self.session, SparkSession.getActiveSession())
+ self.assertNotEqual(session2, SparkSession.getActiveSession())
+ self.assertEqual(self.session.range(3).count(), 3)
+ self.assertEqual(session2.range(5).count(), 5)
+ finally:
+ session2.stop()
+
+ def test_create_sessions_share_spark_context(self):
+ """Test that multiple create() sessions share SparkContext but have
independent state."""
+ self.session = self._get_builder().appName("SharedContextTest1").create()
+ session2 = self._get_builder().appName("SharedContextTest2").create()
+ try:
+ self.assertEqual(self.session.sparkContext, session2.sparkContext)
+ self.assertIsNotNone(self.session.sparkContext)
+
+ df1 = self.session.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
+ self.assertEqual(df1.count(), 2)
+
+ df2 = session2.createDataFrame([(3, "Charlie"), (4, "David")], ["id", "name"])
+ self.assertEqual(df2.count(), 2)
+
+ self.assertNotEqual(self.session, session2)
+ self.assertNotEqual(self.session._jsparkSession, session2._jsparkSession)
+ finally:
+ session2.stop()
+
+
class SparkSessionProfileTests(unittest.TestCase, PySparkErrorTestUtils):
def setUp(self):
self.profiler_collector_mock = unittest.mock.Mock()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]