[ https://issues.apache.org/jira/browse/SPARK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329177#comment-16329177 ]
Apache Spark commented on SPARK-23011:
--------------------------------------

User 'icexelloss' has created a pull request for this issue:
https://github.com/apache/spark/pull/20295

> Support alternative function form with group aggregate pandas UDF
> -----------------------------------------------------------------
>
>                 Key: SPARK-23011
>                 URL: https://issues.apache.org/jira/browse/SPARK-23011
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.0
>            Reporter: Li Jin
>            Priority: Major
>
> Under the current semantics of groupby apply, the output schema of groupby
> apply is the same as the output schema of the UDF. Because the grouping
> column is usually useful to users, they often have to output the grouping
> columns from the UDF themselves. Consider the following example:
> {code:java}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(group_column, *x_columns).schema
>
> # Input/output are both a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(pdf):
>     # The grouping value has to be recovered from the data itself
>     group_key = pdf[group_column].iloc[0]
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     return pd.DataFrame(
>         [[group_key] + [model.params[i] for i in x_columns]],
>         columns=[group_column] + x_columns)
>
> beta = df.groupby(group_column).apply(ols)
> {code}
> Although the UDF (a linear regression) has nothing to do with the grouping
> column, the user has to handle the grouping column inside the UDF. In other
> words, the UDF is tightly coupled to the grouping column.
>
> Following the discussion in
> [https://github.com/apache/spark/pull/20211#discussion_r160524679], we
> reached consensus on supporting an alternative function form:
> {code:java}
> def foo(key, pdf):
>     key  # this is a grouping key.
>     pdf  # this is the Pandas DataFrame
>
> pudf = pandas_udf(f=foo, returnType="id int, v double",
>                   functionType=PandasUDFType.GROUP_MAP)
> df.groupby(group_column).apply(pudf){code}
> {code:java}
> import pandas as pd
> import statsmodels.api as sm
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # df has four columns: id, y, x1, x2
> group_column = 'id'
> y_column = 'y'
> x_columns = ['x1', 'x2']
> schema = df.select(group_column, *x_columns).schema
>
> # Input/output are both a pandas.DataFrame
> @pandas_udf(schema, PandasUDFType.GROUP_MAP)
> def ols(key, pdf):
>     y = pdf[y_column]
>     X = pdf[x_columns]
>     X = sm.add_constant(X)
>     model = sm.OLS(y, X).fit()
>     # key is a tuple, so convert it before concatenating with a list
>     return pd.DataFrame([list(key) + [model.params[i] for i in x_columns]])
>
> beta = df.groupby(group_column).apply(ols)
> {code}
>
> In summary:
> * Support the alternative form f(key, pdf). The current form f(pdf) will
> still be supported; the two are told apart through function inspection.
> * In both cases, the UDF output schema will be the final output schema of
> the Spark DataFrame.
> * The key will be passed to the user as a Python tuple.
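
The "function inspection" and "key as a tuple" points in the summary can be illustrated without Spark. The following is only a minimal sketch under stated assumptions, not Spark's implementation: `call_grouped` and `apply_by_group` are hypothetical helper names, and a plain list of dicts stands in for the pandas.DataFrame that a real group map UDF would receive.

```python
import inspect
from itertools import groupby


def call_grouped(func, key, rows):
    """Hypothetical dispatcher: pass the key only if func declares two parameters."""
    n_params = len(inspect.signature(func).parameters)
    if n_params == 1:
        return func(rows)        # existing form: f(pdf)
    if n_params == 2:
        return func(key, rows)   # alternative form: f(key, pdf)
    raise ValueError("expected f(pdf) or f(key, pdf), got %d parameters" % n_params)


def apply_by_group(rows, group_columns, func):
    """Group rows (dicts) by group_columns and apply func to each group."""
    def key_of(row):
        # The key is always a tuple, even for a single grouping column.
        return tuple(row[c] for c in group_columns)

    out = []
    for key, group in groupby(sorted(rows, key=key_of), key=key_of):
        out.extend(call_grouped(func, key, list(group)))
    return out


def mean_v_old(rows):
    # Existing form: the grouping value is recovered from the data.
    return [{"id": rows[0]["id"], "v": sum(r["v"] for r in rows) / len(rows)}]


def mean_v_new(key, rows):
    # Alternative form: the grouping value arrives as key == (id,).
    return [{"id": key[0], "v": sum(r["v"] for r in rows) / len(rows)}]


data = [
    {"id": 1, "v": 1.0},
    {"id": 1, "v": 2.0},
    {"id": 2, "v": 3.0},
    {"id": 2, "v": 5.0},
]
result_old = apply_by_group(data, ["id"], mean_v_old)
result_new = apply_by_group(data, ["id"], mean_v_new)
```

Both forms produce the same rows here; the difference is that `mean_v_new` no longer has to read the grouping column out of its input, which is the decoupling the issue asks for.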