[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862828#comment-16862828
 ] 

Chris Martin commented on SPARK-27463:
--------------------------------------

Hi [~hyukjin.kwon]

Ah I see your concern now.  I think it’s fair to say that the cogrouping 
functionality proposed has no analogous API in Pandas.  In my opinion that’s 
understandable, as Pandas is fundamentally a library for manipulating local data, 
so the problems of colocating multiple DataFrames don’t arise as they do in 
Spark.  That said, the inspiration behind the proposed API is clearly that of 
Pandas’ groupby().apply(), so I’d argue it is not without precedent.

I think the more direct comparison here is with the existing Dataset cogroup, 
where the high-level functionality is almost exactly the same (partition two 
distinct DataFrames such that their partitions are cogrouped, then apply a 
flatmap operation over them), with the differences being in the cogroup key 
definition (typed for Datasets, untyped for pandas-udf), the input (Iterables 
for Datasets, Pandas DataFrames for pandas-udf) and the output (an Iterable for 
Datasets, a Pandas DataFrame for pandas-udf). Now at this point one might 
observe that we have two different language-specific implementations of the 
same high-level functionality.  This is true; however, it has been the case 
since the introduction of Pandas UDFs (see groupBy().apply() vs 
groupByKey().flatMapGroups()) and is IMHO a good thing: it allows us to provide 
functionality that plays to the strengths of each individual language, given 
that what is simple and idiomatic in Python is not in Scala and vice versa.
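To make that shared high-level functionality concrete, here is a minimal plain-Python sketch of cogroup semantics (group two inputs by key, take the union of keys, and hand each pair of groups to a function). All names here are illustrative only, not Spark or Pandas API:

```python
from collections import defaultdict

def cogroup(left, right, key):
    """Group two lists of dicts by `key`, yielding (key, left_group, right_group).

    Keys present on only one side still yield a pair, with the other group empty,
    mirroring how a cogroup differs from a plain join.
    """
    lg, rg = defaultdict(list), defaultdict(list)
    for row in left:
        lg[row[key]].append(row)
    for row in right:
        rg[row[key]].append(row)
    for k in sorted(set(lg) | set(rg)):
        yield k, lg[k], rg[k]

left = [{"id": 1, "v": 10}, {"id": 2, "v": 20}]
right = [{"id": 2, "w": 200}, {"id": 3, "w": 300}]

# Flatmap a function over each cogrouped pair, as Dataset.cogroup does
result = [(k, len(l), len(r)) for k, l, r in cogroup(left, right, "id")]
# result == [(1, 1, 0), (2, 1, 1), (3, 0, 1)]
```

The typed Dataset version and the proposed pandas-udf version both reduce to this shape; they differ only in whether the groups arrive as Iterables or as Pandas DataFrames.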

If, considering this, we agree that this cogroup functionality is both useful 
and suitable for exposing via a Pandas UDF (and I hope we do, but please say if 
you disagree), the question now becomes what we would like the API to be. At 
this point let’s consider the API as currently proposed in the design doc.

 
{code:python}
result = df1.cogroup(df2, on='id').apply(my_pandas_udf)
{code}

This API is concise and consistent with the existing groupby().apply().  The 
disadvantage is that it isn’t consistent with Dataset’s cogroup, and, as this 
API doesn’t exist in Pandas, it can’t be consistent with that (although I would 
argue that if Pandas did introduce such an API it would look a lot like this).
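For a sense of what the user-supplied function under this API would look like: per the proposal it receives the two cogrouped Pandas DataFrames for a key and returns a single Pandas DataFrame. The body below (an ordinary per-group merge) is purely a hypothetical sketch; only the two-DataFrames-in, one-DataFrame-out shape comes from the design doc:

```python
import pandas as pd

# Hypothetical udf for the proposed API: Spark would call this once per
# cogroup, passing the left and right groups as local pandas DataFrames.
def my_pandas_udf(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Any local pandas logic works here; a merge is just one example.
    return pd.merge(left, right, on="id")

# Simulating a single cogroup for key id=1:
left = pd.DataFrame({"id": [1, 1], "v": [10, 11]})
right = pd.DataFrame({"id": [1], "w": [100]})
out = my_pandas_udf(left, right)
# out has columns id, v, w and two rows
```

The point is that the function itself stays pure pandas; all the distributed cogrouping machinery lives on the Spark side of the API.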

The alternative would be to implement something on RelationalGroupedDataset, as 
described by Li in the post above (I think we can discount something based on 
KeyValueGroupedDataset as, if my reading of the code is correct, that would 
only apply to typed APIs, which this isn’t).  The big advantage here is that 
this is much more consistent with the existing Dataset cogroup.  On the flip 
side it comes at the cost of a little more verbosity and is IMHO a little less 
pythonic/in the style of Pandas.  That being the case, I’m slightly in favour 
of the API as currently proposed in the design doc, but am happy to be swayed 
to something else if the majority have a different opinion.

> Support Dataframe Cogroup via Pandas UDFs 
> ------------------------------------------
>
>                 Key: SPARK-27463
>                 URL: https://issues.apache.org/jira/browse/SPARK-27463
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Chris Martin
>            Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
