Repository: spark Updated Branches: refs/heads/branch-2.0 b349237e4 -> 4c5e16f58
[SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf ## What changes were proposed in this pull request? When we create a SparkSession at the Python side, it is possible that a SparkContext has been created. For this case, we need to set configs of the SparkSession builder to the Scala SparkContext's SparkConf (we need to do so because conf changes on an active Python SparkContext will not be propagated to the JVM side). Otherwise, we may create a wrong SparkSession (e.g. Hive support is not enabled even if enableHiveSupport is called). ## How was this patch tested? New tests and manual tests. Author: Yin Huai <yh...@databricks.com> Closes #13931 from yhuai/SPARK-16224. (cherry picked from commit 0923c4f5676691e28e70ecb05890e123540b91f0) Signed-off-by: Davies Liu <davies....@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c5e16f5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c5e16f5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c5e16f5 Branch: refs/heads/branch-2.0 Commit: 4c5e16f58043b3103bbd59c5fa8fec4c411e5e11 Parents: b349237 Author: Yin Huai <yh...@databricks.com> Authored: Tue Jun 28 07:54:44 2016 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Tue Jun 28 07:54:58 2016 -0700 ---------------------------------------------------------------------- python/pyspark/context.py | 2 ++ python/pyspark/sql/session.py | 7 +++++++ python/pyspark/sql/tests.py | 43 +++++++++++++++++++++++++++++++++++++- python/pyspark/tests.py | 8 +++++++ 4 files changed, 59 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 
7217a99..6e9f24e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -166,6 +166,8 @@ class SparkContext(object): # Create the Java SparkContext through Py4J self._jsc = jsc or self._initialize_context(self._conf._jconf) + # Reset the SparkConf to the one actually used by the SparkContext in JVM. + self._conf = SparkConf(_jconf=self._jsc.sc().conf()) # Create a single Accumulator in Java that we'll send all our updates through; # they will be passed back to us through a TCP server http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/sql/session.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 0c8024e..b4152a3 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -165,6 +165,13 @@ class SparkSession(object): for key, value in self._options.items(): sparkConf.set(key, value) sc = SparkContext.getOrCreate(sparkConf) + # This SparkContext may be an existing one. + for key, value in self._options.items(): + # we need to propagate the confs + # before we create the SparkSession. Otherwise, confs like + # warehouse path and metastore url will not be set correctly ( + # these confs cannot be changed once the SparkSession is created). + sc._conf.set(key, value) session = SparkSession(sc) for key, value in self._options.items(): session.conf.set(key, value) http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3f56411..f863485 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -22,6 +22,7 @@ individual modules. 
""" import os import sys +import subprocess import pydoc import shutil import tempfile @@ -48,7 +49,7 @@ else: from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type -from pyspark.tests import ReusedPySparkTestCase +from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2 from pyspark.sql.window import Window from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException @@ -1619,6 +1620,46 @@ class SQLTests(ReusedPySparkTestCase): lambda: spark.catalog.uncacheTable("does_not_exist")) +class HiveSparkSubmitTests(SparkSubmitTests): + + def test_hivecontext(self): + # This test checks that HiveContext is using Hive metastore (SPARK-16224). + # It sets a metastore url and checks if there is a derby dir created by + # Hive metastore. If this derby dir exists, HiveContext is using + # Hive metastore. 
+ metastore_path = os.path.join(tempfile.mkdtemp(), "spark16224_metastore_db") + metastore_URL = "jdbc:derby:;databaseName=" + metastore_path + ";create=true" + hive_site_dir = os.path.join(self.programDir, "conf") + hive_site_file = self.createTempFile("hive-site.xml", (""" + |<configuration> + | <property> + | <name>javax.jdo.option.ConnectionURL</name> + | <value>%s</value> + | </property> + |</configuration> + """ % metastore_URL).lstrip(), "conf") + script = self.createTempFile("test.py", """ + |import os + | + |from pyspark.conf import SparkConf + |from pyspark.context import SparkContext + |from pyspark.sql import HiveContext + | + |conf = SparkConf() + |sc = SparkContext(conf=conf) + |hive_context = HiveContext(sc) + |print(hive_context.sql("show databases").collect()) + """) + proc = subprocess.Popen( + [self.sparkSubmit, "--master", "local-cluster[1,1,1024]", + "--driver-class-path", hive_site_dir, script], + stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("default", out.decode('utf-8')) + self.assertTrue(os.path.exists(metastore_path)) + + class HiveContextSQLTests(ReusedPySparkTestCase): @classmethod http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 97ea39d..b1e92e1 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -1921,6 +1921,14 @@ class ContextTests(unittest.TestCase): post_parallalize_temp_files = os.listdir(sc._temp_dir) self.assertEqual(temp_files, post_parallalize_temp_files) + def test_set_conf(self): + # This is for an internal use case. When there is an existing SparkContext, + # SparkSession's builder needs to set configs into SparkContext's conf. 
+ sc = SparkContext() + sc._conf.set("spark.test.SPARK16224", "SPARK16224") + self.assertEqual(sc._jsc.sc().conf().get("spark.test.SPARK16224"), "SPARK16224") + sc.stop() + def test_stop(self): sc = SparkContext() self.assertNotEqual(SparkContext._active_spark_context, None) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org