Repository: spark
Updated Branches:
  refs/heads/master c1839c991 -> 0903a185c


[SPARK-15084][PYTHON][SQL] Use builder pattern to create SparkSession in PySpark.

## What changes were proposed in this pull request?

This is a Python port of the corresponding Scala builder pattern code. The `sql.py` example is updated to demonstrate the new API.
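
For reference, a minimal sketch of the usage this patch enables (the master URL, app name, config key, and sample data below are illustrative placeholders, not part of the patch):

```python
from pyspark.sql import SparkSession

# Build (or reuse) a session through the new builder API added in this patch.
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("BuilderExample") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# DataFrames can then be created directly from the session.
df = spark.createDataFrame([("John", 19), ("Smith", 23)], ["name", "age"])
df.printSchema()

spark.stop()
```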

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #12860 from dongjoon-hyun/SPARK-15084.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0903a185
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0903a185
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0903a185

Branch: refs/heads/master
Commit: 0903a185c7ebc57c75301a27d215b08efd347f99
Parents: c1839c9
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Tue May 3 18:05:40 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue May 3 18:05:40 2016 -0700

----------------------------------------------------------------------
 examples/src/main/python/sql.py | 35 ++++++--------
 python/pyspark/sql/session.py   | 91 +++++++++++++++++++++++++++++++++++-
 2 files changed, 105 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0903a185/examples/src/main/python/sql.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql.py b/examples/src/main/python/sql.py
index 2c18875..ea6a22d 100644
--- a/examples/src/main/python/sql.py
+++ b/examples/src/main/python/sql.py
@@ -20,33 +20,28 @@ from __future__ import print_function
 import os
 import sys
 
-from pyspark import SparkContext
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType
 
 
 if __name__ == "__main__":
-    sc = SparkContext(appName="PythonSQL")
-    sqlContext = SQLContext(sc)
+    spark = SparkSession.builder.appName("PythonSQL").getOrCreate()
 
-    # RDD is created from a list of rows
-    some_rdd = sc.parallelize([Row(name="John", age=19),
-                              Row(name="Smith", age=23),
-                              Row(name="Sarah", age=18)])
-    # Infer schema from the first row, create a DataFrame and print the schema
-    some_df = sqlContext.createDataFrame(some_rdd)
+    # A list of Rows. Infer schema from the first row, create a DataFrame and print the schema
+    rows = [Row(name="John", age=19), Row(name="Smith", age=23), Row(name="Sarah", age=18)]
+    some_df = spark.createDataFrame(rows)
     some_df.printSchema()
 
-    # Another RDD is created from a list of tuples
-    another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
+    # A list of tuples
+    tuples = [("John", 19), ("Smith", 23), ("Sarah", 18)]
     # Schema with two fields - person_name and person_age
     schema = StructType([StructField("person_name", StringType(), False),
                         StructField("person_age", IntegerType(), False)])
     # Create a DataFrame by applying the schema to the RDD and print the schema
-    another_df = sqlContext.createDataFrame(another_rdd, schema)
+    another_df = spark.createDataFrame(tuples, schema)
     another_df.printSchema()
     # root
-    #  |-- age: integer (nullable = true)
+    #  |-- age: long (nullable = true)
     #  |-- name: string (nullable = true)
 
     # A JSON dataset is pointed to by path.
@@ -57,7 +52,7 @@ if __name__ == "__main__":
     else:
         path = sys.argv[1]
     # Create a DataFrame from the file(s) pointed to by path
-    people = sqlContext.jsonFile(path)
+    people = spark.read.json(path)
     # root
     #  |-- person_name: string (nullable = false)
     #  |-- person_age: integer (nullable = false)
@@ -65,16 +60,16 @@ if __name__ == "__main__":
     # The inferred schema can be visualized using the printSchema() method.
     people.printSchema()
     # root
-    #  |-- age: IntegerType
-    #  |-- name: StringType
+    #  |-- age: long (nullable = true)
+    #  |-- name: string (nullable = true)
 
     # Register this DataFrame as a table.
-    people.registerAsTable("people")
+    people.registerTempTable("people")
 
     # SQL statements can be run by using the sql methods provided by sqlContext
-    teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
+    teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 
     for each in teenagers.collect():
         print(each[0])
 
-    sc.stop()
+    spark.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/0903a185/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 35c36b4..fb3e318 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -19,6 +19,7 @@ from __future__ import print_function
 import sys
 import warnings
 from functools import reduce
+from threading import RLock
 
 if sys.version >= '3':
     basestring = unicode = str
@@ -58,16 +59,98 @@ def _monkey_patch_RDD(sparkSession):
 
 
 class SparkSession(object):
-    """Main entry point for Spark SQL functionality.
+    """The entry point to programming Spark with the Dataset and DataFrame API.
 
     A SparkSession can be used to create :class:`DataFrame`, register :class:`DataFrame` as
     tables, execute SQL over tables, cache tables, and read parquet files.
+    To create a SparkSession, use the following builder pattern:
+
+    >>> spark = SparkSession.builder \
+            .master("local") \
+            .appName("Word Count") \
+            .config("spark.some.config.option", "some-value") \
+            .getOrCreate()
 
     :param sparkContext: The :class:`SparkContext` backing this SparkSession.
     :param jsparkSession: An optional JVM Scala SparkSession. If set, we do not instantiate a new
         SparkSession in the JVM, instead we make all calls to this object.
     """
 
+    class Builder(object):
+        """Builder for :class:`SparkSession`.
+        """
+
+        _lock = RLock()
+        _options = {}
+
+        @since(2.0)
+        def config(self, key=None, value=None, conf=None):
+            """Sets a config option. Options set using this method are 
automatically propagated to
+            both :class:`SparkConf` and :class:`SparkSession`'s own 
configuration.
+
+            For an existing SparkConf, use `conf` parameter.
+            >>> from pyspark.conf import SparkConf
+            >>> SparkSession.builder.config(conf=SparkConf())
+            <pyspark.sql.session...
+
+            For a (key, value) pair, you can omit parameter names.
+            >>> SparkSession.builder.config("spark.some.config.option", "some-value")
+            <pyspark.sql.session...
+
+            :param key: a key name string for configuration property
+            :param value: a value for configuration property
+            :param conf: an instance of :class:`SparkConf`
+            """
+            with self._lock:
+                if conf is None:
+                    self._options[key] = str(value)
+                else:
+                    for (k, v) in conf.getAll():
+                        self._options[k] = v
+                return self
+
+        @since(2.0)
+        def master(self, master):
+            """Sets the Spark master URL to connect to, such as "local" to run 
locally, "local[4]"
+            to run locally with 4 cores, or "spark://master:7077" to run on a 
Spark standalone
+            cluster.
+
+            :param master: a URL for the Spark master
+            """
+            return self.config("spark.master", master)
+
+        @since(2.0)
+        def appName(self, name):
+            """Sets a name for the application, which will be shown in the 
Spark web UI.
+
+            :param name: an application name
+            """
+            return self.config("spark.app.name", name)
+
+        @since(2.0)
+        def enableHiveSupport(self):
+            """Enables Hive support, including connectivity to a persistent 
Hive metastore, support
+            for Hive serdes, and Hive user-defined functions.
+            """
+            return self.config("spark.sql.catalogImplementation", "hive")
+
+        @since(2.0)
+        def getOrCreate(self):
+            """Gets an existing :class:`SparkSession` or, if there is no 
existing one, creates a new
+            one based on the options set in this builder.
+            """
+            with self._lock:
+                from pyspark.conf import SparkConf
+                from pyspark.context import SparkContext
+                from pyspark.sql.context import SQLContext
+                sparkConf = SparkConf()
+                for key, value in self._options.items():
+                    sparkConf.set(key, value)
+                sparkContext = SparkContext.getOrCreate(sparkConf)
+                return SQLContext.getOrCreate(sparkContext).sparkSession
+
+    builder = Builder()
+
     _instantiatedContext = None
 
     @ignore_unicode_prefix
@@ -445,6 +528,12 @@ class SparkSession(object):
         """
         return DataFrameReader(self._wrapped)
 
+    @since(2.0)
+    def stop(self):
+        """Stop the underlying :class:`SparkContext`.
+        """
+        self._sc.stop()
+
 
 def _test():
     import os

