Ah, sorry - I've updated the link, which should give you access. Can you try again now?
thanks,

Chris

On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ice.xell...@gmail.com> wrote:

> Hi Chris,
>
> Thanks! The permissions on the Google doc may not be set up properly; I
> cannot view the doc by default.
>
> Li
>
> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk> wrote:
>
>> I've updated the jira so that the main body is now inside a Google doc.
>> Anyone should be able to comment; if you want/need write access please
>> drop me a mail and I can add you.
>>
>> Ryan - regarding your specific point about why I'm not proposing to add
>> this to the Scala API: the main reason is that Scala users can already
>> use cogroup on Datasets. For Scala this is probably a better solution
>> as (as far as I know) there is no Scala DataFrame library that could be
>> used in place of Pandas for manipulating local DataFrames. As a result
>> you'd probably be left dealing with Iterators of Row objects, which
>> almost certainly isn't what you'd want. This is similar to the existing
>> grouped map Pandas UDFs, for which there is no equivalent Scala API.
>>
>> I do think there might be a place for allowing a (Scala) Dataset
>> cogroup to take some sort of grouping expression as the grouping key
>> (this would mean that you wouldn't have to marshal the key into a JVM
>> object, and it could possibly lend itself to some Catalyst
>> optimisations), but I don't think that this should be done as part of
>> this SPIP.
>>
>> thanks,
>>
>> Chris
>>
>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I agree, it would be great to have a document to comment on.
>>>
>>> The main thing that stands out right now is that this is only for
>>> PySpark and states that it will not be added to the Scala API. Why not
>>> make this available, since most of the work would be done?
>>>
>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> Thank you Chris, this looks great.
>>>>
>>>> Would you mind sharing a Google doc version of the proposal? I
>>>> believe that's the preferred way of discussing proposals (other
>>>> people please correct me if I am wrong).
>>>>
>>>> Li
>>>>
>>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> As promised I've raised SPARK-27463 for this.
>>>>>
>>>>> All feedback welcome!
>>>>>
>>>>> Chris
>>>>>
>>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>>
>>>>> Thanks Bryan and Li, that is much appreciated. I should hopefully
>>>>> have the SPIP ready in the next couple of days.
>>>>>
>>>>> thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>>>>>
>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't
>>>>>> be too difficult to extend the current functionality to transfer
>>>>>> multiple DataFrames. For the SPIP, I would keep it more high-level;
>>>>>> I don't think it's necessary to include details of the Python
>>>>>> worker, we can hash that out after the SPIP is approved.
>>>>>>
>>>>>> Bryan
>>>>>>
>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Chris, I look forward to it.
>>>>>>>
>>>>>>> I think sending multiple dataframes to the Python worker requires
>>>>>>> some changes but shouldn't be too difficult. We could probably use
>>>>>>> something like:
>>>>>>>
>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>
>>>>>>> in:
>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>
>>>>>>> and have ArrowPythonRunner take multiple input iterators/schemas.
>>>>>>>
>>>>>>> Li
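A minimal sketch of the kind of count-prefixed framing Li describes, written
here against pyarrow directly; the 4-byte count layout and both helper
functions are illustrative assumptions, not the actual ArrowPythonRunner
protocol:

    import io
    import struct

    import pandas as pd
    import pyarrow as pa


    def write_dataframes(sink, pdfs):
        # Hypothetical framing: a 4-byte big-endian count, then each
        # DataFrame as its own complete Arrow IPC stream.
        sink.write(struct.pack("!i", len(pdfs)))
        for pdf in pdfs:
            table = pa.Table.from_pandas(pdf)
            writer = pa.RecordBatchStreamWriter(sink, table.schema)
            writer.write_table(table)
            writer.close()  # writes the end-of-stream marker


    def read_dataframes(source):
        # Mirror image: read the count, then one Arrow stream per DataFrame.
        (count,) = struct.unpack("!i", source.read(4))
        return [pa.ipc.open_stream(source).read_all().to_pandas()
                for _ in range(count)]


    buf = io.BytesIO()
    write_dataframes(buf, [pd.DataFrame({"x": [1, 2]}),
                           pd.DataFrame({"y": [3.0, 4.0]})])
    buf.seek(0)
    pdf1, pdf2 = read_dataframes(buf)

The same framing generalises to any number of dataframes per group, which is
what a cogroup runner would need.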
>>>>>>>
>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>> working on a SPIP to formally propose this. One concern I do have,
>>>>>>>> however, is that the current Arrow serialization code is tied to
>>>>>>>> passing through a single dataframe as the UDF parameter, so any
>>>>>>>> modification to allow multiple dataframes may not be
>>>>>>>> straightforward. If anyone has any ideas as to how this might be
>>>>>>>> achieved in an elegant manner I'd be happy to hear them!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>> cases for cogroup.
>>>>>>>>
>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid
>>>>>>>> things like collect_list and reshaping data between Spark and
>>>>>>>> Pandas. Cogroup feels very similar and can be an extension to the
>>>>>>>> groupby apply + pandas UDF functionality.
>>>>>>>>
>>>>>>>> I wonder if any PMC members/committers have any thoughts/opinions
>>>>>>>> on this?
>>>>>>>>
>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>
>>>>>>>>> Just to add to this, I've also implemented my own cogroup
>>>>>>>>> previously and would welcome a cogroup for DataFrames.
>>>>>>>>>
>>>>>>>>> My specific use case was that I had a large amount of time series
>>>>>>>>> data. Spark has very limited support for time series
>>>>>>>>> (specifically as-of joins), but pandas has good support.
>>>>>>>>>
>>>>>>>>> My solution was to take my two dataframes and perform a group by
>>>>>>>>> and collect_list on each. The resulting arrays could be passed
>>>>>>>>> into a UDF where they could be marshalled into a couple of pandas
>>>>>>>>> dataframes and processed using pandas' excellent time series
>>>>>>>>> functionality.
>>>>>>>>>
>>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>>> been a bit nicer. The ideal would have been some pandas UDF
>>>>>>>>> version of cogroup that gave me a pandas dataframe for each Spark
>>>>>>>>> dataframe in the cogroup!
>>>>>>>>>
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>>> jonathan.wina...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> For info, our team has defined its own cogroup on dataframes in
>>>>>>>>> the past on different projects, using different methods
>>>>>>>>> (RDD[Row]-based or union-all + collect_list based).
>>>>>>>>>
>>>>>>>>> I might be biased, but I find the approach very useful in
>>>>>>>>> projects to simplify and speed up transformations, and to remove
>>>>>>>>> a lot of intermediate stages (distinct + join => just cogroup).
>>>>>>>>>
>>>>>>>>> Plus, Spark 2.4 introduced a lot of new operators for nested
>>>>>>>>> data. That's a win!
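For reference, the group-by-and-collect_list workaround Chris describes
above might look roughly like this in PySpark; the trades/quotes data,
column names, and the merge_asof step are purely illustrative:

    import pandas as pd
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.types import DoubleType

    spark = SparkSession.builder.getOrCreate()

    # Illustrative inputs: trades and quotes keyed by symbol.
    trades = spark.createDataFrame(
        [("AAPL", 1, 100.0), ("AAPL", 3, 101.0)], ["symbol", "ts", "price"])
    quotes = spark.createDataFrame(
        [("AAPL", 2, 99.5)], ["symbol", "ts", "bid"])

    # Group each side by key and collect the rows into arrays of structs.
    t = trades.groupBy("symbol").agg(
        F.collect_list(F.struct("ts", "price")).alias("trades"))
    q = quotes.groupBy("symbol").agg(
        F.collect_list(F.struct("ts", "bid")).alias("quotes"))

    @F.udf(DoubleType())
    def last_bid_asof(trades, quotes):
        # Marshal both arrays into pandas DataFrames, then use
        # merge_asof (an as-of join Spark itself lacks) to find the
        # prevailing bid at the time of the last trade.
        tdf = pd.DataFrame([r.asDict() for r in trades]).sort_values("ts")
        qdf = pd.DataFrame([r.asDict() for r in quotes]).sort_values("ts")
        merged = pd.merge_asof(tdf, qdf, on="ts")
        return float(merged["bid"].iloc[-1])

    result = t.join(q, "symbol").withColumn(
        "bid", last_bid_asof("trades", "quotes"))

All the reshaping through collect_list and Row objects is exactly the
boilerplate a native cogroup would remove.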
>>>>>>>>>
>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I am wondering, do other people have opinions/use cases on
>>>>>>>>>> cogroup?
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Alessandro,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the reply. I assume by "equi-join" you mean
>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>
>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>> (1) an equality outer join will give n * m rows for each key
>>>>>>>>>>> (n and m being the corresponding number of rows in df1 and df2
>>>>>>>>>>> for each key)
>>>>>>>>>>> (2) the user needs to do some extra processing to transform
>>>>>>>>>>> the n * m rows back to the desired shape (two sub-dataframes
>>>>>>>>>>> with n and m rows)
>>>>>>>>>>>
>>>>>>>>>>> I think a full outer join is an inefficient way to implement
>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes
>>>>>>>>>>> for each key, why join them first and then un-join them?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>>> alessandro.solima...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>> I fail to see how an equi-join on the key columns is
>>>>>>>>>>>> different from the cogroup you propose.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the accepted answer here can shed some light:
>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>
>>>>>>>>>>>> Now you apply a UDF on each iterable, one per key value
>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>
>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>> 1) joining df1 and df2 on the key you want,
>>>>>>>>>>>> 2) applying "groupby" on such key,
>>>>>>>>>>>> 3) finally applying a UDAF (you can have a look here if you
>>>>>>>>>>>> are not familiar with them:
>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)
>>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>>
>>>>>>>>>>>> HTH,
>>>>>>>>>>>> Alessandro
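A rough PySpark analogue of Alessandro's three steps, substituting a
grouped-map pandas UDF for the Scala UDAF; all names, schemas, and data here
are illustrative. Note how step 1 materialises n * m rows per key, which is
exactly Li's objection:

    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    spark = SparkSession.builder.getOrCreate()

    # Illustrative inputs: n = 2 rows per key on the left, m = 3 on the right.
    df1 = spark.createDataFrame([("a", 1.0), ("a", 2.0)], ["key", "x"])
    df2 = spark.createDataFrame(
        [("a", 10.0), ("a", 20.0), ("a", 30.0)], ["key", "y"])

    # 1) Equality join on the key: key "a" now carries 2 * 3 = 6 rows.
    joined = df1.join(df2, on="key", how="full_outer")

    # 2) + 3) Group by the key and process each group in isolation with a
    # grouped-map pandas UDF (a PySpark stand-in for the Scala UDAF).
    @pandas_udf("key string, x_sum double, y_sum double",
                PandasUDFType.GROUPED_MAP)
    def process_group(pdf):
        # To recover the two original sides, each has to be de-duplicated
        # out of the n * m cross product again.
        x = pdf[["x"]].drop_duplicates()["x"]
        y = pdf[["y"]].drop_duplicates()["y"]
        return pd.DataFrame({"key": [pdf["key"].iloc[0]],
                             "x_sum": [x.sum()], "y_sum": [y.sum()]})

    result = joined.groupby("key").apply(process_group)

Reconstructing the two sides out of the cross product is both lossy (genuine
duplicate rows collapse) and wasteful, which is why Li calls the join-based
approach an inefficient way to implement cogroup.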
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We have been using PySpark's groupby().apply() quite a bit
>>>>>>>>>>>>> and it has been very helpful in integrating Spark with our
>>>>>>>>>>>>> existing pandas-heavy libraries.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>> groupby().apply() is not sufficient. In some cases, we want
>>>>>>>>>>>>> to group two dataframes by the same key, and apply a function
>>>>>>>>>>>>> which takes two pd.DataFrames (and also returns a
>>>>>>>>>>>>> pd.DataFrame) for each key. This feels very much like the
>>>>>>>>>>>>> "cogroup" operation in the RDD API.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It would be great to be able to do something like this (not
>>>>>>>>>>>>> the actual API, just to explain the use case):
>>>>>>>>>>>>>
>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original
>>>>>>>>>>>>>     # dataframes that are associated with a particular key
>>>>>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>     return result
>>>>>>>>>>>>>
>>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>>> suggested joining the tables first. However, it's often not
>>>>>>>>>>>>> the same pattern, and it is hard to get it to work using
>>>>>>>>>>>>> joins.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I wonder what people's thoughts on this are?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Li
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix