Repository: spark
Updated Branches:
  refs/heads/master 5def10e61 -> ee214ef3a
[SPARK-25525][SQL][PYSPARK] Do not update conf for existing SparkContext in SparkSession.getOrCreate.

## What changes were proposed in this pull request?

In [SPARK-20946](https://issues.apache.org/jira/browse/SPARK-20946), we modified `SparkSession.getOrCreate` not to update the conf of an existing `SparkContext`, because the `SparkContext` is shared by all sessions. We should not update it on the PySpark side either.

## How was this patch tested?

Added tests.

Closes #22545 from ueshin/issues/SPARK-25525/not_update_existing_conf.

Authored-by: Takuya UESHIN <ues...@databricks.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee214ef3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee214ef3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee214ef3

Branch: refs/heads/master
Commit: ee214ef3a0ec36c4aae5040778d41c376df3da19
Parents: 5def10e
Author: Takuya UESHIN <ues...@databricks.com>
Authored: Thu Sep 27 12:37:03 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Thu Sep 27 12:37:03 2018 +0800

----------------------------------------------------------------------
 python/pyspark/sql/session.py | 14 ++++--------
 python/pyspark/sql/tests.py  | 46 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee214ef3/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index a5e2872..079af8c 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -156,7 +156,7 @@ class SparkSession(object):
             default.
 
             >>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
-            >>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
+            >>> s1.conf.get("k1") == "v1"
             True
 
             In case an existing SparkSession is returned, the config options specified
@@ -179,19 +179,13 @@ class SparkSession(object):
                     sparkConf = SparkConf()
                     for key, value in self._options.items():
                         sparkConf.set(key, value)
-                    sc = SparkContext.getOrCreate(sparkConf)
                     # This SparkContext may be an existing one.
-                    for key, value in self._options.items():
-                        # we need to propagate the confs
-                        # before we create the SparkSession. Otherwise, confs like
-                        # warehouse path and metastore url will not be set correctly (
-                        # these confs cannot be changed once the SparkSession is created).
-                        sc._conf.set(key, value)
+                    sc = SparkContext.getOrCreate(sparkConf)
+                    # Do not update `SparkConf` for existing `SparkContext`, as it's shared
+                    # by all sessions.
                     session = SparkSession(sc)
                 for key, value in self._options.items():
                     session._jsparkSession.sessionState().conf().setConfString(key, value)
-                for key, value in self._options.items():
-                    session.sparkContext._conf.set(key, value)
                 return session
 
     builder = Builder()
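For illustration only (not part of the commit), a minimal sketch of the resulting behavior, assuming a local PySpark installation; "key1"/"key2" are placeholder option names, not real Spark confs:

# Sketch of the behavior after this change ("key1"/"key2" are
# placeholder option names, not real Spark confs).
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Create a SparkContext first, with its own SparkConf.
sc = SparkContext("local[4]", "demo", conf=SparkConf().set("key1", "value1"))

# getOrCreate() reuses the existing SparkContext. "key2" is applied to the
# session's SQL conf, but no longer copied into the shared SparkContext's
# SparkConf.
spark = SparkSession.builder.config("key2", "value2").getOrCreate()

print(spark.conf.get("key2"))          # value2 -- visible to the session
print(sc.getConf().contains("key2"))   # False  -- shared conf untouched
print(sc.getConf().get("key1"))        # value1 -- pre-existing conf kept

spark.stop()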
http://git-wip-us.apache.org/repos/asf/spark/blob/ee214ef3/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 74642d4..64a7ceb 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -80,7 +80,7 @@ _have_pandas = _pandas_requirement_message is None
 _have_pyarrow = _pyarrow_requirement_message is None
 _test_compiled = _test_not_compiled_message is None
 
-from pyspark import SparkContext
+from pyspark import SparkConf, SparkContext
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
@@ -283,6 +283,50 @@ class DataTypeTests(unittest.TestCase):
         self.assertRaises(ValueError, lambda: row_class(1, 2, 3))
 
 
+class SparkSessionBuilderTests(unittest.TestCase):
+
+    def test_create_spark_context_first_then_spark_session(self):
+        sc = None
+        session = None
+        try:
+            conf = SparkConf().set("key1", "value1")
+            sc = SparkContext('local[4]', "SessionBuilderTests", conf=conf)
+            session = SparkSession.builder.config("key2", "value2").getOrCreate()
+
+            self.assertEqual(session.conf.get("key1"), "value1")
+            self.assertEqual(session.conf.get("key2"), "value2")
+            self.assertEqual(session.sparkContext, sc)
+
+            self.assertFalse(sc.getConf().contains("key2"))
+            self.assertEqual(sc.getConf().get("key1"), "value1")
+        finally:
+            if session is not None:
+                session.stop()
+            if sc is not None:
+                sc.stop()
+
+    def test_another_spark_session(self):
+        session1 = None
+        session2 = None
+        try:
+            session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
+            session2 = SparkSession.builder.config("key2", "value2").getOrCreate()
+
+            self.assertEqual(session1.conf.get("key1"), "value1")
+            self.assertEqual(session2.conf.get("key1"), "value1")
+            self.assertEqual(session1.conf.get("key2"), "value2")
+            self.assertEqual(session2.conf.get("key2"), "value2")
+            self.assertEqual(session1.sparkContext, session2.sparkContext)
+
+            self.assertEqual(session1.sparkContext.getConf().get("key1"), "value1")
+            self.assertFalse(session1.sparkContext.getConf().contains("key2"))
+        finally:
+            if session1 is not None:
+                session1.stop()
+            if session2 is not None:
+                session2.stop()
+
+
 class SQLTests(ReusedSQLTestCase):
 
     @classmethod
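Likewise, a rough sketch of what the second test exercises from a user's point of view (again with placeholder option names): a later getOrCreate() returns the existing session and applies its new options there, while the shared SparkContext conf stays untouched.

from pyspark.sql import SparkSession

# Both builders resolve to the same underlying session.
session1 = SparkSession.builder.config("key1", "value1").getOrCreate()
session2 = SparkSession.builder.config("key2", "value2").getOrCreate()

print(session2.conf.get("key1"))  # value1 -- options from the first builder
print(session1.conf.get("key2"))  # value2 -- and from the second one
print(session1.sparkContext.getConf().contains("key2"))  # False

session1.stop()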