This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b2448b  [SPARK-28056][PYTHON] add doc for SCALAR_ITER Pandas UDF
1b2448b is described below

commit 1b2448bc10a8ee732d08fa1abae6d64ae25e3a14
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Mon Jun 17 20:51:36 2019 -0700

    [SPARK-28056][PYTHON] add doc for SCALAR_ITER Pandas UDF
    
    ## What changes were proposed in this pull request?
    
    Add docs for `SCALAR_ITER` Pandas UDF.
    
    cc: WeichenXu123 HyukjinKwon
    
    ## How was this patch tested?
    
    Tested example code manually.
    
    Closes #24897 from mengxr/SPARK-28056.
    
    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 docs/sql-pyspark-pandas-with-arrow.md | 17 +++++++
 examples/src/main/python/sql/arrow.py | 86 +++++++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+)

diff --git a/docs/sql-pyspark-pandas-with-arrow.md b/docs/sql-pyspark-pandas-with-arrow.md
index 6cf280c..9cab4be 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -86,6 +86,23 @@ The following example shows how to create a scalar Pandas UDF that computes the
 </div>
 </div>
 
+### Scalar Iterator
+
+A scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as the scalar Pandas UDF above except
+that the underlying Python function takes an iterator of batches as input instead of a single
+batch and, instead of returning a single output batch, it yields output batches or returns an
+iterator of output batches.
+It is useful when the UDF execution requires initializing some state, e.g., loading a machine
+learning model file to apply inference to every input batch.
+
+The following example shows how to create scalar iterator Pandas UDFs:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+
 ### Grouped Map
 Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
 Split-apply-combine consists of three steps:
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index c1e2d29..ede121b 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -86,6 +86,92 @@ def scalar_pandas_udf_example(spark):
     # $example off:scalar_pandas_udf$
 
 
+def scalar_iter_pandas_udf_example(spark):
+    # $example on:scalar_iter_pandas_udf$
+    import pandas as pd
+
+    from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+
+    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
+    df = spark.createDataFrame(pdf)
+
+    # When the UDF is called with a single column that is not StructType,
+    # the input to the underlying function is an iterator of pd.Series.
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def plus_one(batch_iter):
+        for x in batch_iter:
+            yield x + 1
+
+    df.select(plus_one(col("x"))).show()
+    # +-----------+
+    # |plus_one(x)|
+    # +-----------+
+    # |          2|
+    # |          3|
+    # |          4|
+    # +-----------+
+
+    # When the UDF is called with more than one column,
+    # the input to the underlying function is an iterator of tuples of pd.Series.
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def multiply_two_cols(batch_iter):
+        for a, b in batch_iter:
+            yield a * b
+
+    df.select(multiply_two_cols(col("x"), col("x"))).show()
+    # +-----------------------+
+    # |multiply_two_cols(x, x)|
+    # +-----------------------+
+    # |                      1|
+    # |                      4|
+    # |                      9|
+    # +-----------------------+
+
+    # When the UDF is called with a single column that is StructType,
+    # the input to the underlying function is an iterator of pd.DataFrame.
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def multiply_two_nested_cols(pdf_iter):
+        for pdf in pdf_iter:
+            yield pdf["a"] * pdf["b"]
+
+    df.select(
+        multiply_two_nested_cols(
+            struct(col("x").alias("a"), col("x").alias("b"))
+        ).alias("y")
+    ).show()
+    # +---+
+    # |  y|
+    # +---+
+    # |  1|
+    # |  4|
+    # |  9|
+    # +---+
+
+    # In the UDF, you can initialize some state before processing batches.
+    # Wrap your code with try/finally or use context managers to ensure
+    # resources are released at the end.
+    y_bc = spark.sparkContext.broadcast(1)
+
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def plus_y(batch_iter):
+        y = y_bc.value  # initialize state
+        try:
+            for x in batch_iter:
+                yield x + y
+        finally:
+            pass  # release resources here, if any
+
+    df.select(plus_y(col("x"))).show()
+    # +---------+
+    # |plus_y(x)|
+    # +---------+
+    # |        2|
+    # |        3|
+    # |        4|
+    # +---------+
+    # $example off:scalar_iter_pandas_udf$
+
+
 def grouped_map_pandas_udf_example(spark):
     # $example on:grouped_map_pandas_udf$
     from pyspark.sql.functions import pandas_udf, PandasUDFType

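A side note on the docs wording above ("yields output batches or returns an iterator of
output batches"): every example in this commit uses a generator, but the return-an-iterator
variant could look roughly like the following sketch. `plus_one_mapped` is a hypothetical
name and the sketch is not part of this commit.

import pandas as pd

from pyspark.sql.functions import col, pandas_udf, PandasUDFType


@pandas_udf("long", PandasUDFType.SCALAR_ITER)
def plus_one_mapped(batch_iter):
    # Instead of yielding batches from a generator body, the underlying
    # function may return any iterator of output pd.Series, e.g. one
    # built with map().
    return map(lambda x: x + 1, batch_iter)

Calling df.select(plus_one_mapped(col("x"))).show() would print the same result as
plus_one above.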

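For the "loading a machine learning model file" use case mentioned in the new docs, a
minimal sketch might look like this; `load_model`, the model path, and the `close()` call
are hypothetical placeholders for whatever loader and cleanup a real model library
provides, and are not part of this commit.

import pandas as pd

from pyspark.sql.functions import col, pandas_udf, PandasUDFType


@pandas_udf("double", PandasUDFType.SCALAR_ITER)
def predict(batch_iter):
    # Hypothetical loader: stands in for pickle.load, a framework-specific
    # loader, etc. The model is loaded once per task and then reused for
    # every batch in the iterator.
    model = load_model("/path/to/model")  # hypothetical helper
    try:
        for features in batch_iter:
            yield pd.Series(model.predict(features))
    finally:
        model.close()  # hypothetical cleanup; release resources here

Pairing the load with try/finally, as the plus_y example in the patch does, keeps the
per-task state alive across batches while still guaranteeing cleanup.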