[ 
https://issues.apache.org/jira/browse/SPARK-21190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105095#comment-16105095
 ] 

Li Jin commented on SPARK-21190:
--------------------------------

I think the use case 2 of what [~rxin] proposed originally is a good API to 
enable first. I think it can a bit better if the input of the user function is 
not a {{pandas.DataFrame}} but {{pandas.Series}} to match Spark columns. i.e., 
instead of:

{code}
@spark_udf(some way to describe the return schema)
def my_func(input):
  """ Some user-defined function.
 
  :param input: A Pandas DataFrame with two columns, a and b.
  :return: :class: A numpy array
  """
  return input[a] + input[b]
 
df = spark.range(1000).selectExpr("id a", "id / 2 b")
df.withColumn("c", my_func(df.a, df.b))
{code}

I think this is better:
{code}
@spark_udf(some way to describe the return schema)
def my_func(a, b):
  """ Some user-defined function.
 
  :param input: Two Pandas Series, a and b
  :return: :class: A Pandas Series
  """
  return a + b
 
df = spark.range(1000).selectExpr("id a", "id / 2 b")
df.withColumn("c", my_func(df.a, df.b))
{code}

> SPIP: Vectorized UDFs in Python
> -------------------------------
>
>                 Key: SPARK-21190
>                 URL: https://issues.apache.org/jira/browse/SPARK-21190
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark, SQL
>    Affects Versions: 2.2.0
>            Reporter: Reynold Xin
>            Assignee: Reynold Xin
>              Labels: SPIP
>         Attachments: SPIPVectorizedUDFsforPython (1).pdf
>
>
> *Background and Motivation*
> Python is one of the most popular programming languages among Spark users. 
> Spark currently exposes a row-at-a-time interface for defining and executing 
> user-defined functions (UDFs). This introduces high overhead in serialization 
> and deserialization, and also makes it difficult to leverage Python libraries 
> (e.g. numpy, Pandas) that are written in native code.
>  
> This proposal advocates introducing new APIs to support vectorized UDFs in 
> Python, in which a block of data is transferred over to Python in some 
> columnar format for execution.
>  
>  
> *Target Personas*
> Data scientists, data engineers, library developers.
>  
> *Goals*
> - Support vectorized UDFs that apply on chunks of the data frame
> - Low system overhead: Substantially reduce serialization and deserialization 
> overhead when compared with row-at-a-time interface
> - UDF performance: Enable users to leverage native libraries in Python (e.g. 
> numpy, Pandas) for data manipulation in these UDFs
>  
> *Non-Goals*
> The following are explicitly out of scope for the current SPIP, and should be 
> done in future SPIPs. Nonetheless, it would be good to consider these future 
> use cases during API design, so we can achieve some consistency when rolling 
> out new APIs.
>  
> - Define block oriented UDFs in other languages (that are not Python).
> - Define aggregate UDFs
> - Tight integration with machine learning frameworks
>  
> *Proposed API Changes*
> The following sketches some possibilities. I haven’t spent a lot of time 
> thinking about the API (wrote it down in 5 mins) and I am not attached to 
> this design at all. The main purpose of the SPIP is to get feedback on use 
> cases and see how they can impact API design.
>  
> A few things to consider are:
>  
> 1. Python is dynamically typed, whereas DataFrames/SQL requires static, 
> analysis time typing. This means users would need to specify the return type 
> of their UDFs.
>  
> 2. Ratio of input rows to output rows. We propose initially we require number 
> of output rows to be the same as the number of input rows. In the future, we 
> can consider relaxing this constraint with support for vectorized aggregate 
> UDFs.
> 3. How do we handle null values, since Pandas doesn't have the concept of 
> nulls?
>  
> Proposed API sketch (using examples):
>  
> Use case 1. A function that defines all the columns of a DataFrame (similar 
> to a “map” function):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_on_entire_df(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A Pandas data frame.
>   """
>   input[c] = input[a] + input[b]
>   Input[d] = input[a] - input[b]
>   return input
>  
> spark.range(1000).selectExpr("id a", "id / 2 b")
>   .mapBatches(my_func_on_entire_df)
> {code}
>  
> Use case 2. A function that defines only one column (similar to existing 
> UDFs):
>  
> {code}
> @spark_udf(some way to describe the return schema)
> def my_func_that_returns_one_column(input):
>   """ Some user-defined function.
>  
>   :param input: A Pandas DataFrame with two columns, a and b.
>   :return: :class: A numpy array
>   """
>   return input[a] + input[b]
>  
> my_func = udf(my_func_that_returns_one_column)
>  
> df = spark.range(1000).selectExpr("id a", "id / 2 b")
> df.withColumn("c", my_func(df.a, df.b))
> {code}
>  
>  
>  
> *Optional Design Sketch*
> I’m more concerned about getting proper feedback for API design. The 
> implementation should be pretty straightforward and is not a huge concern at 
> this point. We can leverage the same implementation for faster toPandas 
> (using Arrow).
>  
>  
> *Optional Rejected Designs*
> See above.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to