This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ed6193a  [SPARK-30722][PYTHON][DOCS] Update documentation for Pandas 
UDF with Python type hints
ed6193a is described below

commit ed6193ad68a55a85f51f3ebda08f53cbcf023a24
Author: HyukjinKwon <[email protected]>
AuthorDate: Wed Feb 12 10:49:46 2020 +0900

    [SPARK-30722][PYTHON][DOCS] Update documentation for Pandas UDF with Python 
type hints
    
    ### What changes were proposed in this pull request?
    
    This PR documents the Pandas UDF redesign with type hints 
introduced in SPARK-28264.
    It is mostly self-describing; however, there are a few things to note for reviewers.
    
    1. This PR replaces the existing documentation of pandas UDFs with the newer 
redesign to promote Python type hints. I added a note that Spark 3.0 
still keeps backward compatibility, though.
    
    2. This PR proposes to name the pandas APIs that are not Pandas UDFs (e.g., `mapInPandas`, `applyInPandas`) "Pandas Function APIs".
    
    3. SCALAR_ITER becomes two separate sections to reduce confusion (see the short sketch after this list):
      - `Iterator[pd.Series]` -> `Iterator[pd.Series]`
      - `Iterator[Tuple[pd.Series, ...]]` -> `Iterator[pd.Series]`
    
    4. I removed some examples that looked like overkill to me.
    
    5. I also removed some information in the docs that seemed duplicated or 
excessive.
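    
    For item 3, a minimal sketch of the two iterator variants (adapted from the
    examples this PR adds to `examples/src/main/python/sql/arrow.py`; the `x`
    column and the `long` return type mirror those examples):
    
    ```python
    from typing import Iterator, Tuple
    
    import pandas as pd
    from pyspark.sql.functions import pandas_udf
    
    # Iterator[pd.Series] -> Iterator[pd.Series]: called with a single column.
    @pandas_udf("long")
    def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for x in iterator:
            yield x + 1
    
    # Iterator[Tuple[pd.Series, ...]] -> Iterator[pd.Series]: called with multiple columns.
    @pandas_udf("long")
    def multiply_two_cols(
            iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
        for a, b in iterator:
            yield a * b
    
    # Usage, assuming df has a long-typed column "x" as in the example file:
    # df.select(plus_one("x"), multiply_two_cols("x", "x")).show()
    ```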
    
    ### Why are the changes needed?
    
    To document the new redesign of pandas UDFs.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests should cover this.
    
    Closes #27466 from HyukjinKwon/SPARK-30722.
    
    Authored-by: HyukjinKwon <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit aa6a60530e63ab3bb8b117f8738973d1b26a2cc7)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 dev/sparktestsupport/modules.py        |   1 -
 docs/sql-pyspark-pandas-with-arrow.md  | 233 +++++++++-----
 examples/src/main/python/sql/arrow.py  | 258 ++++++++--------
 python/pyspark/sql/pandas/functions.py | 538 ++++++++++++++-------------------
 python/pyspark/sql/pandas/group_ops.py |  99 ++++--
 python/pyspark/sql/pandas/map_ops.py   |   6 +-
 python/pyspark/sql/udf.py              |  16 +-
 7 files changed, 609 insertions(+), 542 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 40f2ca2..391e4bb 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -364,7 +364,6 @@ pyspark_sql = Module(
         "pyspark.sql.avro.functions",
         "pyspark.sql.pandas.conversion",
         "pyspark.sql.pandas.map_ops",
-        "pyspark.sql.pandas.functions",
         "pyspark.sql.pandas.group_ops",
         "pyspark.sql.pandas.types",
         "pyspark.sql.pandas.serializers",
diff --git a/docs/sql-pyspark-pandas-with-arrow.md 
b/docs/sql-pyspark-pandas-with-arrow.md
index 7eb8a74..92a5157 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -35,7 +35,7 @@ working with Arrow-enabled data.
 
 If you install PySpark using pip, then PyArrow can be brought in as an extra 
dependency of the
 SQL module with the command `pip install pyspark[sql]`. Otherwise, you must 
ensure that PyArrow
-is installed and available on all cluster nodes. The current supported version 
is 0.12.1.
+is installed and available on all cluster nodes. The current supported version 
is 0.15.1+.
 You can install using pip or conda from the conda-forge channel. See PyArrow
 [installation](https://arrow.apache.org/docs/python/install.html) for details.
 
@@ -65,132 +65,216 @@ Spark will fall back to create the DataFrame without 
Arrow.
 
 ## Pandas UDFs (a.k.a. Vectorized UDFs)
 
-Pandas UDFs are user defined functions that are executed by Spark using Arrow 
to transfer data and
-Pandas to work with the data. A Pandas UDF is defined using the keyword 
`pandas_udf` as a decorator
-or to wrap the function, no additional configuration is required. Currently, 
there are two types of
-Pandas UDF: Scalar and Grouped Map.
+Pandas UDFs are user defined functions that are executed by Spark using
+Arrow to transfer data and Pandas to work with the data, which allows 
vectorized operations. A Pandas
+UDF is defined using `pandas_udf` as a decorator or to wrap the function, 
and no additional
+configuration is required. A Pandas UDF behaves as a regular PySpark function 
API in general.
 
-### Scalar
+Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From 
Spark 3.0
+with Python 3.6+, you can also use [Python type 
hints](https://www.python.org/dev/peps/pep-0484).
+Using Python type hints is preferred and using `PandasUDFType` will be 
deprecated in
+a future release.
 
-Scalar Pandas UDFs are used for vectorizing scalar operations. They can be 
used with functions such
-as `select` and `withColumn`. The Python function should take `pandas.Series` 
as inputs and return
-a `pandas.Series` of the same length. Internally, Spark will execute a Pandas 
UDF by splitting
-columns into batches and calling the function for each batch as a subset of 
the data, then
-concatenating the results together.
+Note that the type hint should use `pandas.Series` in all cases but there is 
one variant
+where `pandas.DataFrame` should be used for its input or output type hint 
instead when the input
+or output column is of `StructType`. The following example shows a Pandas UDF 
which takes a long
+column, a string column and a struct column, and outputs a struct column. It 
requires the function to
+specify the type hints of `pandas.Series` and `pandas.DataFrame` as below:
 
-The following example shows how to create a scalar Pandas UDF that computes 
the product of 2 columns.
+<p>
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example ser_to_frame_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+</p>
+
+The following sections describe the combinations of the supported type 
hints. For simplicity,
+the `pandas.DataFrame` variant is omitted.
+
+### Series to Series
+
+The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.
+
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF where the given
+function takes one or more `pandas.Series` and outputs one `pandas.Series`. 
The output of the function should
+always be of the same length as the input. Internally, PySpark will execute a 
Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of 
the data, then concatenating
+the results together.
+
+The following example shows how to create this Pandas UDF that computes the 
product of 2 columns.
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
-### Scalar Iterator
+For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Iterator of Series to Iterator of Series
+
+The type hint can be expressed as `Iterator[pandas.Series]` -> 
`Iterator[pandas.Series]`.
+
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF where the given
+function takes an iterator of `pandas.Series` and outputs an iterator of 
`pandas.Series`. The output of each
+series from the function should always be of the same length as the input. In 
this case, the created
+Pandas UDF requires one input column when the Pandas UDF is called. To use 
multiple input columns,
+a different type hint is required. See Iterator of Multiple Series to Iterator 
of Series.
+
+It is useful when the UDF execution requires initializing some states although 
internally it works
+identically to the Series to Series case. The pseudocode below illustrates the 
example.
+
+{% highlight python %}
+@pandas_udf("long")
+def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+    # Do some expensive initialization with a state
+    state = very_expensive_initialization()
+    for x in iterator:
+        # Use that state for whole iterator.
+        yield calculate_with_state(x, state)
 
-Scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as scalar Pandas UDF 
above except that the
-underlying Python function takes an iterator of batches as input instead of a 
single batch and,
-instead of returning a single output batch, it yields output batches or 
returns an iterator of
-output batches.
-It is useful when the UDF execution requires initializing some states, e.g., 
loading an machine
-learning model file to apply inference to every input batch.
+df.select(calculate("value")).show()
+{% endhighlight %}
 
-The following example shows how to create scalar iterator Pandas UDFs:
+The following example shows how to create this Pandas UDF:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
-### Grouped Map
-Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the 
"split-apply-combine" pattern.
-Split-apply-combine consists of three steps:
-* Split the data into groups by using `DataFrame.groupBy`.
-* Apply a function on each group. The input and output of the function are 
both `pandas.DataFrame`. The
-  input data contains all the rows and columns for each group.
-* Combine the results into a new `DataFrame`.
+For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 
-To use `groupBy().apply()`, the user needs to define the following:
-* A Python function that defines the computation for each group.
-* A `StructType` object or a string that defines the schema of the output 
`DataFrame`.
+### Iterator of Multiple Series to Iterator of Series
 
-The column labels of the returned `pandas.DataFrame` must either match the 
field names in the
-defined output schema if specified as strings, or match the field data types 
by position if not
-strings, e.g. integer indices. See 
[pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
-on how to label columns when constructing a `pandas.DataFrame`.
+The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> 
`Iterator[pandas.Series]`.
 
-Note that all data for a group will be loaded into memory before the function 
is applied. This can
-lead to out of memory exceptions, especially if the group sizes are skewed. 
The configuration for
-[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and 
it is up to the user
-to ensure that the grouped data will fit into the available memory.
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF where the
+given function takes an iterator of a tuple of multiple `pandas.Series` and 
outputs an iterator of `pandas.Series`.
+In this case, the created pandas UDF requires as many input columns as there 
are series in the tuple
+when the Pandas UDF is called. It works identically to the Iterator of Series to 
Iterator of Series case except for the parameter difference.
 
-The following example shows how to use `groupby().apply()` to subtract the 
mean from each value in the group.
+The following example shows how to create this Pandas UDF:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
-For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 and
-[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Series to Scalar
 
-### Grouped Aggregate
+The type hint can be expressed as `pandas.Series`, ... -> `Any`.
 
-Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. 
Grouped aggregate Pandas UDFs are used with `groupBy().agg()` and
-[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). It 
defines an aggregation from one or more `pandas.Series`
-to a scalar value, where each `pandas.Series` represents a column within the 
group or window.
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF similar
+to PySpark's aggregate functions. The given function takes `pandas.Series` and 
returns a scalar value.
+The return type should be a primitive data type, and the returned scalar can 
be either a python
+primitive type, e.g., `int` or `float` or a numpy data type, e.g., 
`numpy.int64` or `numpy.float64`.
+`Any` should ideally be a specific scalar type accordingly.
 
-Note that this type of UDF does not support partial aggregation and all data 
for a group or window will be loaded into memory. Also,
-only unbounded window is supported with Grouped aggregate Pandas UDFs 
currently.
+This UDF can be also used with `groupBy().agg()` and 
[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window).
+It defines an aggregation from one or more `pandas.Series` to a scalar value, 
where each `pandas.Series`
+represents a column within the group or window.
 
-The following example shows how to use this type of UDF to compute mean with 
groupBy and window operations:
+Note that this type of UDF does not support partial aggregation and all data 
for a group or window
+will be loaded into memory. Also, only unbounded window is supported with 
Grouped aggregate Pandas
+UDFs currently. The following example shows how to use this type of UDF to 
compute mean with a group-by
+and window operations:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
 For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 
 
-### Map Iterator
+## Pandas Function APIs
+
+Pandas function APIs can directly apply a Python native function against the 
whole DataFrame by
+using Pandas instances. Internally it works similarly to Pandas UDFs, with 
Spark using Arrow to transfer
+data and Pandas to work with the data, which allows vectorized operations. A 
Pandas function API behaves
+as a regular API under PySpark `DataFrame` in general.
+
+From Spark 3.0, Grouped map pandas UDF is now categorized as a separate Pandas 
Function API,
+`DataFrame.groupby().applyInPandas()`. It is still possible to use it with 
`PandasUDFType`
+and `DataFrame.groupby().apply()` as it was; however, it is preferred to use
+`DataFrame.groupby().applyInPandas()` directly. Using `PandasUDFType` will be 
deprecated
+in the future.
+
+### Grouped Map
+
+Grouped map operations with Pandas instances are supported by 
`DataFrame.groupby().applyInPandas()`
+which requires a Python function that takes a `pandas.DataFrame` and returns 
another `pandas.DataFrame`.
+It maps each group to each `pandas.DataFrame` in the Python function.
+
+This API implements the "split-apply-combine" pattern which consists of three 
steps:
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are 
both `pandas.DataFrame`. The
+  input data contains all the rows and columns for each group.
+* Combine the results into a new PySpark `DataFrame`.
 
-Map iterator Pandas UDFs are used to transform data with an iterator of 
batches. Map iterator
-Pandas UDFs can be used with 
-[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
-It defines a map function that transforms an iterator of `pandas.DataFrame` to 
another.
+To use `groupBy().applyInPandas()`, the user needs to define the following:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output 
PySpark `DataFrame`.
+
+The column labels of the returned `pandas.DataFrame` must either match the 
field names in the
+defined output schema if specified as strings, or match the field data types 
by position if not
+strings, e.g. integer indices. See 
[pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
+on how to label columns when constructing a `pandas.DataFrame`.
 
-It can return the output of arbitrary length in contrast to the scalar Pandas 
UDF. It maps an iterator of `pandas.DataFrame`s,
-that represents the current `DataFrame`, using the map iterator UDF and 
returns the result as a `DataFrame`.
+Note that all data for a group will be loaded into memory before the function 
is applied. This can
+lead to out of memory exceptions, especially if the group sizes are skewed. 
The configuration for
+[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and 
it is up to the user
+to ensure that the grouped data will fit into the available memory.
 
-The following example shows how to create map iterator Pandas UDFs:
+The following example shows how to use `groupby().applyInPandas()` to subtract 
the mean from each value
+in the group.
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example map_iter_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_apply_in_pandas python/sql/arrow.py %}
 </div>
 </div>
 
-For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 and
-[`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+For detailed usage, please see 
[`pyspark.sql.GroupedData.applyInPandas`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas).
 
+### Map
+
+Map operations with Pandas instances are supported by 
`DataFrame.mapInPandas()` which maps an iterator
+of `pandas.DataFrame`s to another iterator of `pandas.DataFrame`s that 
represents the current
+PySpark `DataFrame` and returns the result as a PySpark `DataFrame`. The 
function takes and outputs
+an iterator of `pandas.DataFrame`. It can return the output of arbitrary 
length in contrast to some
+Pandas UDFs, although internally it works similarly to the Series to Series 
Pandas UDF.
+
+The following example shows how to use `mapInPandas()`:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example map_in_pandas python/sql/arrow.py %}
+</div>
+</div>
 
-### Cogrouped Map
+For detailed usage, please see 
[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
 
-Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key 
and then a python function applied to
-each cogroup.  They are used with `groupBy().cogroup().apply()` which consists 
of the following steps:
+### Co-grouped Map
 
+Co-grouped map operations with Pandas instances are supported by 
`DataFrame.groupby().cogroup().applyInPandas()` which
+allows two PySpark `DataFrame`s to be cogrouped by a common key and then a 
Python function applied to each
+cogroup. It consists of the following steps:
 * Shuffle the data such that the groups of each dataframe which share a key 
are cogrouped together.
-* Apply a function to each cogroup.  The input of the function is two 
`pandas.DataFrame` (with an optional Tuple
-representing the key).  The output of the function is a `pandas.DataFrame`.
-* Combine the pandas.DataFrames from all groups into a new `DataFrame`. 
+* Apply a function to each cogroup. The input of the function is two 
`pandas.DataFrame` (with an optional tuple
+representing the key). The output of the function is a `pandas.DataFrame`.
+* Combine the `pandas.DataFrame`s from all groups into a new PySpark 
`DataFrame`. 
 
-To use `groupBy().cogroup().apply()`, the user needs to define the following:
+To use `groupBy().cogroup().applyInPandas()`, the user needs to define the 
following:
 * A Python function that defines the computation for each cogroup.
-* A `StructType` object or a string that defines the schema of the output 
`DataFrame`.
+* A `StructType` object or a string that defines the schema of the output 
PySpark `DataFrame`.
 
 The column labels of the returned `pandas.DataFrame` must either match the 
field names in the
 defined output schema if specified as strings, or match the field data types 
by position if not
@@ -201,16 +285,15 @@ Note that all data for a cogroup will be loaded into 
memory before the function
 memory exceptions, especially if the group sizes are skewed. The configuration 
for [maxRecordsPerBatch](#setting-arrow-batch-size)
 is not applied and it is up to the user to ensure that the cogrouped data will 
fit into the available memory.
 
-The following example shows how to use `groupby().cogroup().apply()` to 
perform an asof join between two datasets.
+The following example shows how to use `groupby().cogroup().applyInPandas()` 
to perform an asof join between two datasets.
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example cogrouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example cogrouped_apply_in_pandas python/sql/arrow.py %}
 </div>
 </div>
 
-For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 and
-[`pyspark.sql.CoGroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.CoGroupedData.apply).
+For detailed usage, please see 
[`pyspark.sql.PandasCogroupedOps.applyInPandas()`](api/python/pyspark.sql.html#pyspark.sql.PandasCogroupedOps.applyInPandas).
 
 
 ## Usage Notes
diff --git a/examples/src/main/python/sql/arrow.py 
b/examples/src/main/python/sql/arrow.py
index 1c98317..b7d8467 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -23,12 +23,19 @@ Run with:
 
 from __future__ import print_function
 
+import sys
+
 from pyspark.sql import SparkSession
 from pyspark.sql.pandas.utils import require_minimum_pandas_version, 
require_minimum_pyarrow_version
 
 require_minimum_pandas_version()
 require_minimum_pyarrow_version()
 
+if sys.version_info < (3, 6):
+    raise Exception(
+        "Running this example file requires Python 3.6+; however, "
+        "your Python version was:\n %s" % sys.version)
+
 
 def dataframe_with_arrow_example(spark):
     # $example on:dataframe_with_arrow$
@@ -50,15 +57,45 @@ def dataframe_with_arrow_example(spark):
     print("Pandas DataFrame result statistics:\n%s\n" % 
str(result_pdf.describe()))
 
 
-def scalar_pandas_udf_example(spark):
-    # $example on:scalar_pandas_udf$
+def ser_to_frame_pandas_udf_example(spark):
+    # $example on:ser_to_frame_pandas_udf$
+    import pandas as pd
+
+    from pyspark.sql.functions import pandas_udf
+
+    @pandas_udf("col1 string, col2 long")
+    def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
+        s3['col2'] = s1 + s2.str.len()
+        return s3
+
+    # Create a Spark DataFrame that has three columns including a struct 
column.
+    df = spark.createDataFrame(
+        [[1, "a string", ("a nested string",)]],
+        "long_col long, string_col string, struct_col struct<col1:string>")
+
+    df.printSchema()
+    # root
+    # |-- long_col: long (nullable = true)
+    # |-- string_col: string (nullable = true)
+    # |-- struct_col: struct (nullable = true)
+    # |    |-- col1: string (nullable = true)
+
+    df.select(func("long_col", "string_col", "struct_col")).printSchema()
+    # |-- func(long_col, string_col, struct_col): struct (nullable = true)
+    # |    |-- col1: string (nullable = true)
+    # |    |-- col2: long (nullable = true)
+    # $example off:ser_to_frame_pandas_udf$
+
+
+def ser_to_ser_pandas_udf_example(spark):
+    # $example on:ser_to_ser_pandas_udf$
     import pandas as pd
 
     from pyspark.sql.functions import col, pandas_udf
     from pyspark.sql.types import LongType
 
     # Declare the function and create the UDF
-    def multiply_func(a, b):
+    def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
         return a * b
 
     multiply = pandas_udf(multiply_func, returnType=LongType())
@@ -83,26 +120,27 @@ def scalar_pandas_udf_example(spark):
     # |                  4|
     # |                  9|
     # +-------------------+
-    # $example off:scalar_pandas_udf$
+    # $example off:ser_to_ser_pandas_udf$
 
 
-def scalar_iter_pandas_udf_example(spark):
-    # $example on:scalar_iter_pandas_udf$
+def iter_ser_to_iter_ser_pandas_udf_example(spark):
+    # $example on:iter_ser_to_iter_ser_pandas_udf$
+    from typing import Iterator
+
     import pandas as pd
 
-    from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+    from pyspark.sql.functions import pandas_udf
 
     pdf = pd.DataFrame([1, 2, 3], columns=["x"])
     df = spark.createDataFrame(pdf)
 
-    # When the UDF is called with a single column that is not StructType,
-    # the input to the underlying function is an iterator of pd.Series.
-    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
-    def plus_one(batch_iter):
-        for x in batch_iter:
+    # Declare the function and create the UDF
+    @pandas_udf("long")
+    def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        for x in iterator:
             yield x + 1
 
-    df.select(plus_one(col("x"))).show()
+    df.select(plus_one("x")).show()
     # +-----------+
     # |plus_one(x)|
     # +-----------+
@@ -110,15 +148,28 @@ def scalar_iter_pandas_udf_example(spark):
     # |          3|
     # |          4|
     # +-----------+
+    # $example off:iter_ser_to_iter_ser_pandas_udf$
+
+
+def iter_sers_to_iter_ser_pandas_udf_example(spark):
+    # $example on:iter_sers_to_iter_ser_pandas_udf$
+    from typing import Iterator, Tuple
+
+    import pandas as pd
 
-    # When the UDF is called with more than one columns,
-    # the input to the underlying function is an iterator of pd.Series tuple.
-    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
-    def multiply_two_cols(batch_iter):
-        for a, b in batch_iter:
+    from pyspark.sql.functions import pandas_udf
+
+    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
+    df = spark.createDataFrame(pdf)
+
+    # Declare the function and create the UDF
+    @pandas_udf("long")
+    def multiply_two_cols(
+            iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> 
Iterator[pd.Series]:
+        for a, b in iterator:
             yield a * b
 
-    df.select(multiply_two_cols(col("x"), col("x"))).show()
+    df.select(multiply_two_cols("x", "x")).show()
     # +-----------------------+
     # |multiply_two_cols(x, x)|
     # +-----------------------+
@@ -126,92 +177,32 @@ def scalar_iter_pandas_udf_example(spark):
     # |                      4|
     # |                      9|
     # +-----------------------+
+    # $example off:iter_sers_to_iter_ser_pandas_udf$
 
-    # When the UDF is called with a single column that is StructType,
-    # the input to the underlying function is an iterator of pd.DataFrame.
-    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
-    def multiply_two_nested_cols(pdf_iter):
-        for pdf in pdf_iter:
-            yield pdf["a"] * pdf["b"]
-
-    df.select(
-        multiply_two_nested_cols(
-            struct(col("x").alias("a"), col("x").alias("b"))
-        ).alias("y")
-    ).show()
-    # +---+
-    # |  y|
-    # +---+
-    # |  1|
-    # |  4|
-    # |  9|
-    # +---+
-
-    # In the UDF, you can initialize some states before processing batches.
-    # Wrap your code with try/finally or use context managers to ensure
-    # the release of resources at the end.
-    y_bc = spark.sparkContext.broadcast(1)
-
-    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
-    def plus_y(batch_iter):
-        y = y_bc.value  # initialize states
-        try:
-            for x in batch_iter:
-                yield x + y
-        finally:
-            pass  # release resources here, if any
-
-    df.select(plus_y(col("x"))).show()
-    # +---------+
-    # |plus_y(x)|
-    # +---------+
-    # |        2|
-    # |        3|
-    # |        4|
-    # +---------+
-    # $example off:scalar_iter_pandas_udf$
-
-
-def grouped_map_pandas_udf_example(spark):
-    # $example on:grouped_map_pandas_udf$
-    from pyspark.sql.functions import pandas_udf, PandasUDFType
-
-    df = spark.createDataFrame(
-        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-        ("id", "v"))
-
-    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
-    def subtract_mean(pdf):
-        # pdf is a pandas.DataFrame
-        v = pdf.v
-        return pdf.assign(v=v - v.mean())
-
-    df.groupby("id").apply(subtract_mean).show()
-    # +---+----+
-    # | id|   v|
-    # +---+----+
-    # |  1|-0.5|
-    # |  1| 0.5|
-    # |  2|-3.0|
-    # |  2|-1.0|
-    # |  2| 4.0|
-    # +---+----+
-    # $example off:grouped_map_pandas_udf$
 
+def ser_to_scalar_pandas_udf_example(spark):
+    # $example on:ser_to_scalar_pandas_udf$
+    import pandas as pd
 
-def grouped_agg_pandas_udf_example(spark):
-    # $example on:grouped_agg_pandas_udf$
-    from pyspark.sql.functions import pandas_udf, PandasUDFType
+    from pyspark.sql.functions import pandas_udf
     from pyspark.sql import Window
 
     df = spark.createDataFrame(
         [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         ("id", "v"))
 
-    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
-    def mean_udf(v):
+    # Declare the function and create the UDF
+    @pandas_udf("double")
+    def mean_udf(v: pd.Series) -> float:
         return v.mean()
 
+    df.select(mean_udf(df['v'])).show()
+    # +-----------+
+    # |mean_udf(v)|
+    # +-----------+
+    # |        4.2|
+    # +-----------+
+
     df.groupby("id").agg(mean_udf(df['v'])).show()
     # +---+-----------+
     # | id|mean_udf(v)|
@@ -233,37 +224,54 @@ def grouped_agg_pandas_udf_example(spark):
     # |  2| 5.0|   6.0|
     # |  2|10.0|   6.0|
     # +---+----+------+
-    # $example off:grouped_agg_pandas_udf$
+    # $example off:ser_to_scalar_pandas_udf$
 
 
-def map_iter_pandas_udf_example(spark):
-    # $example on:map_iter_pandas_udf$
-    import pandas as pd
+def grouped_apply_in_pandas_example(spark):
+    # $example on:grouped_apply_in_pandas$
+    df = spark.createDataFrame(
+        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ("id", "v"))
 
-    from pyspark.sql.functions import pandas_udf, PandasUDFType
+    def subtract_mean(pdf):
+        # pdf is a pandas.DataFrame
+        v = pdf.v
+        return pdf.assign(v=v - v.mean())
+
+    df.groupby("id").applyInPandas(subtract_mean, schema="id long, v 
double").show()
+    # +---+----+
+    # | id|   v|
+    # +---+----+
+    # |  1|-0.5|
+    # |  1| 0.5|
+    # |  2|-3.0|
+    # |  2|-1.0|
+    # |  2| 4.0|
+    # +---+----+
+    # $example off:grouped_apply_in_pandas$
 
+
+def map_in_pandas_example(spark):
+    # $example on:map_in_pandas$
     df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
 
-    @pandas_udf(df.schema, PandasUDFType.MAP_ITER)
-    def filter_func(batch_iter):
-        for pdf in batch_iter:
+    def filter_func(iterator):
+        for pdf in iterator:
             yield pdf[pdf.id == 1]
 
-    df.mapInPandas(filter_func).show()
+    df.mapInPandas(filter_func, schema=df.schema).show()
     # +---+---+
     # | id|age|
     # +---+---+
     # |  1| 21|
     # +---+---+
-    # $example off:map_iter_pandas_udf$
+    # $example off:map_in_pandas$
 
 
-def cogrouped_map_pandas_udf_example(spark):
-    # $example on:cogrouped_map_pandas_udf$
+def cogrouped_apply_in_pandas_example(spark):
+    # $example on:cogrouped_apply_in_pandas$
     import pandas as pd
 
-    from pyspark.sql.functions import pandas_udf, PandasUDFType
-
     df1 = spark.createDataFrame(
         [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), 
(20000102, 2, 4.0)],
         ("time", "id", "v1"))
@@ -272,11 +280,11 @@ def cogrouped_map_pandas_udf_example(spark):
         [(20000101, 1, "x"), (20000101, 2, "y")],
         ("time", "id", "v2"))
 
-    @pandas_udf("time int, id int, v1 double, v2 string", 
PandasUDFType.COGROUPED_MAP)
     def asof_join(l, r):
         return pd.merge_asof(l, r, on="time", by="id")
 
-    df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
+    df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+        asof_join, schema="time int, id int, v1 double, v2 string").show()
     # +--------+---+---+---+
     # |    time| id| v1| v2|
     # +--------+---+---+---+
@@ -285,7 +293,7 @@ def cogrouped_map_pandas_udf_example(spark):
     # |20000101|  2|2.0|  y|
     # |20000102|  2|4.0|  y|
     # +--------+---+---+---+
-    # $example off:cogrouped_map_pandas_udf$
+    # $example off:cogrouped_apply_in_pandas$
 
 
 if __name__ == "__main__":
@@ -296,17 +304,21 @@ if __name__ == "__main__":
 
     print("Running Pandas to/from conversion example")
     dataframe_with_arrow_example(spark)
-    print("Running pandas_udf scalar example")
-    scalar_pandas_udf_example(spark)
-    print("Running pandas_udf scalar iterator example")
-    scalar_iter_pandas_udf_example(spark)
-    print("Running pandas_udf grouped map example")
-    grouped_map_pandas_udf_example(spark)
-    print("Running pandas_udf grouped agg example")
-    grouped_agg_pandas_udf_example(spark)
-    print("Running pandas_udf map iterator example")
-    map_iter_pandas_udf_example(spark)
-    print("Running pandas_udf cogrouped map example")
-    cogrouped_map_pandas_udf_example(spark)
+    print("Running pandas_udf example: Series to Frame")
+    ser_to_frame_pandas_udf_example(spark)
+    print("Running pandas_udf example: Series to Series")
+    ser_to_ser_pandas_udf_example(spark)
+    print("Running pandas_udf example: Iterator of Series to Iterator of 
Seires")
+    iter_ser_to_iter_ser_pandas_udf_example(spark)
+    print("Running pandas_udf example: Iterator of Multiple Series to Iterator 
of Series")
+    iter_sers_to_iter_ser_pandas_udf_example(spark)
+    print("Running pandas_udf example: Series to Scalar")
+    ser_to_scalar_pandas_udf_example(spark)
+    print("Running pandas function example: Grouped Map")
+    grouped_apply_in_pandas_example(spark)
+    print("Running pandas function example: Map")
+    map_in_pandas_example(spark)
+    print("Running pandas function example: Co-grouped Map")
+    cogrouped_apply_in_pandas_example(spark)
 
     spark.stop()
diff --git a/python/pyspark/sql/pandas/functions.py 
b/python/pyspark/sql/pandas/functions.py
index 3060278..31aa321 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -43,303 +43,228 @@ class PandasUDFType(object):
 @since(2.3)
 def pandas_udf(f=None, returnType=None, functionType=None):
     """
-    Creates a vectorized user defined function (UDF).
+    Creates a pandas user defined function (a.k.a. vectorized user defined 
function).
+
+    Pandas UDFs are user defined functions that are executed by Spark using 
Arrow to transfer
+    data and Pandas to work with the data, which allows vectorized operations. 
A Pandas UDF
+    is defined using `pandas_udf` as a decorator or to wrap the function, 
and no
+    additional configuration is required. A Pandas UDF behaves as a regular 
PySpark function
+    API in general.
 
     :param f: user-defined function. A python function if used as a standalone 
function
     :param returnType: the return type of the user-defined function. The value 
can be either a
         :class:`pyspark.sql.types.DataType` object or a DDL-formatted type 
string.
     :param functionType: an enum value in 
:class:`pyspark.sql.functions.PandasUDFType`.
-                         Default: SCALAR.
-
-    .. seealso:: :meth:`pyspark.sql.DataFrame.mapInPandas`
-    .. seealso:: :meth:`pyspark.sql.GroupedData.applyInPandas`
-    .. seealso:: :meth:`pyspark.sql.PandasCogroupedOps.applyInPandas`
-
-    The function type of the UDF can be one of the following:
-
-    1. SCALAR
-
-       A scalar UDF defines a transformation: One or more `pandas.Series` -> A 
`pandas.Series`.
-       The length of the returned `pandas.Series` must be of the same as the 
input `pandas.Series`.
-       If the return type is :class:`StructType`, the returned value should be 
a `pandas.DataFrame`.
-
-       :class:`MapType`, nested :class:`StructType` are currently not 
supported as output types.
-
-       Scalar UDFs can be used with :meth:`pyspark.sql.DataFrame.withColumn` 
and
-       :meth:`pyspark.sql.DataFrame.select`.
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> from pyspark.sql.types import IntegerType, StringType
-       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())  # doctest: 
+SKIP
-       >>> @pandas_udf(StringType())  # doctest: +SKIP
-       ... def to_upper(s):
-       ...     return s.str.upper()
-       ...
-       >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
-       ... def add_one(x):
-       ...     return x + 1
-       ...
-       >>> df = spark.createDataFrame([(1, "John Doe", 21)],
-       ...                            ("id", "name", "age"))  # doctest: +SKIP
-       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
-       ...     .show()  # doctest: +SKIP
-       +----------+--------------+------------+
-       |slen(name)|to_upper(name)|add_one(age)|
-       +----------+--------------+------------+
-       |         8|      JOHN DOE|          22|
-       +----------+--------------+------------+
-       >>> @pandas_udf("first string, last string")  # doctest: +SKIP
-       ... def split_expand(n):
-       ...     return n.str.split(expand=True)
-       >>> df.select(split_expand("name")).show()  # doctest: +SKIP
-       +------------------+
-       |split_expand(name)|
-       +------------------+
-       |       [John, Doe]|
-       +------------------+
-
-       .. note:: The length of `pandas.Series` within a scalar UDF is not that 
of the whole input
-           column, but is the length of an internal batch used for each call 
to the function.
-           Therefore, this can be used, for example, to ensure the length of 
each returned
-           `pandas.Series`, and can not be used as the column length.
-
-    2. SCALAR_ITER
-
-       A scalar iterator UDF is semantically the same as the scalar Pandas UDF 
above except that the
-       wrapped Python function takes an iterator of batches as input instead 
of a single batch and,
-       instead of returning a single output batch, it yields output batches or 
explicitly returns an
-       generator or an iterator of output batches.
-       It is useful when the UDF execution requires initializing some state, 
e.g., loading a machine
-       learning model file to apply inference to every input batch.
-
-       .. note:: It is not guaranteed that one invocation of a scalar iterator 
UDF will process all
-           batches from one partition, although it is currently implemented 
this way.
-           Your code shall not rely on this behavior because it might change 
in the future for
-           further optimization, e.g., one invocation processes multiple 
partitions.
-
-       Scalar iterator UDFs are used with 
:meth:`pyspark.sql.DataFrame.withColumn` and
-       :meth:`pyspark.sql.DataFrame.select`.
-
-       >>> import pandas as pd  # doctest: +SKIP
-       >>> from pyspark.sql.functions import col, pandas_udf, struct, 
PandasUDFType
-       >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"])  # doctest: +SKIP
-       >>> df = spark.createDataFrame(pdf)  # doctest: +SKIP
-
-       When the UDF is called with a single column that is not `StructType`, 
the input to the
-       underlying function is an iterator of `pd.Series`.
-
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def plus_one(batch_iter):
-       ...     for x in batch_iter:
-       ...         yield x + 1
-       ...
-       >>> df.select(plus_one(col("x"))).show()  # doctest: +SKIP
-       +-----------+
-       |plus_one(x)|
-       +-----------+
-       |          2|
-       |          3|
-       |          4|
-       +-----------+
-
-       When the UDF is called with more than one columns, the input to the 
underlying function is an
-       iterator of `pd.Series` tuple.
-
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def multiply_two_cols(batch_iter):
-       ...     for a, b in batch_iter:
-       ...         yield a * b
-       ...
-       >>> df.select(multiply_two_cols(col("x"), col("x"))).show()  # doctest: 
+SKIP
-       +-----------------------+
-       |multiply_two_cols(x, x)|
-       +-----------------------+
-       |                      1|
-       |                      4|
-       |                      9|
-       +-----------------------+
-
-       When the UDF is called with a single column that is `StructType`, the 
input to the underlying
-       function is an iterator of `pd.DataFrame`.
-
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def multiply_two_nested_cols(pdf_iter):
-       ...    for pdf in pdf_iter:
-       ...        yield pdf["a"] * pdf["b"]
-       ...
-       >>> df.select(
-       ...     multiply_two_nested_cols(
-       ...         struct(col("x").alias("a"), col("x").alias("b"))
-       ...     ).alias("y")
-       ... ).show()  # doctest: +SKIP
-       +---+
-       |  y|
-       +---+
-       |  1|
-       |  4|
-       |  9|
-       +---+
-
-       In the UDF, you can initialize some states before processing batches, 
wrap your code with
-       `try ... finally ...` or use context managers to ensure the release of 
resources at the end
-       or in case of early termination.
-
-       >>> y_bc = spark.sparkContext.broadcast(1)  # doctest: +SKIP
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def plus_y(batch_iter):
-       ...     y = y_bc.value  # initialize some state
-       ...     try:
-       ...         for x in batch_iter:
-       ...             yield x + y
-       ...     finally:
-       ...         pass  # release resources here, if any
-       ...
-       >>> df.select(plus_y(col("x"))).show()  # doctest: +SKIP
-       +---------+
-       |plus_y(x)|
-       +---------+
-       |        2|
-       |        3|
-       |        4|
-       +---------+
-
-    3. GROUPED_MAP
-
-       A grouped map UDF defines transformation: A `pandas.DataFrame` -> A 
`pandas.DataFrame`
-       The returnType should be a :class:`StructType` describing the schema of 
the returned
-       `pandas.DataFrame`. The column labels of the returned 
`pandas.DataFrame` must either match
-       the field names in the defined returnType schema if specified as 
strings, or match the
-       field data types by position if not strings, e.g. integer indices.
-       The length of the returned `pandas.DataFrame` can be arbitrary.
-
-       Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))  # doctest: +SKIP
-       >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # 
doctest: +SKIP
-       ... def normalize(pdf):
-       ...     v = pdf.v
-       ...     return pdf.assign(v=(v - v.mean()) / v.std())
-       >>> df.groupby("id").apply(normalize).show()  # doctest: +SKIP
-       +---+-------------------+
-       | id|                  v|
-       +---+-------------------+
-       |  1|-0.7071067811865475|
-       |  1| 0.7071067811865475|
-       |  2|-0.8320502943378437|
-       |  2|-0.2773500981126146|
-       |  2| 1.1094003924504583|
-       +---+-------------------+
-
-       Alternatively, the user can define a function that takes two arguments.
-       In this case, the grouping key(s) will be passed as the first argument 
and the data will
-       be passed as the second argument. The grouping key(s) will be passed as 
a tuple of numpy
-       data types, e.g., `numpy.int32` and `numpy.float64`. The data will 
still be passed in
-       as a `pandas.DataFrame` containing all columns from the original Spark 
DataFrame.
-       This is useful when the user does not want to hardcode grouping key(s) 
in the function.
-
-       >>> import pandas as pd  # doctest: +SKIP
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))  # doctest: +SKIP
-       >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # 
doctest: +SKIP
-       ... def mean_udf(key, pdf):
-       ...     # key is a tuple of one numpy.int64, which is the value
-       ...     # of 'id' for the current group
-       ...     return pd.DataFrame([key + (pdf.v.mean(),)])
-       >>> df.groupby('id').apply(mean_udf).show()  # doctest: +SKIP
-       +---+---+
-       | id|  v|
-       +---+---+
-       |  1|1.5|
-       |  2|6.0|
-       +---+---+
-       >>> @pandas_udf(
-       ...    "id long, `ceil(v / 2)` long, v double",
-       ...    PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
-       >>> def sum_udf(key, pdf):
-       ...     # key is a tuple of two numpy.int64s, which is the values
-       ...     # of 'id' and 'ceil(df.v / 2)' for the current group
-       ...     return pd.DataFrame([key + (pdf.v.sum(),)])
-       >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()  # doctest: 
+SKIP
-       +---+-----------+----+
-       | id|ceil(v / 2)|   v|
-       +---+-----------+----+
-       |  2|          5|10.0|
-       |  1|          1| 3.0|
-       |  2|          3| 5.0|
-       |  2|          2| 3.0|
-       +---+-----------+----+
-
-       .. note:: If returning a new `pandas.DataFrame` constructed with a 
dictionary, it is
-           recommended to explicitly index the columns by name to ensure the 
positions are correct,
-           or alternatively use an `OrderedDict`.
-           For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 
'a'])` or
-           `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
-
-       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
-
-    4. GROUPED_AGG
-
-       A grouped aggregate UDF defines a transformation: One or more 
`pandas.Series` -> A scalar
-       The `returnType` should be a primitive data type, e.g., 
:class:`DoubleType`.
-       The returned scalar can be either a python primitive type, e.g., `int` 
or `float`
-       or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
-
-       :class:`MapType` and :class:`StructType` are currently not supported as 
output types.
-
-       Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg` 
and
-       :class:`pyspark.sql.Window`
-
-       This example shows using grouped aggregated UDFs with groupby:
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))
-       >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
-       ... def mean_udf(v):
-       ...     return v.mean()
-       >>> df.groupby("id").agg(mean_udf(df['v'])).show()  # doctest: +SKIP
-       +---+-----------+
-       | id|mean_udf(v)|
-       +---+-----------+
-       |  1|        1.5|
-       |  2|        6.0|
-       +---+-----------+
-
-       This example shows using grouped aggregated UDFs as window functions.
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> from pyspark.sql import Window
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))
-       >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
-       ... def mean_udf(v):
-       ...     return v.mean()
-       >>> w = (Window.partitionBy('id')
-       ...            .orderBy('v')
-       ...            .rowsBetween(-1, 0))
-       >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # 
doctest: +SKIP
-       +---+----+------+
-       | id|   v|mean_v|
-       +---+----+------+
-       |  1| 1.0|   1.0|
-       |  1| 2.0|   1.5|
-       |  2| 3.0|   3.0|
-       |  2| 5.0|   4.0|
-       |  2|10.0|   7.5|
-       +---+----+------+
-
-       .. note:: For performance reasons, the input series to window functions 
are not copied.
+        Default: SCALAR.
+
+        .. note:: This parameter exists for compatibility. Using Python type 
hints is encouraged.
+
+    In order to use this API, customarily the below are imported:
+
+    >>> import pandas as pd
+    >>> from pyspark.sql.functions import pandas_udf
+
+    From Spark 3.0 with Python 3.6+, `Python type hints 
<https://www.python.org/dev/peps/pep-0484>`_
+    detect the function types as below:
+
+    >>> @pandas_udf(IntegerType())
+    ... def slen(s: pd.Series) -> pd.Series:
+    ...     return s.str.len()
+
+    Prior to Spark 3.0, the pandas UDF used `functionType` to decide the 
execution type as below:
+
+    >>> from pyspark.sql.functions import PandasUDFType
+    >>> from pyspark.sql.types import IntegerType
+    >>> @pandas_udf(IntegerType(), PandasUDFType.SCALAR)
+    ... def slen(s):
+    ...     return s.str.len()
+
+    It is preferred to specify type hints for the pandas UDF instead of 
specifying pandas UDF
+    type via `functionType`, which will be deprecated in future releases.
+
+    Note that the type hint should use `pandas.Series` in all cases but there 
is one variant
+    where `pandas.DataFrame` should be used for its input or output type hint 
instead when the input
+    or output column is of :class:`pyspark.sql.types.StructType`. The 
following example shows
+    a Pandas UDF which takes a long column, a string column and a struct column, and 
outputs a struct
+    column. It requires the function to specify the type hints of 
`pandas.Series` and
+    `pandas.DataFrame` as below:
+
+    >>> @pandas_udf("col1 string, col2 long")
+    ... def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> 
pd.DataFrame:
+    ...     s3['col2'] = s1 + s2.str.len()
+    ...     return s3
+    ...
+    >>> # Create a Spark DataFrame that has three columns including a struct 
column.
+    ... df = spark.createDataFrame(
+    ...     [[1, "a string", ("a nested string",)]],
+    ...     "long_col long, string_col string, struct_col struct<col1:string>")
+    >>> df.printSchema()
+    root
+    |-- long_col: long (nullable = true)
+    |-- string_col: string (nullable = true)
+    |-- struct_col: struct (nullable = true)
+    |    |-- col1: string (nullable = true)
+    >>> df.select(func("long_col", "string_col", "struct_col")).printSchema()
+    |-- func(long_col, string_col, struct_col): struct (nullable = true)
+    |    |-- col1: string (nullable = true)
+    |    |-- col2: long (nullable = true)
+
+    The following sections describe the combinations of the supported 
type hints. For
+    simplicity, the `pandas.DataFrame` variant is omitted.
+
+    * Series to Series
+        `pandas.Series`, ... -> `pandas.Series`
+
+        The function takes one or more `pandas.Series` and outputs one 
`pandas.Series`.
+        The output of the function should always be of the same length as the 
input.
+
+        >>> @pandas_udf("string")
+        ... def to_upper(s: pd.Series) -> pd.Series:
+        ...     return s.str.upper()
+        ...
+        >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+        >>> df.select(to_upper("name")).show()
+        +--------------+
+        |to_upper(name)|
+        +--------------+
+        |      JOHN DOE|
+        +--------------+
+
+        >>> @pandas_udf("first string, last string")
+        ... def split_expand(s: pd.Series) -> pd.DataFrame:
+        ...     return s.str.split(expand=True)
+        ...
+        >>> df = spark.createDataFrame([("John Doe",)], ("name",))
+        >>> df.select(split_expand("name")).show()
+        +------------------+
+        |split_expand(name)|
+        +------------------+
+        |       [John, Doe]|
+        +------------------+
+
+        .. note:: The length of the input is not that of the whole input 
column, but is the
+            length of an internal batch used for each call to the function.
+
+    * Iterator of Series to Iterator of Series
+        `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`
+
+        The function takes an iterator of `pandas.Series` and outputs an 
iterator of
+        `pandas.Series`. In this case, the created pandas UDF instance 
requires one input
+        column when this is called as a PySpark column. The output of each 
series from
+        the function should always be of the same length as the input.
+
+        It is useful when the UDF execution
+        requires initializing some states although internally it works 
identically to the
+        Series to Series case. The pseudocode below illustrates the example.
+
+        .. highlight:: python
+        .. code-block:: python
+
+            @pandas_udf("long")
+            def calculate(iterator: Iterator[pd.Series]) -> 
Iterator[pd.Series]:
+                # Do some expensive initialization with a state
+                state = very_expensive_initialization()
+                for x in iterator:
+                    # Use that state for whole iterator.
+                    yield calculate_with_state(x, state)
+
+            df.select(calculate("value")).show()
+
+        >>> from typing import Iterator
+        >>> @pandas_udf("long")
+        ... def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+        ...     for s in iterator:
+        ...         yield s + 1
+        ...
+        >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+        >>> df.select(plus_one(df.v)).show()
+        +-----------+
+        |plus_one(v)|
+        +-----------+
+        |          2|
+        |          3|
+        |          4|
+        +-----------+
+
+        .. note:: The length of each series is the length of a batch 
internally used.
+
+    * Iterator of Multiple Series to Iterator of Series
+        `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`
+
+        The function takes an iterator of a tuple of multiple `pandas.Series` 
and outputs an
+        iterator of `pandas.Series`. In this case, the created pandas UDF 
instance requires
+        as many input columns as there are series in the tuple when this is 
called as a PySpark column.
+        It works identically to the Iterator of Series to Iterator of Series case 
except for
+        the parameter difference. The output of each series from the function 
should always
+        be of the same length as the input.
+
+        >>> from typing import Iterator, Tuple
+        >>> from pyspark.sql.functions import struct, col
+        >>> @pandas_udf("long")
+        ... def multiply(iterator: Iterator[Tuple[pd.Series, pd.DataFrame]]) 
-> Iterator[pd.Series]:
+        ...     for s1, df in iterator:
+        ...         yield s1 * df.v
+        ...
+        >>> df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
+        >>> df.withColumn('output', multiply(col("v"), 
struct(col("v")))).show()
+        +---+------+
+        |  v|output|
+        +---+------+
+        |  1|     1|
+        |  2|     4|
+        |  3|     9|
+        +---+------+
+
+        .. note:: The length of each series is the length of a batch 
internally used.
+
+    * Series to Scalar
+        `pandas.Series`, ... -> `Any`
+
+        The function takes `pandas.Series` and returns a scalar value. The 
`returnType`
+        should be a primitive data type, and the returned scalar can be either 
a python primitive
+        type, e.g., int or float or a numpy data type, e.g., numpy.int64 or 
numpy.float64.
+        `Any` should ideally be a specific scalar type accordingly.
+
+        >>> @pandas_udf("double")
+        ... def mean_udf(v: pd.Series) -> float:
+        ...     return v.mean()
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> df.groupby("id").agg(mean_udf(df['v'])).show()
+        +---+-----------+
+        | id|mean_udf(v)|
+        +---+-----------+
+        |  1|        1.5|
+        |  2|        6.0|
+        +---+-----------+
+
+        This UDF can also be used as a window function as below:
+
+        >>> from pyspark.sql import Window
+        >>> @pandas_udf("double")
+        ... def mean_udf(v: pd.Series) -> float:
+        ...     return v.mean()
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
+        >>> df.withColumn('mean_v', mean_udf("v").over(w)).show()
+        +---+----+------+
+        | id|   v|mean_v|
+        +---+----+------+
+        |  1| 1.0|   1.0|
+        |  1| 2.0|   1.5|
+        |  2| 3.0|   3.0|
+        |  2| 5.0|   4.0|
+        |  2|10.0|   7.5|
+        +---+----+------+
+
+        .. note:: For performance reasons, the input series to window functions are not copied.
             Therefore, mutating the input series is not allowed and will cause incorrect results.
             For the same reason, users should also not rely on the index of the input series.
 
-       .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`
+        .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`
 
     .. note:: The user-defined functions do not support conditional expressions or short circuiting
         in boolean expressions and it ends up with being executed all internally. If the functions
@@ -348,10 +273,21 @@ def pandas_udf(f=None, returnType=None, functionType=None):
     .. note:: The user-defined functions do not take keyword arguments on the calling side.
 
     .. note:: The data type of returned `pandas.Series` from the user-defined functions should be
-        matched with defined returnType (see :meth:`types.to_arrow_type` and
+        matched with defined `returnType` (see :meth:`types.to_arrow_type` and
         :meth:`types.from_arrow_type`). When there is mismatch between them, Spark might do
         conversion on returned data. The conversion is not guaranteed to be correct and results
         should be checked for accuracy by users.
+
+    .. note:: Currently,
+        :class:`pyspark.sql.types.MapType`,
+        :class:`pyspark.sql.types.ArrayType` of :class:`pyspark.sql.types.TimestampType` and
+        nested :class:`pyspark.sql.types.StructType`
+        are not supported as output types.
+
+    .. seealso:: :meth:`pyspark.sql.DataFrame.mapInPandas`
+    .. seealso:: :meth:`pyspark.sql.GroupedData.applyInPandas`
+    .. seealso:: :meth:`pyspark.sql.PandasCogroupedOps.applyInPandas`
+    .. seealso:: :meth:`pyspark.sql.UDFRegistration.register`
     """
 
     # The following table shows most of Pandas data and SQL type conversions in Pandas UDFs that
@@ -480,25 +416,3 @@ def _create_pandas_udf(f, returnType, evalType):
             "or three arguments (key, left, right).")
 
     return _create_udf(f, returnType, evalType)
-
-
-def _test():
-    import doctest
-    from pyspark.sql import SparkSession
-    import pyspark.sql.pandas.functions
-    globs = pyspark.sql.pandas.functions.__dict__.copy()
-    spark = SparkSession.builder\
-        .master("local[4]")\
-        .appName("sql.pandas.functions tests")\
-        .getOrCreate()
-    globs['spark'] = spark
-    (failure_count, test_count) = doctest.testmod(
-        pyspark.sql.pandas.functions, globs=globs,
-        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
-    spark.stop()
-    if failure_count:
-        sys.exit(-1)
-
-
-if __name__ == "__main__":
-    _test()
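
The `calculate` pseudocode in the pandas_udf docstring above leaves `very_expensive_initialization` and `calculate_with_state` abstract. As a minimal sketch of the same iterator pattern, assuming a running SparkSession named `spark` (as in the doctests) and using a made-up in-memory lookup table in place of the expensive state:

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("string")
    def lookup_label(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # Hypothetical one-time setup, built once and reused for every batch in this task.
        labels = {1: "one", 2: "two", 3: "three"}
        for batch in batches:
            # Each yielded Series must have the same length as its input batch.
            yield batch.map(labels)

    df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
    df.select(lookup_label("v")).show()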
diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py
index 3152271..b93f051 100644
--- a/python/pyspark/sql/pandas/group_ops.py
+++ b/python/pyspark/sql/pandas/group_ops.py
@@ -88,29 +88,27 @@ class PandasGroupedOpsMixin(object):
         to the user-function and the returned `pandas.DataFrame` are combined as a
         :class:`DataFrame`.
 
-        The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
-        returnType of the pandas udf.
-
-        .. note:: This function requires a full shuffle. All the data of a group will be loaded
-            into memory, so the user should be aware of the potential OOM risk if data is skewed
-            and certain groups are too large to fit in memory.
+        The `schema` should be a :class:`StructType` describing the schema of the returned
+        `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
+        the field names in the defined schema if specified as strings, or match the
+        field data types by position if not strings, e.g. integer indices.
+        The length of the returned `pandas.DataFrame` can be arbitrary.
 
         :param func: a Python native function that takes a `pandas.DataFrame`, and outputs a
             `pandas.DataFrame`.
         :param schema: the return type of the `func` in PySpark. The value can be either a
             :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
 
-        .. note:: Experimental
-
-        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> import pandas as pd  # doctest: +SKIP
+        >>> from pyspark.sql.functions import pandas_udf, ceil
         >>> df = spark.createDataFrame(
         ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-        ...     ("id", "v"))
+        ...     ("id", "v"))  # doctest: +SKIP
         >>> def normalize(pdf):
         ...     v = pdf.v
         ...     return pdf.assign(v=(v - v.mean()) / v.std())
-        >>> df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
-        ... # doctest: +SKIP
+        >>> df.groupby("id").applyInPandas(
+        ...     normalize, schema="id long, v double").show()  # doctest: +SKIP
         +---+-------------------+
         | id|                  v|
         +---+-------------------+
@@ -121,8 +119,56 @@ class PandasGroupedOpsMixin(object):
         |  2| 1.1094003924504583|
         +---+-------------------+
 
-        .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+        Alternatively, the user can pass a function that takes two arguments.
+        In this case, the grouping key(s) will be passed as the first argument and the data will
+        be passed as the second argument. The grouping key(s) will be passed as a tuple of numpy
+        data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
+        as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
+        This is useful when the user does not want to hardcode grouping key(s) in the function.
+
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ...     ("id", "v"))  # doctest: +SKIP
+        >>> def mean_func(key, pdf):
+        ...     # key is a tuple of one numpy.int64, which is the value
+        ...     # of 'id' for the current group
+        ...     return pd.DataFrame([key + (pdf.v.mean(),)])
+        >>> df.groupby('id').applyInPandas(
+        ...     mean_func, schema="id long, v double").show()  # doctest: +SKIP
+        +---+---+
+        | id|  v|
+        +---+---+
+        |  1|1.5|
+        |  2|6.0|
+        +---+---+
+        >>> def sum_func(key, pdf):
+        ...     # key is a tuple of two numpy.int64s, which is the values
+        ...     # of 'id' and 'ceil(df.v / 2)' for the current group
+        ...     return pd.DataFrame([key + (pdf.v.sum(),)])
+        >>> df.groupby(df.id, ceil(df.v / 2)).applyInPandas(
+        ...     sum_func, schema="id long, `ceil(v / 2)` long, v double").show()  # doctest: +SKIP
+        +---+-----------+----+
+        | id|ceil(v / 2)|   v|
+        +---+-----------+----+
+        |  2|          5|10.0|
+        |  1|          1| 3.0|
+        |  2|          3| 5.0|
+        |  2|          2| 3.0|
+        +---+-----------+----+
+
+        .. note:: This function requires a full shuffle. All the data of a group will be loaded
+            into memory, so the user should be aware of the potential OOM risk if data is skewed
+            and certain groups are too large to fit in memory.
+
+        .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
+            recommended to explicitly index the columns by name to ensure the positions are correct,
+            or alternatively use an `OrderedDict`.
+            For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
+            `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
 
+        .. note:: Experimental
+
+        .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
         """
         from pyspark.sql import GroupedData
         from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -176,14 +222,11 @@ class PandasCogroupedOps(object):
         `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` are combined as
         a :class:`DataFrame`.
 
-        The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
-        returnType of the pandas udf.
-
-        .. note:: This function requires a full shuffle. All the data of a cogroup will be loaded
-            into memory, so the user should be aware of the potential OOM risk if data is skewed
-            and certain groups are too large to fit in memory.
-
-        .. note:: Experimental
+        The `schema` should be a :class:`StructType` describing the schema of the returned
+        `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
+        the field names in the defined schema if specified as strings, or match the
+        field data types by position if not strings, e.g. integer indices.
+        The length of the returned `pandas.DataFrame` can be arbitrary.
 
         :param func: a Python native function that takes two `pandas.DataFrame`\\s, and
             outputs a `pandas.DataFrame`, or that takes one tuple (grouping keys) and two
@@ -191,7 +234,7 @@ class PandasCogroupedOps(object):
         :param schema: the return type of the `func` in PySpark. The value can be either a
             :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
 
-        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> from pyspark.sql.functions import pandas_udf
         >>> df1 = spark.createDataFrame(
         ...     [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
         ...     ("time", "id", "v1"))
@@ -232,6 +275,18 @@ class PandasCogroupedOps(object):
         |20000102|  1|3.0|  x|
         +--------+---+---+---+
 
+        .. note:: This function requires a full shuffle. All the data of a cogroup will be loaded
+            into memory, so the user should be aware of the potential OOM risk if data is skewed
+            and certain groups are too large to fit in memory.
+
+        .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
+            recommended to explicitly index the columns by name to ensure the positions are correct,
+            or alternatively use an `OrderedDict`.
+            For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
+            `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
+
+        .. note:: Experimental
+
         .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
 
         """
diff --git a/python/pyspark/sql/pandas/map_ops.py b/python/pyspark/sql/pandas/map_ops.py
index 75cacd7..9835e88 100644
--- a/python/pyspark/sql/pandas/map_ops.py
+++ b/python/pyspark/sql/pandas/map_ops.py
@@ -45,10 +45,10 @@ class PandasMapOpsMixin(object):
         :param schema: the return type of the `func` in PySpark. The value can be either a
             :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
 
-        >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+        >>> from pyspark.sql.functions import pandas_udf
         >>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
-        >>> def filter_func(batch_iter):
-        ...     for pdf in batch_iter:
+        >>> def filter_func(iterator):
+        ...     for pdf in iterator:
         ...         yield pdf[pdf.id == 1]
         >>> df.mapInPandas(filter_func, df.schema).show()  # doctest: +SKIP
         +---+---+
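
Like the iterator pandas UDFs, `mapInPandas` consumes and produces an iterator of `pandas.DataFrame`s, one per batch of the partition. A brief sketch of a transformation that adds a column, assuming a running SparkSession named `spark`; the `doubled` column is made up for illustration:

    from typing import Iterator

    import pandas as pd

    def add_doubled(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        for pdf in batches:
            # Each pdf is one batch of rows from the current partition.
            yield pdf.assign(doubled=pdf.age * 2)

    df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
    df.mapInPandas(add_doubled, "id long, age long, doubled long").show()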
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 433c5fc..10546ec 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -297,17 +297,18 @@ class UDFRegistration(object):
             >>> spark.sql("SELECT random_udf()").collect()  # doctest: +SKIP
             [Row(random_udf()=82)]
 
-            >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-            >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
-            ... def add_one(x):
-            ...     return x + 1
+            >>> import pandas as pd  # doctest: +SKIP
+            >>> from pyspark.sql.functions import pandas_udf
+            >>> @pandas_udf("integer")  # doctest: +SKIP
+            ... def add_one(s: pd.Series) -> pd.Series:
+            ...     return s + 1
             ...
             >>> _ = spark.udf.register("add_one", add_one)  # doctest: +SKIP
             >>> spark.sql("SELECT add_one(id) FROM range(3)").collect()  # 
doctest: +SKIP
             [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
 
-            >>> @pandas_udf("integer", PandasUDFType.GROUPED_AGG)  # doctest: 
+SKIP
-            ... def sum_udf(v):
+            >>> @pandas_udf("integer")  # doctest: +SKIP
+            ... def sum_udf(v: pd.Series) -> int:
             ...     return v.sum()
             ...
             >>> _ = spark.udf.register("sum_udf", sum_udf)  # doctest: +SKIP
@@ -414,6 +415,9 @@ def _test():
         .appName("sql.udf tests")\
         .getOrCreate()
     globs['spark'] = spark
+    # Hack to skip the unit tests in register. These are currently being tested in proper tests.
+    # We should reenable this test once we completely drop Python 2.
+    del pyspark.sql.udf.UDFRegistration.register
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.udf, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
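
The register doctests above swap the `PandasUDFType`-based definitions for type-hinted ones. For reference, a minimal sketch of the two spellings of a scalar pandas UDF, with a deliberately trivial body:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Older style: the function type is given explicitly.
    @pandas_udf("long", PandasUDFType.SCALAR)
    def add_one_old(v):
        return v + 1

    # Newer style: the function type is inferred from the Python type hints.
    @pandas_udf("long")
    def add_one_new(v: pd.Series) -> pd.Series:
        return v + 1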


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
