This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ed6193a [SPARK-30722][PYTHON][DOCS] Update documentation for Pandas
UDF with Python type hints
ed6193a is described below
commit ed6193ad68a55a85f51f3ebda08f53cbcf023a24
Author: HyukjinKwon <[email protected]>
AuthorDate: Wed Feb 12 10:49:46 2020 +0900
[SPARK-30722][PYTHON][DOCS] Update documentation for Pandas UDF with Python
type hints
### What changes were proposed in this pull request?
This PR targets to document the Pandas UDF redesign with type hints
introduced at SPARK-28264.
Mostly self-describing; however, there are a few things to note for reviewers.
1. This PR replaces the existing documentation of pandas UDFs with the newer
redesign to promote Python type hints. I added a note that Spark 3.0
still keeps backward compatibility.
2. This PR proposes to name the APIs that are not Pandas UDFs as "Pandas Function APIs"
3. SCALAR_ITER becomes two separate sections to reduce confusion:
- `Iterator[pd.Series]` -> `Iterator[pd.Series]`
- `Iterator[Tuple[pd.Series, ...]]` -> `Iterator[pd.Series]`
4. I removed some examples that looked like overkill to me.
5. I also removed some information in the doc that seemed duplicated or
excessive.
### Why are the changes needed?
To document the new redesign of pandas UDFs.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests should cover.
Closes #27466 from HyukjinKwon/SPARK-30722.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit aa6a60530e63ab3bb8b117f8738973d1b26a2cc7)
Signed-off-by: HyukjinKwon <[email protected]>
---
dev/sparktestsupport/modules.py | 1 -
docs/sql-pyspark-pandas-with-arrow.md | 233 +++++++++-----
examples/src/main/python/sql/arrow.py | 258 ++++++++--------
python/pyspark/sql/pandas/functions.py | 538 ++++++++++++++-------------------
python/pyspark/sql/pandas/group_ops.py | 99 ++++--
python/pyspark/sql/pandas/map_ops.py | 6 +-
python/pyspark/sql/udf.py | 16 +-
7 files changed, 609 insertions(+), 542 deletions(-)
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 40f2ca2..391e4bb 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -364,7 +364,6 @@ pyspark_sql = Module(
"pyspark.sql.avro.functions",
"pyspark.sql.pandas.conversion",
"pyspark.sql.pandas.map_ops",
- "pyspark.sql.pandas.functions",
"pyspark.sql.pandas.group_ops",
"pyspark.sql.pandas.types",
"pyspark.sql.pandas.serializers",
diff --git a/docs/sql-pyspark-pandas-with-arrow.md
b/docs/sql-pyspark-pandas-with-arrow.md
index 7eb8a74..92a5157 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -35,7 +35,7 @@ working with Arrow-enabled data.
If you install PySpark using pip, then PyArrow can be brought in as an extra
dependency of the
SQL module with the command `pip install pyspark[sql]`. Otherwise, you must
ensure that PyArrow
-is installed and available on all cluster nodes. The current supported version
is 0.12.1.
+is installed and available on all cluster nodes. The current supported version
is 0.15.1+.
You can install using pip or conda from the conda-forge channel. See PyArrow
[installation](https://arrow.apache.org/docs/python/install.html) for details.
@@ -65,132 +65,216 @@ Spark will fall back to create the DataFrame without
Arrow.
## Pandas UDFs (a.k.a. Vectorized UDFs)
-Pandas UDFs are user defined functions that are executed by Spark using Arrow
to transfer data and
-Pandas to work with the data. A Pandas UDF is defined using the keyword
`pandas_udf` as a decorator
-or to wrap the function, no additional configuration is required. Currently,
there are two types of
-Pandas UDF: Scalar and Grouped Map.
+Pandas UDFs are user defined functions that are executed by Spark using
+Arrow to transfer data and Pandas to work with the data, which allows
vectorized operations. A Pandas
+UDF is defined using the `pandas_udf` as a decorator or to wrap the function,
and no additional
+configuration is required. A Pandas UDF behaves as a regular PySpark function
API in general.
-### Scalar
+Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From
Spark 3.0
+with Python 3.6+, you can also use [Python type
hints](https://www.python.org/dev/peps/pep-0484).
+Using Python type hints is preferred and using `PandasUDFType` will be
deprecated in
+a future release.
-Scalar Pandas UDFs are used for vectorizing scalar operations. They can be
used with functions such
-as `select` and `withColumn`. The Python function should take `pandas.Series`
as inputs and return
-a `pandas.Series` of the same length. Internally, Spark will execute a Pandas
UDF by splitting
-columns into batches and calling the function for each batch as a subset of
the data, then
-concatenating the results together.
+Note that the type hint should use `pandas.Series` in all cases, with one exception:
+`pandas.DataFrame` should be used as the input or output type hint when the input
+or output column is of `StructType`. The following example shows a Pandas UDF which takes a long
+column, a string column and a struct column, and outputs a struct column. It requires the function to
+specify the type hints of `pandas.Series` and `pandas.DataFrame` as below:
-The following example shows how to create a scalar Pandas UDF that computes
the product of 2 columns.
+<p>
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example ser_to_frame_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+</p>
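
For orientation, here is a condensed sketch of the `ser_to_frame_pandas_udf` example included above (its runnable version lives in `examples/src/main/python/sql/arrow.py` further down in this patch); it assumes a running `SparkSession` named `spark`:

{% highlight python %}
import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    # The struct column arrives as a pandas.DataFrame; the plain columns arrive as pandas.Series.
    s3['col2'] = s1 + s2.str.len()
    return s3

df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")
df.select(func("long_col", "string_col", "struct_col")).printSchema()
{% endhighlight %}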
+
+The following sections describe the combinations of the supported type hints. For simplicity,
+the `pandas.DataFrame` variant is omitted.
+
+### Series to Series
+
+The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.
+
+Using `pandas_udf` with a function that has such type hints creates a
Pandas UDF where the given
+function takes one or more `pandas.Series` and outputs one `pandas.Series`.
The output of the function should
+always be of the same length as the input. Internally, PySpark will execute a
Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of
the data, then concatenating
+the results together.
+
+The following example shows how to create this Pandas UDF that computes the
product of 2 columns.
<div class="codetabs">
<div data-lang="python" markdown="1">
-{% include_example scalar_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %}
</div>
</div>
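
As a quick reference, here is a condensed sketch of the `ser_to_ser_pandas_udf` example included above (assuming a running `SparkSession` named `spark`; the full version is in `examples/src/main/python/sql/arrow.py` in this patch):

{% highlight python %}
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))
df.select(multiply(col("x"), col("x"))).show()
{% endhighlight %}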
-### Scalar Iterator
+For detailed usage, please see
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Iterator of Series to Iterator of Series
+
+The type hint can be expressed as `Iterator[pandas.Series]` ->
`Iterator[pandas.Series]`.
+
+Using `pandas_udf` with a function that has such type hints creates a
Pandas UDF where the given
+function takes an iterator of `pandas.Series` and outputs an iterator of
`pandas.Series`. Each output
+series from the function should be of the same length as the corresponding input series. In
this case, the created
+Pandas UDF requires one input column when the Pandas UDF is called. To use
multiple input columns,
+a different type hint is required. See Iterator of Multiple Series to Iterator
of Series.
+
+It is useful when the UDF execution requires initializing some state, although
internally it works
+identically to the Series to Series case. The pseudocode below illustrates the
example.
+
+{% highlight python %}
+@pandas_udf("long")
+def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+ # Do some expensive initialization with a state
+ state = very_expensive_initialization()
+ for x in iterator:
+ # Use that state for whole iterator.
+ yield calculate_with_state(x, state)
-Scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as scalar Pandas UDF
above except that the
-underlying Python function takes an iterator of batches as input instead of a
single batch and,
-instead of returning a single output batch, it yields output batches or
returns an iterator of
-output batches.
-It is useful when the UDF execution requires initializing some states, e.g.,
loading an machine
-learning model file to apply inference to every input batch.
+df.select(calculate("value")).show()
+{% endhighlight %}
-The following example shows how to create scalar iterator Pandas UDFs:
+The following example shows how to create this Pandas UDF:
<div class="codetabs">
<div data-lang="python" markdown="1">
-{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %}
</div>
</div>
-### Grouped Map
-Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the
"split-apply-combine" pattern.
-Split-apply-combine consists of three steps:
-* Split the data into groups by using `DataFrame.groupBy`.
-* Apply a function on each group. The input and output of the function are
both `pandas.DataFrame`. The
- input data contains all the rows and columns for each group.
-* Combine the results into a new `DataFrame`.
+For detailed usage, please see
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
-To use `groupBy().apply()`, the user needs to define the following:
-* A Python function that defines the computation for each group.
-* A `StructType` object or a string that defines the schema of the output
`DataFrame`.
+### Iterator of Multiple Series to Iterator of Series
-The column labels of the returned `pandas.DataFrame` must either match the
field names in the
-defined output schema if specified as strings, or match the field data types
by position if not
-strings, e.g. integer indices. See
[pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
-on how to label columns when constructing a `pandas.DataFrame`.
+The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` ->
`Iterator[pandas.Series]`.
-Note that all data for a group will be loaded into memory before the function
is applied. This can
-lead to out of memory exceptions, especially if the group sizes are skewed.
The configuration for
-[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and
it is up to the user
-to ensure that the grouped data will fit into the available memory.
+Using `pandas_udf` with a function that has such type hints creates a
Pandas UDF where the
+given function takes an iterator of a tuple of multiple `pandas.Series` and
outputs an iterator of `pandas.Series`.
+In this case, the created pandas UDF requires as many input columns
as there are series in the tuple
+when the Pandas UDF is called. It works identically to the Iterator of Series to
Iterator of Series case except for the function parameters.
-The following example shows how to use `groupby().apply()` to subtract the
mean from each value in the group.
+The following example shows how to create this Pandas UDF:
<div class="codetabs">
<div data-lang="python" markdown="1">
-{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %}
</div>
</div>
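
A condensed sketch of the `iter_sers_to_iter_ser_pandas_udf` example included above (assuming a running `SparkSession` named `spark`; the full version is in `examples/src/main/python/sql/arrow.py` in this patch):

{% highlight python %}
from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    # Each element of the iterator is a tuple of batches, one batch per input column.
    for a, b in iterator:
        yield a * b

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["x"]))
df.select(multiply_two_cols("x", "x")).show()
{% endhighlight %}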
-For detailed usage, please see
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
and
-[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+For detailed usage, please see
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Series to Scalar
-### Grouped Aggregate
+The type hint can be expressed as `pandas.Series`, ... -> `Any`.
-Grouped aggregate Pandas UDFs are similar to Spark aggregate functions.
Grouped aggregate Pandas UDFs are used with `groupBy().agg()` and
-[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). It
defines an aggregation from one or more `pandas.Series`
-to a scalar value, where each `pandas.Series` represents a column within the
group or window.
+Using `pandas_udf` with a function that has such type hints creates a
Pandas UDF similar
+to PySpark's aggregate functions. The given function takes one or more `pandas.Series` and
returns a scalar value.
+The return type should be a primitive data type, and the returned scalar can
be either a Python
+primitive type, e.g., `int` or `float`, or a NumPy data type, e.g.,
`numpy.int64` or `numpy.float64`.
+`Any` should ideally be the specific scalar type accordingly.
-Note that this type of UDF does not support partial aggregation and all data
for a group or window will be loaded into memory. Also,
-only unbounded window is supported with Grouped aggregate Pandas UDFs
currently.
+This UDF can also be used with `groupBy().agg()` and
[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window).
+It defines an aggregation from one or more `pandas.Series` to a scalar value,
where each `pandas.Series`
+represents a column within the group or window.
-The following example shows how to use this type of UDF to compute mean with
groupBy and window operations:
+Note that this type of UDF does not support partial aggregation, and all data
for a group or window
+will be loaded into memory. Also, only unbounded windows are currently supported with
grouped aggregate Pandas
+UDFs. The following example shows how to use this type of UDF to
compute the mean with group-by
+and window operations:
<div class="codetabs">
<div data-lang="python" markdown="1">
-{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %}
</div>
</div>
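
A condensed sketch of the `ser_to_scalar_pandas_udf` example included above, using an unbounded window as described (assuming a running `SparkSession` named `spark`; the full version is in `examples/src/main/python/sql/arrow.py` in this patch):

{% highlight python %}
import pandas as pd

from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# Aggregation per group
df.groupby("id").agg(mean_udf(df['v'])).show()

# The same UDF over an unbounded window
w = Window.partitionBy('id').rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
{% endhighlight %}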
For detailed usage, please see
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
-### Map Iterator
+## Pandas Function APIs
+
+Pandas function APIs can directly apply a Python native function against the
whole DataFrame by
+using Pandas instances. Internally they work similarly to Pandas UDFs:
Spark uses Arrow to transfer
+data and Pandas to work with the data, which allows vectorized operations. A
Pandas function API behaves
+as a regular PySpark `DataFrame` API in general.
+
+From Spark 3.0, the grouped map pandas UDF is categorized as a separate Pandas
Function API,
+`DataFrame.groupby().applyInPandas()`. It is still possible to use it with
`PandasUDFType`
+and `DataFrame.groupby().apply()` as it was; however, it is preferred to use
+`DataFrame.groupby().applyInPandas()` directly. Using `PandasUDFType` will be
deprecated
+in the future.
+
+### Grouped Map
+
+Grouped map operations with Pandas instances are supported by
`DataFrame.groupby().applyInPandas()`
+which requires a Python function that takes a `pandas.DataFrame` and returns
another `pandas.DataFrame`.
+It maps each group to a `pandas.DataFrame` passed to the Python function.
+
+This API implements the "split-apply-combine" pattern which consists of three
steps:
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are
both `pandas.DataFrame`. The
+ input data contains all the rows and columns for each group.
+* Combine the results into a new PySpark `DataFrame`.
-Map iterator Pandas UDFs are used to transform data with an iterator of
batches. Map iterator
-Pandas UDFs can be used with
-[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
-It defines a map function that transforms an iterator of `pandas.DataFrame` to
another.
+To use `groupBy().applyInPandas()`, the user needs to define the following:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output
PySpark `DataFrame`.
+
+The column labels of the returned `pandas.DataFrame` must either match the
field names in the
+defined output schema if specified as strings, or match the field data types
by position if not
+strings, e.g. integer indices. See
[pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
+on how to label columns when constructing a `pandas.DataFrame`.
-It can return the output of arbitrary length in contrast to the scalar Pandas
UDF. It maps an iterator of `pandas.DataFrame`s,
-that represents the current `DataFrame`, using the map iterator UDF and
returns the result as a `DataFrame`.
+Note that all data for a group will be loaded into memory before the function
is applied. This can
+lead to out of memory exceptions, especially if the group sizes are skewed.
The configuration for
+[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and
it is up to the user
+to ensure that the grouped data will fit into the available memory.
-The following example shows how to create map iterator Pandas UDFs:
+The following example shows how to use `groupby().applyInPandas()` to subtract
the mean from each value
+in the group.
<div class="codetabs">
<div data-lang="python" markdown="1">
-{% include_example map_iter_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_apply_in_pandas python/sql/arrow.py %}
</div>
</div>
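
A condensed sketch of the `grouped_apply_in_pandas` example included above (assuming a running `SparkSession` named `spark`; the full version is in `examples/src/main/python/sql/arrow.py` in this patch):

{% highlight python %}
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame holding all rows of one group
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
{% endhighlight %}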
-For detailed usage, please see
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
and
-[`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+For detailed usage, please see
[`pyspark.sql.GroupedData.applyInPandas`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas).
+### Map
+
+Map operations with Pandas instances are supported by
`DataFrame.mapInPandas()` which maps an iterator
+of `pandas.DataFrame`s to another iterator of `pandas.DataFrame`s that
represents the current
+PySpark `DataFrame` and returns the result as a PySpark `DataFrame`. The
function takes and outputs
+an iterator of `pandas.DataFrame`. It can return output of arbitrary
length, in contrast to some
+Pandas UDFs, although internally it works similarly to the Series to Series
Pandas UDF.
+
+The following example shows how to use `mapInPandas()`:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example map_in_pandas python/sql/arrow.py %}
+</div>
+</div>
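
A condensed sketch of the `map_in_pandas` example included above (assuming a running `SparkSession` named `spark`; the full version is in `examples/src/main/python/sql/arrow.py` in this patch):

{% highlight python %}
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    # Each pdf is a pandas.DataFrame for one batch of the input DataFrame
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
{% endhighlight %}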
-### Cogrouped Map
+For detailed usage, please see
[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
-Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key
and then a python function applied to
-each cogroup. They are used with `groupBy().cogroup().apply()` which consists
of the following steps:
+### Co-grouped Map
+Co-grouped map operations with Pandas instances are supported by
`DataFrame.groupby().cogroup().applyInPandas()` which
+allows two PySpark `DataFrame`s to be cogrouped by a common key and then a
Python function applied to each
+cogroup. It consists of the following steps:
* Shuffle the data such that the groups of each dataframe which share a key
are cogrouped together.
-* Apply a function to each cogroup. The input of the function is two
`pandas.DataFrame` (with an optional Tuple
-representing the key). The output of the function is a `pandas.DataFrame`.
-* Combine the pandas.DataFrames from all groups into a new `DataFrame`.
+* Apply a function to each cogroup. The input of the function is two
`pandas.DataFrame` (with an optional tuple
+representing the key). The output of the function is a `pandas.DataFrame`.
+* Combine the `pandas.DataFrame`s from all groups into a new PySpark
`DataFrame`.
-To use `groupBy().cogroup().apply()`, the user needs to define the following:
+To use `groupBy().cogroup().applyInPandas()`, the user needs to define the
following:
* A Python function that defines the computation for each cogroup.
-* A `StructType` object or a string that defines the schema of the output
`DataFrame`.
+* A `StructType` object or a string that defines the schema of the output
PySpark `DataFrame`.
The column labels of the returned `pandas.DataFrame` must either match the
field names in the
defined output schema if specified as strings, or match the field data types
by position if not
@@ -201,16 +285,15 @@ Note that all data for a cogroup will be loaded into
memory before the function
memory exceptions, especially if the group sizes are skewed. The configuration
for [maxRecordsPerBatch](#setting-arrow-batch-size)
is not applied and it is up to the user to ensure that the cogrouped data will
fit into the available memory.
-The following example shows how to use `groupby().cogroup().apply()` to
perform an asof join between two datasets.
+The following example shows how to use `groupby().cogroup().applyInPandas()`
to perform an asof join between two datasets.
<div class="codetabs">
<div data-lang="python" markdown="1">
-{% include_example cogrouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example cogrouped_apply_in_pandas python/sql/arrow.py %}
</div>
</div>
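
A condensed sketch of the `cogrouped_apply_in_pandas` example included above (assuming a running `SparkSession` named `spark`; the full version is in `examples/src/main/python/sql/arrow.py` in this patch):

{% highlight python %}
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0),
     (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    # l and r are the pandas.DataFrames of the cogroup, one from each input
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
{% endhighlight %}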
-For detailed usage, please see
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
and
-[`pyspark.sql.CoGroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.CoGroupedData.apply).
+For detailed usage, please see
[`pyspark.sql.PandasCogroupedOps.applyInPandas()`](api/python/pyspark.sql.html#pyspark.sql.PandasCogroupedOps.applyInPandas).
## Usage Notes
diff --git a/examples/src/main/python/sql/arrow.py
b/examples/src/main/python/sql/arrow.py
index 1c98317..b7d8467 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -23,12 +23,19 @@ Run with:
from __future__ import print_function
+import sys
+
from pyspark.sql import SparkSession
from pyspark.sql.pandas.utils import require_minimum_pandas_version,
require_minimum_pyarrow_version
require_minimum_pandas_version()
require_minimum_pyarrow_version()
+if sys.version_info < (3, 6):
+ raise Exception(
+ "Running this example file requires Python 3.6+; however, "
+ "your Python version was:\n %s" % sys.version)
+
def dataframe_with_arrow_example(spark):
# $example on:dataframe_with_arrow$
@@ -50,15 +57,45 @@ def dataframe_with_arrow_example(spark):
print("Pandas DataFrame result statistics:\n%s\n" %
str(result_pdf.describe()))
-def scalar_pandas_udf_example(spark):
- # $example on:scalar_pandas_udf$
+def ser_to_frame_pandas_udf_example(spark):
+ # $example on:ser_to_frame_pandas_udf$
+ import pandas as pd
+
+ from pyspark.sql.functions import pandas_udf
+
+ @pandas_udf("col1 string, col2 long")
+ def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
+ s3['col2'] = s1 + s2.str.len()
+ return s3
+
+    # Create a Spark DataFrame that has three columns including a struct
column.
+ df = spark.createDataFrame(
+ [[1, "a string", ("a nested string",)]],
+ "long_col long, string_col string, struct_col struct<col1:string>")
+
+ df.printSchema()
+ # root
+    # |-- long_col: long (nullable = true)
+    # |-- string_col: string (nullable = true)
+    # |-- struct_col: struct (nullable = true)
+ # | |-- col1: string (nullable = true)
+
+ df.select(func("long_col", "string_col", "struct_col")).printSchema()
+ # |-- func(long_col, string_col, struct_col): struct (nullable = true)
+ # | |-- col1: string (nullable = true)
+ # | |-- col2: long (nullable = true)
+    # $example off:ser_to_frame_pandas_udf$
+
+
+def ser_to_ser_pandas_udf_example(spark):
+ # $example on:ser_to_ser_pandas_udf$
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
- def multiply_func(a, b):
+ def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
@@ -83,26 +120,27 @@ def scalar_pandas_udf_example(spark):
# | 4|
# | 9|
# +-------------------+
- # $example off:scalar_pandas_udf$
+ # $example off:ser_to_ser_pandas_udf$
-def scalar_iter_pandas_udf_example(spark):
- # $example on:scalar_iter_pandas_udf$
+def iter_ser_to_iter_ser_pandas_udf_example(spark):
+ # $example on:iter_ser_to_iter_ser_pandas_udf$
+ from typing import Iterator
+
import pandas as pd
- from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+ from pyspark.sql.functions import pandas_udf
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
- # When the UDF is called with a single column that is not StructType,
- # the input to the underlying function is an iterator of pd.Series.
- @pandas_udf("long", PandasUDFType.SCALAR_ITER)
- def plus_one(batch_iter):
- for x in batch_iter:
+ # Declare the function and create the UDF
+ @pandas_udf("long")
+ def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+ for x in iterator:
yield x + 1
- df.select(plus_one(col("x"))).show()
+ df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
@@ -110,15 +148,28 @@ def scalar_iter_pandas_udf_example(spark):
# | 3|
# | 4|
# +-----------+
+ # $example off:iter_ser_to_iter_ser_pandas_udf$
+
+
+def iter_sers_to_iter_ser_pandas_udf_example(spark):
+ # $example on:iter_sers_to_iter_ser_pandas_udf$
+ from typing import Iterator, Tuple
+
+ import pandas as pd
- # When the UDF is called with more than one columns,
- # the input to the underlying function is an iterator of pd.Series tuple.
- @pandas_udf("long", PandasUDFType.SCALAR_ITER)
- def multiply_two_cols(batch_iter):
- for a, b in batch_iter:
+ from pyspark.sql.functions import pandas_udf
+
+ pdf = pd.DataFrame([1, 2, 3], columns=["x"])
+ df = spark.createDataFrame(pdf)
+
+ # Declare the function and create the UDF
+ @pandas_udf("long")
+ def multiply_two_cols(
+ iterator: Iterator[Tuple[pd.Series, pd.Series]]) ->
Iterator[pd.Series]:
+ for a, b in iterator:
yield a * b
- df.select(multiply_two_cols(col("x"), col("x"))).show()
+ df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
@@ -126,92 +177,32 @@ def scalar_iter_pandas_udf_example(spark):
# | 4|
# | 9|
# +-----------------------+
+ # $example off:iter_sers_to_iter_ser_pandas_udf$
- # When the UDF is called with a single column that is StructType,
- # the input to the underlying function is an iterator of pd.DataFrame.
- @pandas_udf("long", PandasUDFType.SCALAR_ITER)
- def multiply_two_nested_cols(pdf_iter):
- for pdf in pdf_iter:
- yield pdf["a"] * pdf["b"]
-
- df.select(
- multiply_two_nested_cols(
- struct(col("x").alias("a"), col("x").alias("b"))
- ).alias("y")
- ).show()
- # +---+
- # | y|
- # +---+
- # | 1|
- # | 4|
- # | 9|
- # +---+
-
- # In the UDF, you can initialize some states before processing batches.
- # Wrap your code with try/finally or use context managers to ensure
- # the release of resources at the end.
- y_bc = spark.sparkContext.broadcast(1)
-
- @pandas_udf("long", PandasUDFType.SCALAR_ITER)
- def plus_y(batch_iter):
- y = y_bc.value # initialize states
- try:
- for x in batch_iter:
- yield x + y
- finally:
- pass # release resources here, if any
-
- df.select(plus_y(col("x"))).show()
- # +---------+
- # |plus_y(x)|
- # +---------+
- # | 2|
- # | 3|
- # | 4|
- # +---------+
- # $example off:scalar_iter_pandas_udf$
-
-
-def grouped_map_pandas_udf_example(spark):
- # $example on:grouped_map_pandas_udf$
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
- df = spark.createDataFrame(
- [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
- ("id", "v"))
-
- @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
- def subtract_mean(pdf):
- # pdf is a pandas.DataFrame
- v = pdf.v
- return pdf.assign(v=v - v.mean())
-
- df.groupby("id").apply(subtract_mean).show()
- # +---+----+
- # | id| v|
- # +---+----+
- # | 1|-0.5|
- # | 1| 0.5|
- # | 2|-3.0|
- # | 2|-1.0|
- # | 2| 4.0|
- # +---+----+
- # $example off:grouped_map_pandas_udf$
+def ser_to_scalar_pandas_udf_example(spark):
+ # $example on:ser_to_scalar_pandas_udf$
+ import pandas as pd
-def grouped_agg_pandas_udf_example(spark):
- # $example on:grouped_agg_pandas_udf$
- from pyspark.sql.functions import pandas_udf, PandasUDFType
+ from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
- @pandas_udf("double", PandasUDFType.GROUPED_AGG)
- def mean_udf(v):
+ # Declare the function and create the UDF
+ @pandas_udf("double")
+ def mean_udf(v: pd.Series) -> float:
return v.mean()
+ df.select(mean_udf(df['v'])).show()
+ # +-----------+
+ # |mean_udf(v)|
+ # +-----------+
+ # | 4.2|
+ # +-----------+
+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
@@ -233,37 +224,54 @@ def grouped_agg_pandas_udf_example(spark):
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
- # $example off:grouped_agg_pandas_udf$
+ # $example off:ser_to_scalar_pandas_udf$
-def map_iter_pandas_udf_example(spark):
- # $example on:map_iter_pandas_udf$
- import pandas as pd
+def grouped_apply_in_pandas_example(spark):
+ # $example on:grouped_apply_in_pandas$
+ df = spark.createDataFrame(
+ [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+ ("id", "v"))
- from pyspark.sql.functions import pandas_udf, PandasUDFType
+ def subtract_mean(pdf):
+ # pdf is a pandas.DataFrame
+ v = pdf.v
+ return pdf.assign(v=v - v.mean())
+
+ df.groupby("id").applyInPandas(subtract_mean, schema="id long, v
double").show()
+ # +---+----+
+ # | id| v|
+ # +---+----+
+ # | 1|-0.5|
+ # | 1| 0.5|
+ # | 2|-3.0|
+ # | 2|-1.0|
+ # | 2| 4.0|
+ # +---+----+
+ # $example off:grouped_apply_in_pandas$
+
+def map_in_pandas_example(spark):
+ # $example on:map_in_pandas$
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
- @pandas_udf(df.schema, PandasUDFType.MAP_ITER)
- def filter_func(batch_iter):
- for pdf in batch_iter:
+ def filter_func(iterator):
+ for pdf in iterator:
yield pdf[pdf.id == 1]
- df.mapInPandas(filter_func).show()
+ df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
- # $example off:map_iter_pandas_udf$
+ # $example off:map_in_pandas$
-def cogrouped_map_pandas_udf_example(spark):
- # $example on:cogrouped_map_pandas_udf$
+def cogrouped_apply_in_pandas_example(spark):
+ # $example on:cogrouped_apply_in_pandas$
import pandas as pd
- from pyspark.sql.functions import pandas_udf, PandasUDFType
-
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0),
(20000102, 2, 4.0)],
("time", "id", "v1"))
@@ -272,11 +280,11 @@ def cogrouped_map_pandas_udf_example(spark):
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
- @pandas_udf("time int, id int, v1 double, v2 string",
PandasUDFType.COGROUPED_MAP)
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
- df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
+ df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+ asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
@@ -285,7 +293,7 @@ def cogrouped_map_pandas_udf_example(spark):
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
- # $example off:cogrouped_map_pandas_udf$
+ # $example off:cogrouped_apply_in_pandas$
if __name__ == "__main__":
@@ -296,17 +304,21 @@ if __name__ == "__main__":
print("Running Pandas to/from conversion example")
dataframe_with_arrow_example(spark)
- print("Running pandas_udf scalar example")
- scalar_pandas_udf_example(spark)
- print("Running pandas_udf scalar iterator example")
- scalar_iter_pandas_udf_example(spark)
- print("Running pandas_udf grouped map example")
- grouped_map_pandas_udf_example(spark)
- print("Running pandas_udf grouped agg example")
- grouped_agg_pandas_udf_example(spark)
- print("Running pandas_udf map iterator example")
- map_iter_pandas_udf_example(spark)
- print("Running pandas_udf cogrouped map example")
- cogrouped_map_pandas_udf_example(spark)
+ print("Running pandas_udf example: Series to Frame")
+ ser_to_frame_pandas_udf_example(spark)
+ print("Running pandas_udf example: Series to Series")
+ ser_to_ser_pandas_udf_example(spark)
+ print("Running pandas_udf example: Iterator of Series to Iterator of
Seires")
+ iter_ser_to_iter_ser_pandas_udf_example(spark)
+ print("Running pandas_udf example: Iterator of Multiple Series to Iterator
of Series")
+ iter_sers_to_iter_ser_pandas_udf_example(spark)
+ print("Running pandas_udf example: Series to Scalar")
+ ser_to_scalar_pandas_udf_example(spark)
+ print("Running pandas function example: Grouped Map")
+ grouped_apply_in_pandas_example(spark)
+ print("Running pandas function example: Map")
+ map_in_pandas_example(spark)
+ print("Running pandas function example: Co-grouped Map")
+ cogrouped_apply_in_pandas_example(spark)
spark.stop()
diff --git a/python/pyspark/sql/pandas/functions.py
b/python/pyspark/sql/pandas/functions.py
index 3060278..31aa321 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -43,303 +43,228 @@ class PandasUDFType(object):
@since(2.3)
def pandas_udf(f=None, returnType=None, functionType=None):
"""
- Creates a vectorized user defined function (UDF).
+ Creates a pandas user defined function (a.k.a. vectorized user defined
function).
+
+ Pandas UDFs are user defined functions that are executed by Spark using
Arrow to transfer
+ data and Pandas to work with the data, which allows vectorized operations.
A Pandas UDF
+ is defined using the `pandas_udf` as a decorator or to wrap the function,
and no
+ additional configuration is required. A Pandas UDF behaves as a regular
PySpark function
+ API in general.
:param f: user-defined function. A python function if used as a standalone
function
:param returnType: the return type of the user-defined function. The value
can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type
string.
:param functionType: an enum value in
:class:`pyspark.sql.functions.PandasUDFType`.
- Default: SCALAR.
-
- .. seealso:: :meth:`pyspark.sql.DataFrame.mapInPandas`
- .. seealso:: :meth:`pyspark.sql.GroupedData.applyInPandas`
- .. seealso:: :meth:`pyspark.sql.PandasCogroupedOps.applyInPandas`
-
- The function type of the UDF can be one of the following:
-
- 1. SCALAR
-
- A scalar UDF defines a transformation: One or more `pandas.Series` -> A
`pandas.Series`.
- The length of the returned `pandas.Series` must be of the same as the
input `pandas.Series`.
- If the return type is :class:`StructType`, the returned value should be
a `pandas.DataFrame`.
-
- :class:`MapType`, nested :class:`StructType` are currently not
supported as output types.
-
- Scalar UDFs can be used with :meth:`pyspark.sql.DataFrame.withColumn`
and
- :meth:`pyspark.sql.DataFrame.select`.
-
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
- >>> from pyspark.sql.types import IntegerType, StringType
- >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) # doctest:
+SKIP
- >>> @pandas_udf(StringType()) # doctest: +SKIP
- ... def to_upper(s):
- ... return s.str.upper()
- ...
- >>> @pandas_udf("integer", PandasUDFType.SCALAR) # doctest: +SKIP
- ... def add_one(x):
- ... return x + 1
- ...
- >>> df = spark.createDataFrame([(1, "John Doe", 21)],
- ... ("id", "name", "age")) # doctest: +SKIP
- >>> df.select(slen("name").alias("slen(name)"), to_upper("name"),
add_one("age")) \\
- ... .show() # doctest: +SKIP
- +----------+--------------+------------+
- |slen(name)|to_upper(name)|add_one(age)|
- +----------+--------------+------------+
- | 8| JOHN DOE| 22|
- +----------+--------------+------------+
- >>> @pandas_udf("first string, last string") # doctest: +SKIP
- ... def split_expand(n):
- ... return n.str.split(expand=True)
- >>> df.select(split_expand("name")).show() # doctest: +SKIP
- +------------------+
- |split_expand(name)|
- +------------------+
- | [John, Doe]|
- +------------------+
-
- .. note:: The length of `pandas.Series` within a scalar UDF is not that
of the whole input
- column, but is the length of an internal batch used for each call
to the function.
- Therefore, this can be used, for example, to ensure the length of
each returned
- `pandas.Series`, and can not be used as the column length.
-
- 2. SCALAR_ITER
-
- A scalar iterator UDF is semantically the same as the scalar Pandas UDF
above except that the
- wrapped Python function takes an iterator of batches as input instead
of a single batch and,
- instead of returning a single output batch, it yields output batches or
explicitly returns an
- generator or an iterator of output batches.
- It is useful when the UDF execution requires initializing some state,
e.g., loading a machine
- learning model file to apply inference to every input batch.
-
- .. note:: It is not guaranteed that one invocation of a scalar iterator
UDF will process all
- batches from one partition, although it is currently implemented
this way.
- Your code shall not rely on this behavior because it might change
in the future for
- further optimization, e.g., one invocation processes multiple
partitions.
-
- Scalar iterator UDFs are used with
:meth:`pyspark.sql.DataFrame.withColumn` and
- :meth:`pyspark.sql.DataFrame.select`.
-
- >>> import pandas as pd # doctest: +SKIP
- >>> from pyspark.sql.functions import col, pandas_udf, struct,
PandasUDFType
- >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"]) # doctest: +SKIP
- >>> df = spark.createDataFrame(pdf) # doctest: +SKIP
-
- When the UDF is called with a single column that is not `StructType`,
the input to the
- underlying function is an iterator of `pd.Series`.
-
- >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
- ... def plus_one(batch_iter):
- ... for x in batch_iter:
- ... yield x + 1
- ...
- >>> df.select(plus_one(col("x"))).show() # doctest: +SKIP
- +-----------+
- |plus_one(x)|
- +-----------+
- | 2|
- | 3|
- | 4|
- +-----------+
-
- When the UDF is called with more than one columns, the input to the
underlying function is an
- iterator of `pd.Series` tuple.
-
- >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
- ... def multiply_two_cols(batch_iter):
- ... for a, b in batch_iter:
- ... yield a * b
- ...
- >>> df.select(multiply_two_cols(col("x"), col("x"))).show() # doctest:
+SKIP
- +-----------------------+
- |multiply_two_cols(x, x)|
- +-----------------------+
- | 1|
- | 4|
- | 9|
- +-----------------------+
-
- When the UDF is called with a single column that is `StructType`, the
input to the underlying
- function is an iterator of `pd.DataFrame`.
-
- >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
- ... def multiply_two_nested_cols(pdf_iter):
- ... for pdf in pdf_iter:
- ... yield pdf["a"] * pdf["b"]
- ...
- >>> df.select(
- ... multiply_two_nested_cols(
- ... struct(col("x").alias("a"), col("x").alias("b"))
- ... ).alias("y")
- ... ).show() # doctest: +SKIP
- +---+
- | y|
- +---+
- | 1|
- | 4|
- | 9|
- +---+
-
- In the UDF, you can initialize some states before processing batches,
wrap your code with
- `try ... finally ...` or use context managers to ensure the release of
resources at the end
- or in case of early termination.
-
- >>> y_bc = spark.sparkContext.broadcast(1) # doctest: +SKIP
- >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER) # doctest: +SKIP
- ... def plus_y(batch_iter):
- ... y = y_bc.value # initialize some state
- ... try:
- ... for x in batch_iter:
- ... yield x + y
- ... finally:
- ... pass # release resources here, if any
- ...
- >>> df.select(plus_y(col("x"))).show() # doctest: +SKIP
- +---------+
- |plus_y(x)|
- +---------+
- | 2|
- | 3|
- | 4|
- +---------+
-
- 3. GROUPED_MAP
-
- A grouped map UDF defines transformation: A `pandas.DataFrame` -> A
`pandas.DataFrame`
- The returnType should be a :class:`StructType` describing the schema of
the returned
- `pandas.DataFrame`. The column labels of the returned
`pandas.DataFrame` must either match
- the field names in the defined returnType schema if specified as
strings, or match the
- field data types by position if not strings, e.g. integer indices.
- The length of the returned `pandas.DataFrame` can be arbitrary.
-
- Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
-
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
- >>> df = spark.createDataFrame(
- ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
- ... ("id", "v")) # doctest: +SKIP
- >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) #
doctest: +SKIP
- ... def normalize(pdf):
- ... v = pdf.v
- ... return pdf.assign(v=(v - v.mean()) / v.std())
- >>> df.groupby("id").apply(normalize).show() # doctest: +SKIP
- +---+-------------------+
- | id| v|
- +---+-------------------+
- | 1|-0.7071067811865475|
- | 1| 0.7071067811865475|
- | 2|-0.8320502943378437|
- | 2|-0.2773500981126146|
- | 2| 1.1094003924504583|
- +---+-------------------+
-
- Alternatively, the user can define a function that takes two arguments.
- In this case, the grouping key(s) will be passed as the first argument
and the data will
- be passed as the second argument. The grouping key(s) will be passed as
a tuple of numpy
- data types, e.g., `numpy.int32` and `numpy.float64`. The data will
still be passed in
- as a `pandas.DataFrame` containing all columns from the original Spark
DataFrame.
- This is useful when the user does not want to hardcode grouping key(s)
in the function.
-
- >>> import pandas as pd # doctest: +SKIP
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
- >>> df = spark.createDataFrame(
- ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
- ... ("id", "v")) # doctest: +SKIP
- >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) #
doctest: +SKIP
- ... def mean_udf(key, pdf):
- ... # key is a tuple of one numpy.int64, which is the value
- ... # of 'id' for the current group
- ... return pd.DataFrame([key + (pdf.v.mean(),)])
- >>> df.groupby('id').apply(mean_udf).show() # doctest: +SKIP
- +---+---+
- | id| v|
- +---+---+
- | 1|1.5|
- | 2|6.0|
- +---+---+
- >>> @pandas_udf(
- ... "id long, `ceil(v / 2)` long, v double",
- ... PandasUDFType.GROUPED_MAP) # doctest: +SKIP
- >>> def sum_udf(key, pdf):
- ... # key is a tuple of two numpy.int64s, which is the values
- ... # of 'id' and 'ceil(df.v / 2)' for the current group
- ... return pd.DataFrame([key + (pdf.v.sum(),)])
- >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show() # doctest:
+SKIP
- +---+-----------+----+
- | id|ceil(v / 2)| v|
- +---+-----------+----+
- | 2| 5|10.0|
- | 1| 1| 3.0|
- | 2| 3| 5.0|
- | 2| 2| 3.0|
- +---+-----------+----+
-
- .. note:: If returning a new `pandas.DataFrame` constructed with a
dictionary, it is
- recommended to explicitly index the columns by name to ensure the
positions are correct,
- or alternatively use an `OrderedDict`.
- For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id',
'a'])` or
- `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
-
- .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
-
- 4. GROUPED_AGG
-
- A grouped aggregate UDF defines a transformation: One or more
`pandas.Series` -> A scalar
- The `returnType` should be a primitive data type, e.g.,
:class:`DoubleType`.
- The returned scalar can be either a python primitive type, e.g., `int`
or `float`
- or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
-
- :class:`MapType` and :class:`StructType` are currently not supported as
output types.
-
- Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg`
and
- :class:`pyspark.sql.Window`
-
- This example shows using grouped aggregated UDFs with groupby:
-
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
- >>> df = spark.createDataFrame(
- ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
- ... ("id", "v"))
- >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP
- ... def mean_udf(v):
- ... return v.mean()
- >>> df.groupby("id").agg(mean_udf(df['v'])).show() # doctest: +SKIP
- +---+-----------+
- | id|mean_udf(v)|
- +---+-----------+
- | 1| 1.5|
- | 2| 6.0|
- +---+-----------+
-
- This example shows using grouped aggregated UDFs as window functions.
-
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
- >>> from pyspark.sql import Window
- >>> df = spark.createDataFrame(
- ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
- ... ("id", "v"))
- >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP
- ... def mean_udf(v):
- ... return v.mean()
- >>> w = (Window.partitionBy('id')
- ... .orderBy('v')
- ... .rowsBetween(-1, 0))
- >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() #
doctest: +SKIP
- +---+----+------+
- | id| v|mean_v|
- +---+----+------+
- | 1| 1.0| 1.0|
- | 1| 2.0| 1.5|
- | 2| 3.0| 3.0|
- | 2| 5.0| 4.0|
- | 2|10.0| 7.5|
- +---+----+------+
-
- .. note:: For performance reasons, the input series to window functions
are not copied.
+ Default: SCALAR.
+
+ .. note:: This parameter exists for compatibility. Using Python type
hints is encouraged.
+
+ In order to use this API, customarily the below are imported:
+
+ >>> import pandas as pd
+ >>> from pyspark.sql.functions import pandas_udf
+
+ From Spark 3.0 with Python 3.6+, `Python type hints
<https://www.python.org/dev/peps/pep-0484>`_
+ detect the function types as below:
+
+ >>> @pandas_udf(IntegerType())
+ ... def slen(s: pd.Series) -> pd.Series:
+ ... return s.str.len()
+
+ Prior to Spark 3.0, the pandas UDF used `functionType` to decide the
execution type as below:
+
+ >>> from pyspark.sql.functions import PandasUDFType
+ >>> from pyspark.sql.types import IntegerType
+ >>> @pandas_udf(IntegerType(), PandasUDFType.SCALAR)
+ ... def slen(s):
+ ... return s.str.len()
+
+ It is preferred to specify type hints for the pandas UDF instead of
specifying pandas UDF
+ type via `functionType` which will be deprecated in the future releases.
+
+ Note that the type hint should use `pandas.Series` in all cases but there
is one variant
+ that `pandas.DataFrame` should be used for its input or output type hint
instead when the input
+ or output column is of :class:`pyspark.sql.types.StructType`. The
following example shows
+ a Pandas UDF which takes long column, string column and struct column, and
outputs a struct
+ column. It requires the function to specify the type hints of
`pandas.Series` and
+ `pandas.DataFrame` as below:
+
+ >>> @pandas_udf("col1 string, col2 long")
+    ... def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) ->
pd.DataFrame:
+ ... s3['col2'] = s1 + s2.str.len()
+ ... return s3
+ ...
+    >>> # Create a Spark DataFrame that has three columns including a struct
column.
+ ... df = spark.createDataFrame(
+ ... [[1, "a string", ("a nested string",)]],
+ ... "long_col long, string_col string, struct_col struct<col1:string>")
+ >>> df.printSchema()
+ root
+    |-- long_col: long (nullable = true)
+    |-- string_col: string (nullable = true)
+    |-- struct_col: struct (nullable = true)
+ | |-- col1: string (nullable = true)
+ >>> df.select(func("long_col", "string_col", "struct_col")).printSchema()
+ |-- func(long_col, string_col, struct_col): struct (nullable = true)
+ | |-- col1: string (nullable = true)
+ | |-- col2: long (nullable = true)
+
+    The following sections describe the combinations of the supported
type hints. For
+    simplicity, the `pandas.DataFrame` variant is omitted.
+
+ * Series to Series
+ `pandas.Series`, ... -> `pandas.Series`
+
+ The function takes one or more `pandas.Series` and outputs one
`pandas.Series`.
+ The output of the function should always be of the same length as the
input.
+
+ >>> @pandas_udf("string")
+ ... def to_upper(s: pd.Series) -> pd.Series:
+ ... return s.str.upper()
+ ...
+ >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+ >>> df.select(to_upper("name")).show()
+ +--------------+
+ |to_upper(name)|
+ +--------------+
+ | JOHN DOE|
+ +--------------+
+
+ >>> @pandas_udf("first string, last string")
+ ... def split_expand(s: pd.Series) -> pd.DataFrame:
+ ... return s.str.split(expand=True)
+ ...
+ >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+ >>> df.select(split_expand("name")).show()
+ +------------------+
+ |split_expand(name)|
+ +------------------+
+ | [John, Doe]|
+ +------------------+
+
+ .. note:: The length of the input is not that of the whole input
column, but is the
+ length of an internal batch used for each call to the function.
+
+ * Iterator of Series to Iterator of Series
+ `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`
+
+ The function takes an iterator of `pandas.Series` and outputs an
iterator of
+      `pandas.Series`. In this case, the created pandas UDF instance
requires one input
+      column when it is called as a PySpark column. Each output
series from
+      the function should always be of the same length as the corresponding input.
+
+ It is useful when the UDF execution
+ requires initializing some states although internally it works
identically as
+ Series to Series case. The pseudocode below illustrates the example.
+
+ .. highlight:: python
+ .. code-block:: python
+
+ @pandas_udf("long")
+ def calculate(iterator: Iterator[pd.Series]) ->
Iterator[pd.Series]:
+ # Do some expensive initialization with a state
+ state = very_expensive_initialization()
+ for x in iterator:
+ # Use that state for whole iterator.
+ yield calculate_with_state(x, state)
+
+ df.select(calculate("value")).show()
+
+ >>> from typing import Iterator
+ >>> @pandas_udf("long")
+ ... def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+ ... for s in iterator:
+ ... yield s + 1
+ ...
+ >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+ >>> df.select(plus_one(df.v)).show()
+ +-----------+
+ |plus_one(v)|
+ +-----------+
+ | 2|
+ | 3|
+ | 4|
+ +-----------+
+
+ .. note:: The length of each series is the length of a batch
internally used.
+
+ * Iterator of Multiple Series to Iterator of Series
+ `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`
+
+ The function takes an iterator of a tuple of multiple `pandas.Series`
and outputs an
+      iterator of `pandas.Series`. In this case, the created pandas UDF instance
requires
+      as many input columns as there are series in the tuple when it is called as a PySpark
column.
+      It works identically to the Iterator of Series to Iterator of Series case
except for
+      the function parameters. Each output series from the function
should always
+      be of the same length as the corresponding input.
+
+ >>> from typing import Iterator, Tuple
+ >>> from pyspark.sql.functions import struct, col
+ >>> @pandas_udf("long")
+ ... def multiply(iterator: Iterator[Tuple[pd.Series, pd.DataFrame]])
-> Iterator[pd.Series]:
+ ... for s1, df in iterator:
+ ... yield s1 * df.v
+ ...
+ >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+ >>> df.withColumn('output', multiply(col("v"),
struct(col("v")))).show()
+ +---+------+
+ | v|output|
+ +---+------+
+ | 1| 1|
+ | 2| 4|
+ | 3| 9|
+ +---+------+
+
+ .. note:: The length of each series is the length of a batch
internally used.
+
+ * Series to Scalar
+ `pandas.Series`, ... -> `Any`
+
+ The function takes `pandas.Series` and returns a scalar value. The
`returnType`
+ should be a primitive data type, and the returned scalar can be either
a python primitive
+ type, e.g., int or float or a numpy data type, e.g., numpy.int64 or
numpy.float64.
+ `Any` should ideally be a specific scalar type accordingly.
+
+ >>> @pandas_udf("double")
+ ... def mean_udf(v: pd.Series) -> float:
+ ... return v.mean()
+ ...
+ >>> df = spark.createDataFrame(
+ ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id",
"v"))
+ >>> df.groupby("id").agg(mean_udf(df['v'])).show()
+ +---+-----------+
+ | id|mean_udf(v)|
+ +---+-----------+
+ | 1| 1.5|
+ | 2| 6.0|
+ +---+-----------+
+
+      This UDF can also be used as a window function as below:
+
+ >>> from pyspark.sql import Window
+ >>> @pandas_udf("double")
+ ... def mean_udf(v: pd.Series) -> float:
+ ... return v.mean()
+ ...
+ >>> df = spark.createDataFrame(
+ ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id",
"v"))
+ >>> w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
+ >>> df.withColumn('mean_v', mean_udf("v").over(w)).show()
+ +---+----+------+
+ | id| v|mean_v|
+ +---+----+------+
+ | 1| 1.0| 1.0|
+ | 1| 2.0| 1.5|
+ | 2| 3.0| 3.0|
+ | 2| 5.0| 4.0|
+ | 2|10.0| 7.5|
+ +---+----+------+
+
+ .. note:: For performance reasons, the input series to window
functions are not copied.
Therefore, mutating the input series is not allowed and will cause
incorrect results.
For the same reason, users should also not rely on the index of
the input series.
- .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and
:class:`pyspark.sql.Window`
+ .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and
:class:`pyspark.sql.Window`
.. note:: The user-defined functions do not support conditional
expressions or short circuiting
in boolean expressions and it ends up with being executed all
internally. If the functions
@@ -348,10 +273,21 @@ def pandas_udf(f=None, returnType=None,
functionType=None):
.. note:: The user-defined functions do not take keyword arguments on the
calling side.
.. note:: The data type of returned `pandas.Series` from the user-defined
functions should be
- matched with defined returnType (see :meth:`types.to_arrow_type` and
+ matched with defined `returnType` (see :meth:`types.to_arrow_type` and
:meth:`types.from_arrow_type`). When there is mismatch between them,
Spark might do
conversion on returned data. The conversion is not guaranteed to be
correct and results
should be checked for accuracy by users.
+
+ .. note:: Currently,
+ :class:`pyspark.sql.types.MapType`,
+ :class:`pyspark.sql.types.ArrayType` of
:class:`pyspark.sql.types.TimestampType` and
+ nested :class:`pyspark.sql.types.StructType`
+ are currently not supported as output types.
+
+ .. seealso:: :meth:`pyspark.sql.DataFrame.mapInPandas`
+ .. seealso:: :meth:`pyspark.sql.GroupedData.applyInPandas`
+ .. seealso:: :meth:`pyspark.sql.PandasCogroupedOps.applyInPandas`
+ .. seealso:: :meth:`pyspark.sql.UDFRegistration.register`
"""
# The following table shows most of Pandas data and SQL type conversions
in Pandas UDFs that
@@ -480,25 +416,3 @@ def _create_pandas_udf(f, returnType, evalType):
"or three arguments (key, left, right).")
return _create_udf(f, returnType, evalType)
-
-
-def _test():
- import doctest
- from pyspark.sql import SparkSession
- import pyspark.sql.pandas.functions
- globs = pyspark.sql.pandas.functions.__dict__.copy()
- spark = SparkSession.builder\
- .master("local[4]")\
- .appName("sql.pandas.functions tests")\
- .getOrCreate()
- globs['spark'] = spark
- (failure_count, test_count) = doctest.testmod(
- pyspark.sql.pandas.functions, globs=globs,
- optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE |
doctest.REPORT_NDIFF)
- spark.stop()
- if failure_count:
- sys.exit(-1)
-
-
-if __name__ == "__main__":
- _test()
diff --git a/python/pyspark/sql/pandas/group_ops.py
b/python/pyspark/sql/pandas/group_ops.py
index 3152271..b93f051 100644
--- a/python/pyspark/sql/pandas/group_ops.py
+++ b/python/pyspark/sql/pandas/group_ops.py
@@ -88,29 +88,27 @@ class PandasGroupedOpsMixin(object):
to the user-function and the returned `pandas.DataFrame` are combined
as a
:class:`DataFrame`.
- The returned `pandas.DataFrame` can be of arbitrary length and its
schema must match the
- returnType of the pandas udf.
-
- .. note:: This function requires a full shuffle. All the data of a
group will be loaded
- into memory, so the user should be aware of the potential OOM risk
if data is skewed
- and certain groups are too large to fit in memory.
+ The `schema` should be a :class:`StructType` describing the schema of
the returned
+ `pandas.DataFrame`. The column labels of the returned
`pandas.DataFrame` must either match
+ the field names in the defined schema if specified as strings, or
match the
+ field data types by position if not strings, e.g. integer indices.
+ The length of the returned `pandas.DataFrame` can be arbitrary.
:param func: a Python native function that takes a `pandas.DataFrame`,
and outputs a
`pandas.DataFrame`.
:param schema: the return type of the `func` in PySpark. The value can
be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type
string.
- .. note:: Experimental
-
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+ >>> import pandas as pd # doctest: +SKIP
+ >>> from pyspark.sql.functions import pandas_udf, ceil
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
- ... ("id", "v"))
+ ... ("id", "v")) # doctest: +SKIP
>>> def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
- >>> df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
- ... # doctest: +SKIP
+ >>> df.groupby("id").applyInPandas(
+ ... normalize, schema="id long, v double").show() # doctest: +SKIP
+---+-------------------+
| id| v|
+---+-------------------+
@@ -121,8 +119,56 @@ class PandasGroupedOpsMixin(object):
| 2| 1.1094003924504583|
+---+-------------------+
- .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+ Alternatively, the user can pass a function that takes two arguments.
+ In this case, the grouping key(s) will be passed as the first argument and the data will
+ be passed as the second argument. The grouping key(s) will be passed as a tuple of numpy
+ data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
+ as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
+ This is useful when the user does not want to hardcode grouping key(s) in the function.
+
+ >>> df = spark.createDataFrame(
+ ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+ ... ("id", "v")) # doctest: +SKIP
+ >>> def mean_func(key, pdf):
+ ... # key is a tuple of one numpy.int64, which is the value
+ ... # of 'id' for the current group
+ ... return pd.DataFrame([key + (pdf.v.mean(),)])
+ >>> df.groupby('id').applyInPandas(
+ ... mean_func, schema="id long, v double").show() # doctest: +SKIP
+ +---+---+
+ | id| v|
+ +---+---+
+ | 1|1.5|
+ | 2|6.0|
+ +---+---+
+ >>> def sum_func(key, pdf):
+ ... # key is a tuple of two numpy.int64s, which are the values
+ ... # of 'id' and 'ceil(df.v / 2)' for the current group
+ ... return pd.DataFrame([key + (pdf.v.sum(),)])
+ >>> df.groupby(df.id, ceil(df.v / 2)).applyInPandas(
+ ... sum_func, schema="id long, `ceil(v / 2)` long, v double").show() # doctest: +SKIP
+ +---+-----------+----+
+ | id|ceil(v / 2)| v|
+ +---+-----------+----+
+ | 2| 5|10.0|
+ | 1| 1| 3.0|
+ | 2| 3| 5.0|
+ | 2| 2| 3.0|
+ +---+-----------+----+
+
+ .. note:: This function requires a full shuffle. All the data of a group will be loaded
+ into memory, so the user should be aware of the potential OOM risk if data is skewed
+ and certain groups are too large to fit in memory.
+
+ .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
+ recommended to explicitly index the columns by name to ensure the positions are correct,
+ or alternatively use an `OrderedDict`.
+ For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
+ `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
+ .. note:: Experimental
+
+ .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
"""
from pyspark.sql import GroupedData
from pyspark.sql.functions import pandas_udf, PandasUDFType
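
To make the `columns=` recommendation in the note above concrete, here is a hedged, editor-added sketch (not part of this patch; assumes Spark 3.0+ and a local SparkSession; `summarize` is an illustrative name) that builds the returned `pandas.DataFrame` from a dictionary while indexing the columns explicitly so they line up with the declared schema:

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0)], ("id", "v"))

    def summarize(key, pdf):
        # key is a tuple holding one numpy.int64 (the current 'id'); the result is built
        # from a dict, with columns= listed explicitly as the note above recommends.
        return pd.DataFrame({"v": [pdf.v.mean()], "id": [key[0]]}, columns=["id", "v"])

    df.groupby("id").applyInPandas(summarize, schema="id long, v double").show()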
@@ -176,14 +222,11 @@ class PandasCogroupedOps(object):
`pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` are combined as
a :class:`DataFrame`.
- The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
- returnType of the pandas udf.
-
- .. note:: This function requires a full shuffle. All the data of a cogroup will be loaded
- into memory, so the user should be aware of the potential OOM risk if data is skewed
- and certain groups are too large to fit in memory.
-
- .. note:: Experimental
+ The `schema` should be a :class:`StructType` describing the schema of the returned
+ `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
+ the field names in the defined schema if specified as strings, or match the
+ field data types by position if not strings, e.g. integer indices.
+ The length of the returned `pandas.DataFrame` can be arbitrary.
:param func: a Python native function that takes two `pandas.DataFrame`\\s, and
outputs a `pandas.DataFrame`, or that takes one tuple (grouping keys) and two
@@ -191,7 +234,7 @@ class PandasCogroupedOps(object):
:param schema: the return type of the `func` in PySpark. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+ >>> from pyspark.sql.functions import pandas_udf
>>> df1 = spark.createDataFrame(
... [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
... ("time", "id", "v1"))
@@ -232,6 +275,18 @@ class PandasCogroupedOps(object):
|20000102| 1|3.0| x|
+--------+---+---+---+
+ .. note:: This function requires a full shuffle. All the data of a cogroup will be loaded
+ into memory, so the user should be aware of the potential OOM risk if data is skewed
+ and certain groups are too large to fit in memory.
+
+ .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
+ recommended to explicitly index the columns by name to ensure the positions are correct,
+ or alternatively use an `OrderedDict`.
+ For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
+ `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
+
+ .. note:: Experimental
+
.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
"""
diff --git a/python/pyspark/sql/pandas/map_ops.py b/python/pyspark/sql/pandas/map_ops.py
index 75cacd7..9835e88 100644
--- a/python/pyspark/sql/pandas/map_ops.py
+++ b/python/pyspark/sql/pandas/map_ops.py
@@ -45,10 +45,10 @@ class PandasMapOpsMixin(object):
:param schema: the return type of the `func` in PySpark. The value can be either a
:class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+ >>> from pyspark.sql.functions import pandas_udf
>>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
- >>> def filter_func(batch_iter):
- ... for pdf in batch_iter:
+ >>> def filter_func(iterator):
+ ... for pdf in iterator:
... yield pdf[pdf.id == 1]
>>> df.mapInPandas(filter_func, df.schema).show() # doctest: +SKIP
+---+---+
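
Beyond the filtering doctest above, here is a hedged, editor-added sketch of mapInPandas over an iterator of pandas.DataFrame batches (not part of this patch; assumes Spark 3.0+ and a local SparkSession; `add_ten` is an illustrative name, and the type annotations are for readability only):

    from typing import Iterator

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

    def add_ten(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        # The input arrives as an iterator of pandas.DataFrame batches;
        # yield one transformed batch per input batch.
        for pdf in batches:
            yield pdf.assign(age=pdf.age + 10)

    df.mapInPandas(add_ten, schema=df.schema).show()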
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 433c5fc..10546ec 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -297,17 +297,18 @@ class UDFRegistration(object):
>>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP
[Row(random_udf()=82)]
- >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
- >>> @pandas_udf("integer", PandasUDFType.SCALAR) # doctest: +SKIP
- ... def add_one(x):
- ... return x + 1
+ >>> import pandas as pd # doctest: +SKIP
+ >>> from pyspark.sql.functions import pandas_udf
+ >>> @pandas_udf("integer") # doctest: +SKIP
+ ... def add_one(s: pd.Series) -> pd.Series:
+ ... return s + 1
...
>>> _ = spark.udf.register("add_one", add_one) # doctest: +SKIP
>>> spark.sql("SELECT add_one(id) FROM range(3)").collect() #
doctest: +SKIP
[Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
- >>> @pandas_udf("integer", PandasUDFType.GROUPED_AGG) # doctest:
+SKIP
- ... def sum_udf(v):
+ >>> @pandas_udf("integer") # doctest: +SKIP
+ ... def sum_udf(v: pd.Series) -> int:
... return v.sum()
...
>>> _ = spark.udf.register("sum_udf", sum_udf) # doctest: +SKIP
@@ -414,6 +415,9 @@ def _test():
.appName("sql.udf tests")\
.getOrCreate()
globs['spark'] = spark
+ # Hack to skip the unit tests in register. These are currently being tested in proper tests.
+ # We should reenable this test once we completely drop Python 2.
+ del pyspark.sql.udf.UDFRegistration.register
(failure_count, test_count) = doctest.testmod(
pyspark.sql.udf, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)