This is an automated email from the ASF dual-hosted git repository.

sandy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 63ea26030f96 [SPARK-55055][PYTHON] Support SparkSession.Builder.create for PySpark Classic
63ea26030f96 is described below

commit 63ea26030f961b34cca11587ebc2dcf8849e2941
Author: Jon Mio <[email protected]>
AuthorDate: Tue Jan 27 15:51:30 2026 -0800

    [SPARK-55055][PYTHON] Support SparkSession.Builder.create for PySpark Classic
    
    ### What changes were proposed in this pull request?
    Allow users to call `SparkSessionBuilder.create` from PySpark in Classic mode so that they can create a new session with the provided builder configs without mutating the configurations of any existing sessions.
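
    For example, a minimal sketch of the new usage (the master URL and config key are illustrative):

    ```python
    from pyspark.sql import SparkSession

    # create() builds a brand-new session from the builder configs,
    # leaving the configuration of any existing session untouched.
    spark = (
        SparkSession.builder
        .master("local[4]")
        .config("spark.sql.shuffle.partitions", "10")
        .create()
    )
    ```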
    
    ### Why are the changes needed?
    Currently, `create` is not supported in PySpark Classic. Users can call `getOrCreate` to obtain a session with the provided configurations, but if a session already exists, `getOrCreate` will mutate that session's existing configurations. We would like to provide an API that guarantees creation of a new session and has no side effects on existing sessions.
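
    A sketch of the contrast (config key chosen for illustration):

    ```python
    from pyspark.sql import SparkSession

    s1 = SparkSession.builder.config("spark.sql.shuffle.partitions", "5").getOrCreate()

    # getOrCreate() returns the existing session and applies the new
    # modifiable configs to it, mutating s1's runtime conf.
    s2 = SparkSession.builder.config("spark.sql.shuffle.partitions", "7").getOrCreate()
    assert s2 is s1
    assert s1.conf.get("spark.sql.shuffle.partitions") == "7"

    # create() guarantees a fresh session; s1's conf is left untouched.
    s3 = SparkSession.builder.config("spark.sql.shuffle.partitions", "9").create()
    assert s3 is not s1
    assert s1.conf.get("spark.sql.shuffle.partitions") == "7"
    ```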
    
    This change also unifies the API between Classic and Connect mode, since `create` is already supported in Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    Previously, calling `SparkSessionBuilder.create()` would throw an exception saying that `create` is only supported in Connect mode.
    
    After this change, `SparkSessionBuilder.create()` will create a new SparkSession with the specified builder configurations.
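
    For example (a hedged sketch; the error class name comes from the removed code path):

    ```python
    from pyspark.sql import SparkSession

    # Before: in Classic mode this raised
    #   PySparkRuntimeError: [ONLY_SUPPORTED_WITH_SPARK_CONNECT] ...
    # After: it returns a new SparkSession built from the builder configs.
    spark = SparkSession.builder.appName("create-demo").create()
    ```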
    
    ### How was this patch tested?
    Added the new test class `SparkSessionBuilderCreateTests` in `python/pyspark/sql/tests/test_session.py`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53820 from jonmio/pyspark_create.
    
    Authored-by: Jon Mio <[email protected]>
    Signed-off-by: Sandy Ryza <[email protected]>
---
 python/pyspark/sql/session.py            |  28 ++++--
 python/pyspark/sql/tests/test_session.py | 155 +++++++++++++++++++++++++++++++
 2 files changed, 175 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index cba5b02a9ccf..e11817249be5 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -568,14 +568,14 @@ class SparkSession(SparkConversionMixin):
                     module.applyModifiableSettings(session._jsparkSession, self._options)
                 return session
 
-        # Spark Connect-specific API
-        @remote_only
         def create(self) -> "SparkSession":
-            """Creates a new SparkSession. Can only be used in the context of 
Spark Connect
-            and will throw an exception otherwise.
+            """Creates a new SparkSession.
 
             .. versionadded:: 3.5.0
 
+            .. versionchanged:: 4.2.0
+                Supports creating a SparkSession in Classic mode.
+
             Returns
             -------
             :class:`SparkSession`
@@ -584,7 +584,10 @@ class SparkSession(SparkConversionMixin):
             -----
             This method will update the default and/or active session if they are not set.
             """
+            from pyspark.core.context import SparkContext
+
             opts = dict(self._options)
+            # Connect mode
             if "SPARK_REMOTE" in os.environ or "spark.remote" in opts:
                 from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
 
@@ -603,11 +606,20 @@ class SparkSession(SparkConversionMixin):
                 os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
                 opts["spark.remote"] = url
                 return cast(SparkSession, RemoteSparkSession.builder.config(map=opts).create())
+            # Classic mode
             else:
-                raise PySparkRuntimeError(
-                    errorClass="ONLY_SUPPORTED_WITH_SPARK_CONNECT",
-                    messageParameters={"feature": "SparkSession.builder.create"},
-                )
+                with self._lock:
+                    # Build SparkConf from options
+                    sparkConf = SparkConf()
+                    for key, value in self._options.items():
+                        sparkConf.set(key, str(value))
+
+                    sc = SparkContext.getOrCreate(sparkConf)
+                    jSparkSessionClass = SparkSession._get_j_spark_session_class(sc._jvm)
+                    # Create a new SparkSession in the JVM
+                    jSparkSession = jSparkSessionClass.builder().config(self._options).create()
+                    # Wrap the JVM SparkSession in a Python SparkSession
+                    return SparkSession(sc, jsparkSession=jSparkSession, options=self._options)
 
     # SPARK-47544: Explicitly declaring this as an identifier instead of a method.
     # If changing, make sure this bug is not reintroduced.
diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py
index 4da4e355695c..3606056f6793 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -28,6 +28,7 @@ from pyspark.testing.connectutils import (
     should_test_connect,
     connect_requirement_message,
 )
+from pyspark.errors.exceptions.captured import SparkNoSuchElementException
 from pyspark.sql.profiler import Profile
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 from pyspark.testing.utils import PySparkTestCase, PySparkErrorTestUtils
@@ -462,6 +463,160 @@ class SparkSessionBuilderTests(unittest.TestCase, PySparkErrorTestUtils):
         )
 
 
+class SparkSessionBuilderCreateTests(unittest.TestCase, PySparkErrorTestUtils):
+    """
+    Tests for SparkSession.Builder.create() API.
+    """
+
+    def _get_builder(self):
+        """
+        Helper method to get a SparkSession.builder pre-configured for testing.
+
+        Returns:
+            SparkSession.Builder: A builder with basic configurations
+        """
+        return SparkSession.builder.master("local[4]")
+
+    def setUp(self):
+        """Initialize session variable for tests."""
+        self.session = None
+
+    def tearDown(self):
+        """Clean up SparkSession after each test."""
+        if self.session is not None:
+            self.session.stop()
+
+    def test_create_basic_functionality(self):
+        # Ensure that there is no active session initially
+        self.assertIsNone(SparkSession.getActiveSession())
+        self.session = self._get_builder().create()
+
+        # Verify session was created
+        self.assertIsNotNone(self.session)
+        self.assertIsNotNone(self.session.sparkContext)
+        self.assertIsNotNone(self.session._jsparkSession)
+
+        # Verify we can perform basic operations
+        df = self.session.range(10)
+        self.assertEqual(df.count(), 10)
+
+        # Ensure the active session is updated when it was previously None
+        self.assertEqual(self.session, SparkSession.getActiveSession())
+        # Check that calling create again will create a different session
+        session2 = self._get_builder().create()
+        # Ensure that the active session is not updated since it is already set
+        self.assertNotEqual(session2, SparkSession.getActiveSession())
+        # Ensure that a brand new session was created
+        self.assertNotEqual(self.session, session2)
+        self.assertNotEqual(self.session._jsparkSession, session2._jsparkSession)
+
+    def test_create_works_with_or_without_existing_spark_context(self):
+        """
+        Test create() both without a pre-existing SparkContext and with a pre-existing SparkContext.
+        """
+        sc = None
+        session = None
+        try:
+            # Stop any existing SparkContext first to ensure a clean state
+            existing_sc = SparkContext._active_spark_context
+            if existing_sc is not None:
+                existing_sc.stop()
+
+            # Create session without a pre-existing SparkContext
+            session = SparkSession.builder.master("local[4]").create()
+            sc = session.sparkContext
+            self.assertIsNotNone(sc)
+            # Call create again while the SparkContext is still running
+            session2 = SparkSession.builder.create()
+            # Verify SparkSession attaches to the existing SparkContext
+            self.assertEqual(session2.sparkContext, sc)
+
+        finally:
+            # Stop the SparkContext which also stops all sessions
+            if sc is not None:
+                sc.stop()
+
+    def test_create_respects_spark_configs(self):
+        """
+        Test that Spark configs are properly applied and not leaked between sessions.
+        """
+        # Create a session which also starts the SparkContext
+        self.session = self._get_builder().create()
+
+        # Create a second session with additional custom config
+        session2 = (
+            self._get_builder()
+            .config("spark.sql.shuffle.partitions", "10")
+            .config("spark.test.additional.config", "extra_value")
+            .create()
+        )
+        self.assertEqual(session2.conf.get("spark.sql.shuffle.partitions"), "10")
+        self.assertEqual(session2.conf.get("spark.test.additional.config"), "extra_value")
+
+        session3 = self._get_builder().config("spark.sql.shuffle.partitions", "20").create()
+        self.assertEqual(session3.conf.get("spark.sql.shuffle.partitions"), "20")
+        # Ensure config doesn't leak between sessions
+        with self.assertRaises(SparkNoSuchElementException):
+            session3.conf.get("spark.test.additional.config")
+
+    def test_create_and_getOrCreate_interaction(self):
+        """
+        Test interaction between create() and getOrCreate().
+        """
+        self.session = self._get_builder().create()
+        # getOrCreate() should return the active session (self.session)
+        session2 = SparkSession.builder.getOrCreate()
+        self.assertEqual(self.session, session2)
+
+    def test_create_with_invalid_master(self):
+        """Test create() with invalid master URL."""
+        with self.assertRaises(Exception):
+            self.session = SparkSession.builder.master("invalid://localhost").create()
+
+    def test_create_with_app_name(self):
+        """Test create() with appName() builder method."""
+        app_name = "TestCreateAppName"
+        self.session = self._get_builder().appName(app_name).create()
+
+        self.assertEqual(self.session.sparkContext.appName, app_name)
+        self.assertEqual(self.session.range(5).count(), 5)
+
+    def test_create_default_session_behavior(self):
+        """Test that first create() sets active session, subsequent calls 
don't override."""
+        self.assertIsNone(SparkSession.getActiveSession())
+
+        self.session = self._get_builder().appName("DefaultSessionTest1").create()
+        self.assertEqual(self.session, SparkSession.getActiveSession())
+
+        session2 = self._get_builder().appName("DefaultSessionTest2").create()
+        try:
+            self.assertEqual(self.session, SparkSession.getActiveSession())
+            self.assertNotEqual(session2, SparkSession.getActiveSession())
+            self.assertEqual(self.session.range(3).count(), 3)
+            self.assertEqual(session2.range(5).count(), 5)
+        finally:
+            session2.stop()
+
+    def test_create_sessions_share_spark_context(self):
+        """Test that multiple create() sessions share SparkContext but have 
independent state."""
+        self.session = 
self._get_builder().appName("SharedContextTest1").create()
+        session2 = self._get_builder().appName("SharedContextTest2").create()
+        try:
+            self.assertEqual(self.session.sparkContext, session2.sparkContext)
+            self.assertIsNotNone(self.session.sparkContext)
+
+            df1 = self.session.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
+            self.assertEqual(df1.count(), 2)
+
+            df2 = session2.createDataFrame([(3, "Charlie"), (4, "David")], ["id", "name"])
+            self.assertEqual(df2.count(), 2)
+
+            self.assertNotEqual(self.session, session2)
+            self.assertNotEqual(self.session._jsparkSession, session2._jsparkSession)
+        finally:
+            session2.stop()
+
+
 class SparkSessionProfileTests(unittest.TestCase, PySparkErrorTestUtils):
     def setUp(self):
         self.profiler_collector_mock = unittest.mock.Mock()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
