This is an automated email from the ASF dual-hosted git repository.
xinrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 027aeb1764a8 [SPARK-46277][PYTHON] Validate startup urls with the config being set
027aeb1764a8 is described below
commit 027aeb1764a816858b7ea071cd2b620f02a6a525
Author: Xinrong Meng <[email protected]>
AuthorDate: Thu Dec 7 13:45:31 2023 -0800
[SPARK-46277][PYTHON] Validate startup urls with the config being set
### What changes were proposed in this pull request?
Validate startup urls with the config being set; see the example in the "Does this PR introduce _any_ user-facing change?" section below.
### Why are the changes needed?
To provide clear and user-friendly error messages when incompatible startup URLs are configured.
### Does this PR introduce _any_ user-facing change?
Yes.
FROM
```py
>>> SparkSession.builder.config(map={"spark.master": "x", "spark.remote": "y"})
<pyspark.sql.session.SparkSession.Builder object at 0x7fa310115b20>
>>> SparkSession.builder.config(map={"spark.master": "x", "spark.remote": "y"}).config("x", "z")  # Only raises the error when adding new configs
Traceback (most recent call last):
...
RuntimeError: Spark master cannot be configured with Spark Connect server; however, found URL for Spark Connect [y]
```
TO
```py
>>> SparkSession.builder.config(map={"spark.master": "x", "spark.remote": "y"})
Traceback (most recent call last):
...
RuntimeError: Spark Connect server and Spark master cannot be configured together: Spark master [x], Spark Connect [y]
```
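As a hedged illustration (hypothetical URLs, traceback shortened in the same style as above), the broadened check in `_validate_startup_urls` also catches a master URL supplied via the `MASTER` environment variable rather than an explicit config:

```py
>>> import os
>>> os.environ["MASTER"] = "local[4]"  # hypothetical master URL from the environment
>>> SparkSession.builder.config("spark.remote", "sc://localhost")  # hypothetical Connect URL
Traceback (most recent call last):
...
RuntimeError: Spark Connect server and Spark master cannot be configured together: Spark master [local[4]], Spark Connect [sc://localhost]
```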
### How was this patch tested?
Unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44194 from xinrong-meng/fix_session.
Authored-by: Xinrong Meng <[email protected]>
Signed-off-by: Xinrong Meng <[email protected]>
---
python/pyspark/errors/error_classes.py | 6 +++---
python/pyspark/sql/session.py | 28 +++++++++++-----------------
python/pyspark/sql/tests/test_session.py | 30 ++++++++++++++++++++++++++++--
3 files changed, 42 insertions(+), 22 deletions(-)
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index 965fd04a9135..cc8400270967 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -86,12 +86,12 @@ ERROR_CLASSES_JSON = """
},
"CANNOT_CONFIGURE_SPARK_CONNECT": {
"message": [
- "Spark Connect server cannot be configured with Spark master; however,
found URL for Spark master [<url>]."
+ "Spark Connect server cannot be configured: Existing [<existing_url>],
New [<new_url>]."
]
},
- "CANNOT_CONFIGURE_SPARK_MASTER": {
+ "CANNOT_CONFIGURE_SPARK_CONNECT_MASTER": {
"message": [
- "Spark master cannot be configured with Spark Connect server; however,
found URL for Spark Connect [<url>]."
+ "Spark Connect server and Spark master cannot be configured together:
Spark master [<master_url>], Spark Connect [<connect_url>]."
]
},
"CANNOT_CONVERT_COLUMN_INTO_BOOL": {
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 7f4589557cd2..86aacfa54c6e 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -286,17 +286,17 @@ class SparkSession(SparkConversionMixin):
with self._lock:
if conf is not None:
for k, v in conf.getAll():
- self._validate_startup_urls()
self._options[k] = v
+ self._validate_startup_urls()
elif map is not None:
for k, v in map.items(): # type: ignore[assignment]
v = to_str(v) # type: ignore[assignment]
- self._validate_startup_urls()
self._options[k] = v
+ self._validate_startup_urls()
else:
value = to_str(value)
- self._validate_startup_urls()
self._options[cast(str, key)] = value
+ self._validate_startup_urls()
return self
def _validate_startup_urls(
@@ -306,22 +306,16 @@ class SparkSession(SparkConversionMixin):
Helper function that validates the combination of startup URLs and raises an exception
if incompatible options are selected.
"""
- if "spark.master" in self._options and (
+ if ("spark.master" in self._options or "MASTER" in os.environ) and
(
"spark.remote" in self._options or "SPARK_REMOTE" in os.environ
):
raise PySparkRuntimeError(
- error_class="CANNOT_CONFIGURE_SPARK_MASTER",
+ error_class="CANNOT_CONFIGURE_SPARK_CONNECT_MASTER",
message_parameters={
- "url": self._options.get("spark.remote",
os.environ.get("SPARK_REMOTE"))
- },
- )
- if "spark.remote" in self._options and (
- "spark.master" in self._options or "MASTER" in os.environ
- ):
- raise PySparkRuntimeError(
- error_class="CANNOT_CONFIGURE_SPARK_CONNECT",
- message_parameters={
- "url": self._options.get("spark.master",
os.environ.get("MASTER"))
+ "master_url": self._options.get("spark.master",
os.environ.get("MASTER")),
+ "connect_url": self._options.get(
+ "spark.remote", os.environ.get("SPARK_REMOTE")
+ ),
},
)
@@ -333,8 +327,8 @@ class SparkSession(SparkConversionMixin):
raise PySparkRuntimeError(
error_class="CANNOT_CONFIGURE_SPARK_CONNECT",
message_parameters={
- "new_url": os.environ["SPARK_REMOTE"],
- "existing_url": remote,
+ "existing_url": os.environ["SPARK_REMOTE"],
+ "new_url": remote,
},
)
diff --git a/python/pyspark/sql/tests/test_session.py b/python/pyspark/sql/tests/test_session.py
index da27bf925749..ba1d999ff7ba 100644
--- a/python/pyspark/sql/tests/test_session.py
+++ b/python/pyspark/sql/tests/test_session.py
@@ -20,10 +20,11 @@ import unittest
import unittest.mock
from pyspark import SparkConf, SparkContext
+from pyspark.errors import PySparkRuntimeError
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import col
from pyspark.testing.sqlutils import ReusedSQLTestCase
-from pyspark.testing.utils import PySparkTestCase
+from pyspark.testing.utils import PySparkTestCase, PySparkErrorTestUtils
class SparkSessionTests(ReusedSQLTestCase):
@@ -259,7 +260,7 @@ class SparkSessionTests5(unittest.TestCase):
self.assertIs(SQLContext.getOrCreate(self.sc)._sc, self.sc)
-class SparkSessionBuilderTests(unittest.TestCase):
+class SparkSessionBuilderTests(unittest.TestCase, PySparkErrorTestUtils):
def test_create_spark_context_first_then_spark_session(self):
sc = None
session = None
@@ -352,6 +353,31 @@ class SparkSessionBuilderTests(unittest.TestCase):
if session is not None:
session.stop()
+ def test_create_spark_context_with_invalid_configs(self):
+ with self.assertRaises(PySparkRuntimeError) as pe1:
SparkSession.builder.config(map={"spark.master": "x", "spark.remote": "y"})
+
+ self.check_error(
+ exception=pe1.exception,
+ error_class="CANNOT_CONFIGURE_SPARK_CONNECT_MASTER",
+ message_parameters={"master_url": "x", "connect_url": "y"},
+ )
+
+ with unittest.mock.patch.dict(
+ "os.environ", {"SPARK_REMOTE": "remote_url", "SPARK_LOCAL_REMOTE":
"true"}
+ ):
+ with self.assertRaises(PySparkRuntimeError) as pe2:
+ SparkSession.builder.config("spark.remote",
"different_remote_url")
+
+ self.check_error(
+ exception=pe2.exception,
+ error_class="CANNOT_CONFIGURE_SPARK_CONNECT",
+ message_parameters={
+ "existing_url": "remote_url",
+ "new_url": "different_remote_url",
+ },
+ )
+
class SparkExtensionsTest(unittest.TestCase):
# These tests are separate because it uses 'spark.sql.extensions' which is
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]