[
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenchen Fan resolved SPARK-21190.
---------------------------------
Resolution: Fixed
Fix Version/s: 2.3.0
Issue resolved by pull request 18659
[https://github.com/apache/spark/pull/18659]
> SPIP: Vectorized UDFs in Python
> -------------------------------
>
> Key: SPARK-21190
> URL: https://issues.apache.org/jira/browse/SPARK-21190
> Project: Spark
> Issue Type: New Feature
> Components: PySpark, SQL
> Affects Versions: 2.2.0
> Reporter: Reynold Xin
> Assignee: Reynold Xin
> Labels: SPIP
> Fix For: 2.3.0
>
> Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users.
> Spark currently exposes a row-at-a-time interface for defining and executing
> user-defined functions (UDFs). This introduces high overhead in serialization
> and deserialization, and also makes it difficult to leverage Python libraries
> (e.g. numpy, Pandas) that are written in native code.
>
> This proposal advocates introducing new APIs to support vectorized UDFs in
> Python, in which a block of data is transferred over to Python in some
> columnar format for execution.
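To make the contrast concrete, here is a minimal sketch in plain pandas (no Spark involved) of the same computation done one row at a time and on a whole block. The column names `a` and `b` and both helper functions are hypothetical, chosen only for illustration.

```python
import pandas as pd


def add_row_at_a_time(batch: pd.DataFrame) -> pd.Series:
    # One Python-level function call per row, as with a row-at-a-time UDF.
    return batch.apply(lambda row: row["a"] + row["b"], axis=1)


def add_vectorized(batch: pd.DataFrame) -> pd.Series:
    # One call for the whole block; the addition itself runs in native code.
    return batch["a"] + batch["b"]


batch = pd.DataFrame({"a": [0.0, 1.0, 2.0, 3.0], "b": [0.0, 0.5, 1.0, 1.5]})
row_result = add_row_at_a_time(batch)
vec_result = add_vectorized(batch)
```

Both functions produce the same values; the difference is how many times the Python interpreter is entered for a block of rows.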
>
>
> *Target Personas*
> Data scientists, data engineers, library developers.
>
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization
> overhead compared with the row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g.
> numpy, Pandas) for data manipulation in these UDFs
>
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be
> done in future SPIPs. Nonetheless, it would be good to consider these future
> use cases during API design, so we can achieve some consistency when rolling
> out new APIs.
>
> - Define block-oriented UDFs in languages other than Python.
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time
> thinking about the API (wrote it down in 5 mins) and I am not attached to
> this design at all. The main purpose of the SPIP is to get feedback on use
> cases and see how they can impact API design.
>
> A few things to consider are:
>
> 1. Python is dynamically typed, whereas DataFrames/SQL require static,
> analysis-time typing. This means users would need to specify the return type
> of their UDFs.
>
> 2. Ratio of input rows to output rows. We propose that, initially, the number
> of output rows must equal the number of input rows. In the future, we
> can consider relaxing this constraint with support for vectorized aggregate
> UDFs.
>
> 3. How do we handle null values, since Pandas doesn't have the concept of
> nulls?
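The three considerations above can be illustrated with a small pandas sketch. The helper `check_batch_udf` is hypothetical and not part of any Spark API; it only shows the kind of checks that a declared return type and a one-to-one row constraint imply. The last lines show how a missing value in an integer column surfaces in pandas as a floating-point NaN.

```python
import pandas as pd


def check_batch_udf(func, batch: pd.DataFrame, return_dtype: str) -> pd.Series:
    """Hypothetical helper: run func on one block and enforce two rules."""
    result = pd.Series(func(batch))
    # Consideration 2: exactly one output row per input row.
    if len(result) != len(batch):
        raise ValueError("UDF must return one output row per input row")
    # Consideration 1: the return type is declared up front, so cast to it.
    return result.astype(return_dtype)


batch = pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})
total = check_batch_udf(lambda df: df["a"] + df["b"], batch, "float64")

# Consideration 3: an integer column with a missing value is stored as
# float64, and the missing entry becomes NaN rather than a distinct null.
with_null = pd.Series([1, None, 3])
```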
>
> Proposed API sketch (using examples):
>
> Use case 1. A function that defines all the columns of a DataFrame (similar
> to a “map” function):
>
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>     """ Some user-defined function.
>
>     :param input: A Pandas DataFrame with two columns, a and b.
>     :return: A Pandas DataFrame.
>     """
>     input['c'] = input['a'] + input['b']
>     input['d'] = input['a'] - input['b']
>     return input
>
> (spark.range(1000).selectExpr("id a", "id / 2 b")
>     .mapBatches(my_func_on_entire_df))
> {code}
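For readers who want to run the body of this function without Spark, the following sketch applies it to a small pandas DataFrame standing in for one block of the Spark DataFrame. The `@spark_udf` decorator and `mapBatches` are proposals in this SPIP, so they are left out here. The sample data mirrors `selectExpr("id a", "id / 2 b")` for four rows and is an assumption made for illustration.

```python
import pandas as pd


def my_func_on_entire_df(batch: pd.DataFrame) -> pd.DataFrame:
    # Work on a copy so the caller's block is not modified in place.
    out = batch.copy()
    out["c"] = out["a"] + out["b"]
    out["d"] = out["a"] - out["b"]
    return out


ids = range(4)
block = pd.DataFrame({"a": list(ids), "b": [i / 2 for i in ids]})
result = my_func_on_entire_df(block)
```

The result has the same number of rows as the input block, with columns `c` and `d` appended.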
>
> Use case 2. A function that defines only one column (similar to existing
> UDFs):
>
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>     """ Some user-defined function.
>
>     :param input: A Pandas DataFrame with two columns, a and b.
>     :return: A numpy array
>     """
>     return input['a'] + input['b']
>
> my_func = udf(my_func_that_returns_one_column)
>
> df = spark.range(1000).selectExpr("id a", "id / 2 b")
> df.withColumn("c", my_func(df.a, df.b))
> {code}
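The single-column case can be tried the same way. This sketch runs the function on one pandas block, returns a numpy array as the docstring describes, and attaches it as a new column. The block contents are illustrative assumptions. For reference, Spark 2.3 exposes vectorized UDFs through `pyspark.sql.functions.pandas_udf`; the sketch does not use it so that it runs with pandas alone.

```python
import pandas as pd


def my_func_that_returns_one_column(batch: pd.DataFrame):
    # Element-wise sum of two columns for the whole block, as a numpy array.
    return (batch["a"] + batch["b"]).to_numpy()


block = pd.DataFrame({"a": [0, 1, 2, 3], "b": [0.0, 0.5, 1.0, 1.5]})
c = my_func_that_returns_one_column(block)
# pandas analogue of df.withColumn("c", ...): returns a new DataFrame.
with_c = block.assign(c=c)
```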
>
>
>
> *Optional Design Sketch*
> I’m more concerned about getting proper feedback for API design. The
> implementation should be pretty straightforward and is not a huge concern at
> this point. We can leverage the same implementation for faster toPandas
> (using Arrow).
>
>
> *Optional Rejected Designs*
> See above.
>
>
>
>