Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d68198d26 -> 6588e007e


[SPARK-22221][DOCS] Adding User Documentation for Arrow

## What changes were proposed in this pull request?

Adding user facing documentation for working with Arrow in Spark

Author: Bryan Cutler <cutl...@gmail.com>
Author: Li Jin <ice.xell...@gmail.com>
Author: hyukjinkwon <gurwls...@gmail.com>

Closes #19575 from BryanCutler/arrow-user-docs-SPARK-2221.

(cherry picked from commit 0d60b3213fe9a7ae5e9b208639f92011fdb2ca32)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6588e007
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6588e007
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6588e007

Branch: refs/heads/branch-2.3
Commit: 6588e007e8e10da7cc9771451eeb4d3a2bdc6e0e
Parents: d68198d
Author: Bryan Cutler <cutl...@gmail.com>
Authored: Mon Jan 29 10:25:25 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon Jan 29 10:25:54 2018 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md         | 134 ++++++++++++++++++++++++++++-
 examples/src/main/python/sql/arrow.py | 129 +++++++++++++++++++++++++++
 2 files changed, 262 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6588e007/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 502c0a8..d49c8d8 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1640,6 +1640,138 @@ Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` a
 You may run `./bin/spark-sql --help` for a complete list of all available options.
 
+# PySpark Usage Guide for Pandas with Apache Arrow
+
+## Apache Arrow in Spark
+
+Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer
+data between JVM and Python processes. This is currently most beneficial to Python users who
+work with Pandas/NumPy data. Its usage is not automatic and might require some minor changes to
+configuration or code to take full advantage and ensure compatibility. This guide gives a
+high-level description of how to use Arrow in Spark and highlights any differences when
+working with Arrow-enabled data.
+
+### Ensure PyArrow Installed
+
+If you install PySpark using pip, then PyArrow can be brought in as an extra dependency of the
+SQL module with the command `pip install pyspark[sql]`. Otherwise, you must ensure that PyArrow
+is installed and available on all cluster nodes. The currently supported version is 0.8.0.
+You can install it using pip, or using conda from the conda-forge channel. See the PyArrow
+[installation instructions](https://arrow.apache.org/docs/python/install.html) for details.
+
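+As a quick sanity check (a minimal sketch, not part of the shipped example), the installed versions
+can be verified directly from Python on a node; the minimums below simply restate the requirements
+mentioned in this guide:
+
+{% highlight python %}
+# Verify that Pandas and PyArrow are importable and report their versions
+import pandas as pd
+import pyarrow as pa
+
+print("pandas version: %s" % pd.__version__)    # Spark 2.3 requires 0.19.2 or higher
+print("pyarrow version: %s" % pa.__version__)   # 0.8.0 is the currently supported version
+{% endhighlight %}
+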
+## Enabling for Conversion to/from Pandas
+
+Arrow is available as an optimization when converting a Spark DataFrame to a Pandas DataFrame
+using the call `toPandas()` and when creating a Spark DataFrame from a Pandas DataFrame with
+`createDataFrame(pandas_df)`. To use Arrow when executing these calls, users need to first set
+the Spark configuration `spark.sql.execution.arrow.enabled` to `true`. This is disabled by default.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example dataframe_with_arrow python/sql/arrow.py %}
+</div>
+</div>
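+
+Instead of being set at runtime as in the example above, the configuration can also be supplied
+when the `SparkSession` is built, or passed with `--conf` to `spark-submit`. A brief sketch (the
+application name is only illustrative):
+
+{% highlight python %}
+from pyspark.sql import SparkSession
+
+# Enable Arrow-based transfers for the whole session at creation time
+spark = SparkSession.builder \
+    .appName("arrow-enabled-app") \
+    .config("spark.sql.execution.arrow.enabled", "true") \
+    .getOrCreate()
+{% endhighlight %}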
+
+Using the above optimizations with Arrow will produce the same results as when Arrow is not
+enabled. Note that even with Arrow, `toPandas()` results in the collection of all records in the
+DataFrame to the driver program, so it should only be done on a small subset of the data. Not all
+Spark data types are currently supported, and an error can be raised if a column has an
+unsupported type; see [Supported SQL Types](#supported-sql-types). If an error occurs during
+`createDataFrame()`, Spark will fall back to creating the DataFrame without Arrow.
+
+## Pandas UDFs (a.k.a. Vectorized UDFs)
+
+Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
+Pandas to work with the data. A Pandas UDF is defined using the `pandas_udf` function, either as a
+decorator or to wrap another function, and no additional configuration is required. Currently,
+there are two types of Pandas UDF: Scalar and Group Map.
+
+### Scalar
+
+Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
+as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
+a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
+columns into batches, calling the function for each batch as a subset of the data, and then
+concatenating the results together.
+
+The following example shows how to create a scalar Pandas UDF that computes the product of two
+columns.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example scalar_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
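+
+The example above wraps an existing Python function with `pandas_udf`; as noted earlier, the same
+UDF can also be declared with the decorator form. A brief sketch, reusing the DataFrame `df` with
+column `x` from the example above (the function name here is only illustrative):
+
+{% highlight python %}
+from pyspark.sql.functions import col, pandas_udf, PandasUDFType
+
+# Declare the scalar Pandas UDF directly with the decorator
+@pandas_udf("long", PandasUDFType.SCALAR)
+def multiply_two(a, b):
+    return a * b
+
+df.select(multiply_two(col("x"), col("x"))).show()
+{% endhighlight %}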
+
+### Group Map
+
+Group map Pandas UDFs are used with `groupBy().apply()`, which implements the "split-apply-combine" pattern.
+Split-apply-combine consists of three steps:
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
+  input data contains all the rows and columns for each group.
+* Combine the results into a new `DataFrame`.
+
+To use `groupBy().apply()`, the user needs to define the following:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output `DataFrame` (both forms
+  are sketched below).
+
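+As a brief sketch of these two requirements (the function names and schema are only illustrative,
+and the full working example is included further below), the output schema can be given either as a
+DDL-formatted string or as an equivalent `StructType`:
+
+{% highlight python %}
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+from pyspark.sql.types import DoubleType, LongType, StructField, StructType
+
+# Output schema declared as a DDL-formatted string
+@pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+def normalize(pdf):
+    v = pdf.v
+    return pdf.assign(v=(v - v.mean()) / v.std())
+
+# The same schema declared as a StructType object
+schema = StructType([StructField("id", LongType()), StructField("v", DoubleType())])
+
+@pandas_udf(schema, PandasUDFType.GROUP_MAP)
+def normalize_struct(pdf):
+    v = pdf.v
+    return pdf.assign(v=(v - v.mean()) / v.std())
+{% endhighlight %}
+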
+Note that all data for a group will be loaded into memory before the function is applied. This can
+lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
+[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied to groups and it is up to the user
+to ensure that the grouped data will fit into the available memory.
+
+The following example shows how to use `groupBy().apply()` to subtract the mean from each value
+in the group.
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example group_map_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+
+For detailed usage, please see
+[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
+[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+
+## Usage Notes
+
+### Supported SQL Types
+
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
+`ArrayType` of `TimestampType`, and nested `StructType`.
+
+### Setting Arrow Batch Size
+
+Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to
+high memory usage in the JVM. To avoid possible out of memory exceptions, the size of the Arrow
+record batches can be adjusted by setting the conf `spark.sql.execution.arrow.maxRecordsPerBatch`
+to an integer that determines the maximum number of rows for each batch. The default value is
+10,000 records per batch. If the number of columns is large, the value should be adjusted
+accordingly. Using this limit, each data partition will be made into one or more record batches for
+processing.
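+
+As a sketch (the DataFrame `df` and the value of 1,000 are only illustrative), the batch size can
+be lowered before converting a wide DataFrame:
+
+{% highlight python %}
+# Use smaller Arrow record batches to limit peak memory usage in the JVM
+spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")
+
+result_pdf = df.toPandas()
+{% endhighlight %}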
+
+### Timestamp with Time Zone Semantics
+
+Spark internally stores timestamps as UTC values, and timestamp data that is brought in without
+a specified time zone is converted as local time to UTC with microsecond resolution. When timestamp
+data is exported or displayed in Spark, the session time zone is used to localize the timestamp
+values. The session time zone is set with the configuration `spark.sql.session.timeZone` and will
+default to the JVM system local time zone if not set. Pandas uses a `datetime64` type with
+nanosecond resolution, `datetime64[ns]`, with optional time zone on a per-column basis.
+
+When timestamp data is transferred from Spark to Pandas, it will be converted to nanosecond
+resolution and each column will be converted to the Spark session time zone and then localized to
+that time zone, which removes the time zone information and displays values as local time. This
+will occur when calling `toPandas()` or `pandas_udf` with timestamp columns.
+
+When timestamp data is transferred from Pandas to Spark, it will be converted to UTC microseconds.
+This occurs when calling `createDataFrame` with a Pandas DataFrame or when returning a timestamp
+from a `pandas_udf`. These conversions are done automatically to ensure Spark will have data in the
+expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond
+values will be truncated.
+
+Note that a standard UDF (non-Pandas) will load timestamp data as Python datetime objects, which is
+different from a Pandas timestamp. It is recommended to use Pandas time series functionality when
+working with timestamps in `pandas_udf`s to get the best performance; see
+[here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details.
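+
+The interplay of the session time zone and these conversions can be seen in a small sketch (the
+time zone, data, and variable names are only illustrative):
+
+{% highlight python %}
+import pandas as pd
+
+# Set an explicit session time zone; any valid zone ID could be used here
+spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
+
+# Time-zone-naive values are interpreted using the session time zone (see above)
+pdf = pd.DataFrame({"ts": pd.date_range("2018-01-01", periods=3, freq="H")})
+df = spark.createDataFrame(pdf)
+
+# Values come back as time-zone-naive local times in the session time zone
+result_pdf = df.toPandas()
+{% endhighlight %}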
+
 # Migration Guide
 
 ## Upgrading From Spark SQL 2.2 to 2.3
@@ -1788,7 +1920,7 @@ options.
    Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
  - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas related functionalities, such as `toPandas`, `createDataFrame` from Pandas DataFrame, etc.
  - In PySpark, the behavior of timestamp values for Pandas related functionalities was changed to respect session timezone. If you want to use the old behavior, you need to set a configuration `spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See [SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
-  - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.  
+  - In PySpark, `na.fill()` or `fillna` also accepts boolean and replaces nulls with booleans. In prior Spark versions, PySpark just ignores it and returns the original Dataset/DataFrame.
  - Since Spark 2.3, when either broadcast hash join or broadcast nested loop join is applicable, we prefer to broadcasting the table that is explicitly specified in a broadcast hint. For details, see the section [Broadcast Hint](#broadcast-hint-for-sql-queries) and [SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
  - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.concatBinaryAsString` to `true`.
  - Since Spark 2.3, when all inputs are binary, SQL `elt()` returns an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it always returns as a string despite of input types. To keep the old behavior, set `spark.sql.function.eltOutputAsString` to `true`.

http://git-wip-us.apache.org/repos/asf/spark/blob/6588e007/examples/src/main/python/sql/arrow.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
new file mode 100644
index 0000000..6c0028b
--- /dev/null
+++ b/examples/src/main/python/sql/arrow.py
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Arrow in Spark.
+Run with:
+  ./bin/spark-submit examples/src/main/python/sql/arrow.py
+"""
+
+from __future__ import print_function
+
+from pyspark.sql import SparkSession
+from pyspark.sql.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def dataframe_with_arrow_example(spark):
+    # $example on:dataframe_with_arrow$
+    import numpy as np
+    import pandas as pd
+
+    # Enable Arrow-based columnar data transfers
+    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+
+    # Generate a Pandas DataFrame
+    pdf = pd.DataFrame(np.random.rand(100, 3))
+
+    # Create a Spark DataFrame from a Pandas DataFrame using Arrow
+    df = spark.createDataFrame(pdf)
+
+    # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
+    result_pdf = df.select("*").toPandas()
+    # $example off:dataframe_with_arrow$
+    print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
+
+
+def scalar_pandas_udf_example(spark):
+    # $example on:scalar_pandas_udf$
+    import pandas as pd
+
+    from pyspark.sql.functions import col, pandas_udf
+    from pyspark.sql.types import LongType
+
+    # Declare the function and create the UDF
+    def multiply_func(a, b):
+        return a * b
+
+    multiply = pandas_udf(multiply_func, returnType=LongType())
+
+    # The function for a pandas_udf should be able to execute with local Pandas data
+    x = pd.Series([1, 2, 3])
+    print(multiply_func(x, x))
+    # 0    1
+    # 1    4
+    # 2    9
+    # dtype: int64
+
+    # Create a Spark DataFrame, 'spark' is an existing SparkSession
+    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
+
+    # Execute function as a Spark vectorized UDF
+    df.select(multiply(col("x"), col("x"))).show()
+    # +-------------------+
+    # |multiply_func(x, x)|
+    # +-------------------+
+    # |                  1|
+    # |                  4|
+    # |                  9|
+    # +-------------------+
+    # $example off:scalar_pandas_udf$
+
+
+def group_map_pandas_udf_example(spark):
+    # $example on:group_map_pandas_udf$
+    from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+    df = spark.createDataFrame(
+        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ("id", "v"))
+
+    @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+    def subtract_mean(pdf):
+        # pdf is a pandas.DataFrame
+        v = pdf.v
+        return pdf.assign(v=v - v.mean())
+
+    df.groupby("id").apply(subtract_mean).show()
+    # +---+----+
+    # | id|   v|
+    # +---+----+
+    # |  1|-0.5|
+    # |  1| 0.5|
+    # |  2|-3.0|
+    # |  2|-1.0|
+    # |  2| 4.0|
+    # +---+----+
+    # $example off:group_map_pandas_udf$
+
+
+if __name__ == "__main__":
+    spark = SparkSession \
+        .builder \
+        .appName("Python Arrow-in-Spark example") \
+        .getOrCreate()
+
+    print("Running Pandas to/from conversion example")
+    dataframe_with_arrow_example(spark)
+    print("Running pandas_udf scalar example")
+    scalar_pandas_udf_example(spark)
+    print("Running pandas_udf group map example")
+    group_map_pandas_udf_example(spark)
+
+    spark.stop()

