This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1b2448b  [SPARK-28056][PYTHON] add doc for SCALAR_ITER Pandas UDF
1b2448b is described below

commit 1b2448bc10a8ee732d08fa1abae6d64ae25e3a14
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Mon Jun 17 20:51:36 2019 -0700

    [SPARK-28056][PYTHON] add doc for SCALAR_ITER Pandas UDF

    ## What changes were proposed in this pull request?

    Add docs for `SCALAR_ITER` Pandas UDF.

    cc: WeichenXu123 HyukjinKwon

    ## How was this patch tested?

    Tested example code manually.

    Closes #24897 from mengxr/SPARK-28056.

    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 docs/sql-pyspark-pandas-with-arrow.md | 17 +++++++
 examples/src/main/python/sql/arrow.py | 86 +++++++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+)

diff --git a/docs/sql-pyspark-pandas-with-arrow.md b/docs/sql-pyspark-pandas-with-arrow.md
index 6cf280c..9cab4be 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -86,6 +86,23 @@ The following example shows how to create a scalar Pandas UDF that computes the
 </div>
 </div>
 
+### Scalar Iterator
+
+A scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as the scalar Pandas UDF above, except that
+the underlying Python function takes an iterator of batches as input instead of a single batch and,
+instead of returning a single output batch, it yields output batches or returns an iterator of
+output batches.
+It is useful when the UDF execution requires initializing some state, e.g., loading a machine
+learning model file to apply inference to every input batch.
+
+The following example shows how to create scalar iterator Pandas UDFs:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+
 ### Grouped Map
 
 Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern. Split-apply-combine consists of three steps:
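
The doc section above motivates `SCALAR_ITER` with loading a machine learning
model once and applying inference to every input batch. That use case is not
spelled out in the committed examples below, so here is a minimal sketch of the
pattern; `load_model`, its model path, the model's `predict` method, and a `df`
with a "features" column are hypothetical stand-ins, not part of this commit:

    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf, PandasUDFType

    @pandas_udf("double", PandasUDFType.SCALAR_ITER)
    def predict(batch_iter):
        # Hypothetical helper: load the model once per task rather than once
        # per batch; this is the state initialization the doc describes.
        model = load_model("/path/to/model")
        for features in batch_iter:
            # Hypothetical model API: yield one output batch per input batch.
            yield pd.Series(model.predict(features))

    df.select(predict(col("features"))).show()
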
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index c1e2d29..ede121b 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -86,6 +86,92 @@ def scalar_pandas_udf_example(spark):
     # $example off:scalar_pandas_udf$
 
 
+def scalar_iter_pandas_udf_example(spark):
+    # $example on:scalar_iter_pandas_udf$
+    import pandas as pd
+
+    from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+
+    pdf = pd.DataFrame([1, 2, 3], columns=["x"])
+    df = spark.createDataFrame(pdf)
+
+    # When the UDF is called with a single column that is not StructType,
+    # the input to the underlying function is an iterator of pd.Series.
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def plus_one(batch_iter):
+        for x in batch_iter:
+            yield x + 1
+
+    df.select(plus_one(col("x"))).show()
+    # +-----------+
+    # |plus_one(x)|
+    # +-----------+
+    # |          2|
+    # |          3|
+    # |          4|
+    # +-----------+
+
+    # When the UDF is called with more than one column,
+    # the input to the underlying function is an iterator of tuples of pd.Series.
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def multiply_two_cols(batch_iter):
+        for a, b in batch_iter:
+            yield a * b
+
+    df.select(multiply_two_cols(col("x"), col("x"))).show()
+    # +-----------------------+
+    # |multiply_two_cols(x, x)|
+    # +-----------------------+
+    # |                      1|
+    # |                      4|
+    # |                      9|
+    # +-----------------------+
+
+    # When the UDF is called with a single column that is StructType,
+    # the input to the underlying function is an iterator of pd.DataFrame.
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def multiply_two_nested_cols(pdf_iter):
+        for pdf in pdf_iter:
+            yield pdf["a"] * pdf["b"]
+
+    df.select(
+        multiply_two_nested_cols(
+            struct(col("x").alias("a"), col("x").alias("b"))
+        ).alias("y")
+    ).show()
+    # +---+
+    # |  y|
+    # +---+
+    # |  1|
+    # |  4|
+    # |  9|
+    # +---+
+
+    # In the UDF, you can initialize some state before processing batches.
+    # Wrap your code with try/finally or use context managers to ensure
+    # the release of resources at the end.
+    y_bc = spark.sparkContext.broadcast(1)
+
+    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
+    def plus_y(batch_iter):
+        y = y_bc.value  # initialize state
+        try:
+            for x in batch_iter:
+                yield x + y
+        finally:
+            pass  # release resources here, if any
+
+    df.select(plus_y(col("x"))).show()
+    # +---------+
+    # |plus_y(x)|
+    # +---------+
+    # |        2|
+    # |        3|
+    # |        4|
+    # +---------+
+    # $example off:scalar_iter_pandas_udf$
+
+
 def grouped_map_pandas_udf_example(spark):
     # $example on:grouped_map_pandas_udf$
     from pyspark.sql.functions import pandas_udf, PandasUDFType
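
The committed examples all yield their output batches; the doc above also
permits returning an iterator of output batches. A minimal sketch of that
variant, reusing the `df` defined in scalar_iter_pandas_udf_example above
(the function name `plus_one_iter` is illustrative, not part of this commit):

    from pyspark.sql.functions import col, pandas_udf, PandasUDFType

    @pandas_udf("long", PandasUDFType.SCALAR_ITER)
    def plus_one_iter(batch_iter):
        # Return a lazy iterator of pd.Series instead of yielding them;
        # map() applies the increment to each incoming batch on demand.
        return map(lambda x: x + 1, batch_iter)

    df.select(plus_one_iter(col("x"))).show()  # same result as plus_one above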