Repository: spark
Updated Branches:
  refs/heads/branch-2.0 b349237e4 -> 4c5e16f58


[SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf

## What changes were proposed in this pull request?
When we create a SparkSession on the Python side, a SparkContext may already 
have been created. In that case, we need to apply the SparkSession builder's 
configs to the existing Scala SparkContext's SparkConf, because conf changes 
made on an active Python SparkContext are not propagated to the JVM side. 
Otherwise, we may end up with a wrongly configured SparkSession (e.g. Hive 
support is not enabled even though enableHiveSupport is called).
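
For illustration, a minimal sketch (not part of the patch) of the scenario this 
fixes, assuming a PySpark build with Hive support available:

    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    # A SparkContext already exists on the Python side before the builder runs.
    sc = SparkContext()

    # Previously, options set through the builder (including the catalog conf
    # implied by enableHiveSupport()) only landed on the Python-side conf and
    # never reached the JVM SparkContext, so the resulting SparkSession could
    # come up without Hive support.
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()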

## How was this patch tested?
New tests and manual tests.

Author: Yin Huai <yh...@databricks.com>

Closes #13931 from yhuai/SPARK-16224.

(cherry picked from commit 0923c4f5676691e28e70ecb05890e123540b91f0)
Signed-off-by: Davies Liu <davies....@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c5e16f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c5e16f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c5e16f5

Branch: refs/heads/branch-2.0
Commit: 4c5e16f58043b3103bbd59c5fa8fec4c411e5e11
Parents: b349237
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Jun 28 07:54:44 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Tue Jun 28 07:54:58 2016 -0700

----------------------------------------------------------------------
 python/pyspark/context.py     |  2 ++
 python/pyspark/sql/session.py |  7 +++++++
 python/pyspark/sql/tests.py   | 43 +++++++++++++++++++++++++++++++++++++-
 python/pyspark/tests.py       |  8 +++++++
 4 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 7217a99..6e9f24e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -166,6 +166,8 @@ class SparkContext(object):
 
         # Create the Java SparkContext through Py4J
         self._jsc = jsc or self._initialize_context(self._conf._jconf)
+        # Reset the SparkConf to the one actually used by the SparkContext in JVM.
+        self._conf = SparkConf(_jconf=self._jsc.sc().conf())
 
         # Create a single Accumulator in Java that we'll send all our updates through;
         # they will be passed back to us through a TCP server
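
As a rough illustration of what the change above enables (a sketch, not part of 
the patch): self._conf is now backed by the JVM-side SparkConf, so a conf set 
through it from Python is visible to the JVM SparkContext:

    from pyspark import SparkContext

    sc = SparkContext()
    # _conf now wraps the JVM SparkConf, so this set() goes through Py4J...
    sc._conf.set("spark.test.SPARK16224", "SPARK16224")
    # ...and the value is visible on the JVM SparkContext's conf.
    assert sc._jsc.sc().conf().get("spark.test.SPARK16224") == "SPARK16224"
    sc.stop()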

http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 0c8024e..b4152a3 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -165,6 +165,13 @@ class SparkSession(object):
                     for key, value in self._options.items():
                         sparkConf.set(key, value)
                     sc = SparkContext.getOrCreate(sparkConf)
+                    # This SparkContext may be an existing one.
+                    for key, value in self._options.items():
+                        # we need to propagate the confs
+                        # before we create the SparkSession. Otherwise, confs like
+                        # warehouse path and metastore url will not be set correctly (
+                        # these confs cannot be changed once the SparkSession is created).
+                        sc._conf.set(key, value)
                     session = SparkSession(sc)
                 for key, value in self._options.items():
                     session.conf.set(key, value)
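
To sketch the effect of the propagation loop above (illustrative only, not part 
of the patch; the warehouse path below is just an example value): builder 
options now reach the JVM conf of an existing SparkContext before the 
SparkSession is constructed:

    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    sc = SparkContext()  # pre-existing context
    spark = SparkSession.builder \
        .config("spark.sql.warehouse.dir", "/tmp/spark16224-warehouse") \
        .getOrCreate()
    # With the fix, the option is copied into sc's conf before SparkSession(sc)
    # is constructed, so confs that can only take effect at session-creation
    # time (warehouse path, metastore url) are applied correctly.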

http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3f56411..f863485 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -22,6 +22,7 @@ individual modules.
 """
 import os
 import sys
+import subprocess
 import pydoc
 import shutil
 import tempfile
@@ -48,7 +49,7 @@ else:
 from pyspark.sql import SparkSession, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
-from pyspark.tests import ReusedPySparkTestCase
+from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2
 from pyspark.sql.window import Window
 from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
@@ -1619,6 +1620,46 @@ class SQLTests(ReusedPySparkTestCase):
             lambda: spark.catalog.uncacheTable("does_not_exist"))
 
 
+class HiveSparkSubmitTests(SparkSubmitTests):
+
+    def test_hivecontext(self):
+        # This test checks that HiveContext is using Hive metastore (SPARK-16224).
+        # It sets a metastore url and checks if there is a derby dir created by
+        # Hive metastore. If this derby dir exists, HiveContext is using
+        # Hive metastore.
+        metastore_path = os.path.join(tempfile.mkdtemp(), "spark16224_metastore_db")
+        metastore_URL = "jdbc:derby:;databaseName=" + metastore_path + ";create=true"
+        hive_site_dir = os.path.join(self.programDir, "conf")
+        hive_site_file = self.createTempFile("hive-site.xml", ("""
+            |<configuration>
+            |  <property>
+            |  <name>javax.jdo.option.ConnectionURL</name>
+            |  <value>%s</value>
+            |  </property>
+            |</configuration>
+            """ % metastore_URL).lstrip(), "conf")
+        script = self.createTempFile("test.py", """
+            |import os
+            |
+            |from pyspark.conf import SparkConf
+            |from pyspark.context import SparkContext
+            |from pyspark.sql import HiveContext
+            |
+            |conf = SparkConf()
+            |sc = SparkContext(conf=conf)
+            |hive_context = HiveContext(sc)
+            |print(hive_context.sql("show databases").collect())
+            """)
+        proc = subprocess.Popen(
+            [self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
+             "--driver-class-path", hive_site_dir, script],
+            stdout=subprocess.PIPE)
+        out, err = proc.communicate()
+        self.assertEqual(0, proc.returncode)
+        self.assertIn("default", out.decode('utf-8'))
+        self.assertTrue(os.path.exists(metastore_path))
+
+
 class HiveContextSQLTests(ReusedPySparkTestCase):
 
     @classmethod

http://git-wip-us.apache.org/repos/asf/spark/blob/4c5e16f5/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 97ea39d..b1e92e1 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1921,6 +1921,14 @@ class ContextTests(unittest.TestCase):
             post_parallalize_temp_files = os.listdir(sc._temp_dir)
             self.assertEqual(temp_files, post_parallalize_temp_files)
 
+    def test_set_conf(self):
+        # This is for an internal use case. When there is an existing SparkContext,
+        # SparkSession's builder needs to set configs into SparkContext's conf.
+        sc = SparkContext()
+        sc._conf.set("spark.test.SPARK16224", "SPARK16224")
+        self.assertEqual(sc._jsc.sc().conf().get("spark.test.SPARK16224"), "SPARK16224")
+        sc.stop()
+
     def test_stop(self):
         sc = SparkContext()
         self.assertNotEqual(SparkContext._active_spark_context, None)

