Li Jin created SPARK-23011:
------------------------------

             Summary: Prepend missing grouping columns in groupby apply
                 Key: SPARK-23011
                 URL: https://issues.apache.org/jira/browse/SPARK-23011
             Project: Spark
          Issue Type: Sub-task
          Components: PySpark
    Affects Versions: 2.2.0
            Reporter: Li Jin


Currently, the output schema of groupby apply is exactly the output schema of 
the UDF. Because grouping columns are usually useful to users, users often 
have to return the grouping columns from the UDF themselves. To illustrate, 
consider the following example:

{code:python}
import pandas as pd
import statsmodels.api as sm
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

# Input/output are both a pandas.DataFrame
@pandas_udf(schema, PandasUDFType.GROUP_MAP)
def ols(pdf):
    # The UDF has to extract the grouping key itself in order to return it
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
                        columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)
{code}

Although the UDF (linear regression) has nothing to do with the grouping 
column, the user has to handle the grouping column inside the UDF. In other 
words, the UDF is tightly coupled to the grouping column.

Here I propose we prepend grouping columns that are not returned by the UDF to 
the result of groupby apply. With this change, users can write UDFs that are 
decoupled from the grouping column:

{code:python}
import pandas as pd
import statsmodels.api as sm
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df has four columns: id, y, x1, x2
group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
# The UDF schema no longer includes the grouping column
schema = df.select(*x_columns).schema

# Input/output are both a pandas.DataFrame
@pandas_udf(schema, PandasUDFType.GROUP_MAP)
def ols(pdf):
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[model.params[i] for i in x_columns]],
                        columns=x_columns)

beta = df.groupby(group_column).apply(ols)
{code}
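
To make the proposed semantics concrete, here is a minimal sketch in plain pandas, without Spark. It is only an illustration of "prepend the grouping columns that the UDF did not return", not how Spark implements or would implement it. The helper name apply_with_group_keys, the function column_means, and the sample data are all hypothetical.

```python
import pandas as pd


def apply_with_group_keys(df, group_columns, func):
    """Hypothetical helper: apply func per group, then prepend any
    grouping columns that func did not return itself."""
    pieces = []
    for _, pdf in df.groupby(group_columns):
        result = func(pdf).copy()
        # Only grouping columns missing from the function's output are added.
        missing = [c for c in group_columns if c not in result.columns]
        for position, column in enumerate(missing):
            # Every row in pdf shares the same value for a grouping column.
            result.insert(position, column, pdf[column].iloc[0])
        pieces.append(result)
    return pd.concat(pieces, ignore_index=True)


def column_means(pdf):
    # Stand-in for the regression UDF; it knows nothing about 'id'.
    return pd.DataFrame({"x1": [pdf["x1"].mean()], "x2": [pdf["x2"].mean()]})


# Made-up sample data with the same column names as the example above.
df = pd.DataFrame({
    "id": [1, 1, 2],
    "y": [1.0, 2.0, 3.0],
    "x1": [1.0, 3.0, 5.0],
    "x2": [2.0, 4.0, 6.0],
})

result = apply_with_group_keys(df, ["id"], column_means)
print(result)
```

In this sketch the result has columns id, x1, x2 even though column_means only returns x1 and x2. A function that already returns id is left unchanged, so the column is not duplicated.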




