Ah sorry- I've updated the link which should give you access.  Can you try
again now?

thanks,

Chris



On Mon, Apr 15, 2019 at 9:49 PM Li Jin <ice.xell...@gmail.com> wrote:

> Hi Chris,
>
> Thanks! The permission to the google doc is maybe not set up properly. I
> cannot view the doc by default.
>
> Li
>
> On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk>
> wrote:
>
>> I've updated the jira so that the main body is now inside a google doc.
>> Anyone should be able to comment- if you want/need write access please drop
>> me a mail and I can add you.
>>
>> Ryan- regarding your question about why I'm not proposing to add this to
>> the Scala API, I think the main point is that Scala users can already use
>> Cogroup for Datasets.  For Scala this is probably a better solution as (as
>> far as I know) there is no Scala DataFrame library that could be used in
>> place of Pandas for manipulating local DataFrames. As a result you'd
>> probably be left dealing with Iterators of Row objects, which almost
>> certainly isn't what you'd want. This is similar to the existing grouped
>> map Pandas UDFs, for which there is no equivalent Scala API.
>>
>> I do think there might be a place for allowing a (Scala) Dataset Cogroup
>> to take some sort of grouping expression as the grouping key (this would
>> mean that you wouldn't have to marshal the key into a JVM object and could
>> possibly lend itself to some Catalyst optimisations) but I don't think that
>> this should be done as part of this SPIP.
>>
>> thanks,
>>
>> Chris
>>
>> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I agree, it would be great to have a document to comment on.
>>>
>>> The main thing that stands out right now is that this is only for
>>> PySpark and states that it will not be added to the Scala API. Why not make
>>> this available since most of the work would be done?
>>>
>>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>
>>>> Thank you Chris, this looks great.
>>>>
>>>> Would you mind sharing a google doc version of the proposal? I believe
>>>> that's the preferred way of discussing proposals (Other people please
>>>> correct me if I am wrong).
>>>>
>>>> Li
>>>>
>>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>>
>>>>> All feedback welcome!
>>>>>
>>>>> Chris
>>>>>
>>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>>
>>>>> Thanks Bryan and Li, that is much appreciated.  Hopefully should have
>>>>> the SPIP ready in the next couple of days.
>>>>>
>>>>> thanks,
>>>>>
>>>>> Chris
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>>>>>
>>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be
>>>>>> too difficult to extend the current functionality to transfer multiple
>>>>>> DataFrames.  For the SPIP, I would keep it more high-level; I don't
>>>>>> think it's necessary to include details of the Python worker. We can hash
>>>>>> that out after the SPIP is approved.
>>>>>>
>>>>>> Bryan
>>>>>>
>>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Chris, look forward to it.
>>>>>>>
>>>>>>> I think sending multiple dataframes to the python worker requires
>>>>>>> some changes but shouldn't be too difficult. We can probably do
>>>>>>> something like:
>>>>>>>
>>>>>>>
>>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>>
>>>>>>> In:
>>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>>
>>>>>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>>>>>
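The framing suggested above, a count followed by each DataFrame's bytes, could be sketched roughly as follows. This is a pure-Python illustration, not the ArrowPythonRunner code: the function names `write_frames` and `read_frames` are hypothetical, the payloads are placeholder byte strings rather than real Arrow data, and the per-payload length prefix is an assumption added here so the reader knows where each payload ends.

```python
import io
import struct


def write_frames(stream, payloads):
    """Write [count][len][bytes][len][bytes]... to a binary stream."""
    # 4-byte big-endian count of payloads (the "numberOfDataFrames" field).
    stream.write(struct.pack(">i", len(payloads)))
    for payload in payloads:
        # Assumption: a length prefix per payload, so the reader can split them.
        stream.write(struct.pack(">i", len(payload)))
        stream.write(payload)


def read_frames(stream):
    """Read back the list of payloads written by write_frames."""
    (count,) = struct.unpack(">i", stream.read(4))
    payloads = []
    for _ in range(count):
        (size,) = struct.unpack(">i", stream.read(4))
        payloads.append(stream.read(size))
    return payloads


# Placeholder bytes stand in for two serialized DataFrames.
buf = io.BytesIO()
write_frames(buf, [b"first-dataframe-bytes", b"second-dataframe-bytes"])
buf.seek(0)
decoded = read_frames(buf)
```

Whether an explicit length prefix is needed in practice depends on whether the serialized format already marks its own end, so treat that part as a placeholder.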
>>>>>>> Li
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>>> working on a SPIP to formally propose this. One concern I do have,
>>>>>>>> however, is that the current arrow serialization code is tied to
>>>>>>>> passing through a single dataframe as the udf parameter and so any
>>>>>>>> modification to allow multiple dataframes may not be straightforward.
>>>>>>>> If anyone has any ideas as to how this might be achieved in an elegant
>>>>>>>> manner I’d be happy to hear them!
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>>> cases for cogroup.
>>>>>>>>
>>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>>>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>>>>>> feels very similar and can be an extension to the groupby apply +
>>>>>>>> pandas UDF functionality.
>>>>>>>>
>>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>>>
>>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>>
>>>>>>>>> Just to add to this, I’ve also implemented my own cogroup
>>>>>>>>> previously and would welcome a cogroup for dataframes.
>>>>>>>>>
>>>>>>>>> My specific use case was that I had a large amount of time series
>>>>>>>>> data. Spark has very limited support for time series (specifically
>>>>>>>>> as-of joins), but pandas has good support.
>>>>>>>>>
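For readers unfamiliar with the term, an as-of join attaches to each row on the left the most recent row on the right whose timestamp is not later. A minimal pure-Python sketch, assuming both inputs are lists of (time, value) tuples already sorted by time; the function name `asof_join` and the sample data are hypothetical (in pandas the equivalent operation is `pandas.merge_asof`):

```python
from bisect import bisect_right


def asof_join(left, right):
    """For each (t, value) in left, attach the latest right value with time <= t.

    Both inputs must be sorted by time. Rows with no earlier match get None.
    """
    right_times = [t for t, _ in right]
    out = []
    for t, left_value in left:
        # Index of the first right row strictly later than t.
        i = bisect_right(right_times, t)
        right_value = right[i - 1][1] if i > 0 else None
        out.append((t, left_value, right_value))
    return out


trades = [(1, "t1"), (5, "t2"), (10, "t3")]
quotes = [(2, "q1"), (5, "q2"), (7, "q3")]
result = asof_join(trades, quotes)
# result == [(1, "t1", None), (5, "t2", "q2"), (10, "t3", "q3")]
```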
>>>>>>>>> My solution was to take my two dataframes and perform a group by
>>>>>>>>> and collect list on each. The resulting arrays could be passed into a
>>>>>>>>> udf where they could be marshaled into a couple of pandas dataframes
>>>>>>>>> and processed using pandas' excellent time series functionality.
>>>>>>>>>
>>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>>> been a bit nicer. The ideal would have been some pandas udf version of
>>>>>>>>> cogroup that gave me a pandas dataframe for each spark dataframe in
>>>>>>>>> the cogroup!
>>>>>>>>>
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>>> jonathan.wina...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> For info, our team has defined its own cogroup on dataframes in
>>>>>>>>> the past on different projects using different methods (rdd[row]
>>>>>>>>> based or union all collect list based).
>>>>>>>>>
>>>>>>>>> I might be biased, but I find the approach very useful in projects to
>>>>>>>>> simplify and speed up transformations, and to remove a lot of
>>>>>>>>> intermediate stages (distinct + join => just cogroup).
>>>>>>>>>
>>>>>>>>> Plus Spark 2.4 introduced a lot of new operators for nested data.
>>>>>>>>> That's a win!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I am wondering whether other people have opinions or use cases for
>>>>>>>>>> cogroup?
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Alessandro,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean
>>>>>>>>>>> "equality full outer join".
>>>>>>>>>>>
>>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m
>>>>>>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>>> (2) the user needs to do some extra processing to transform the n * m
>>>>>>>>>>> rows back to the desired shape (two sub dataframes with n and m rows)
>>>>>>>>>>>
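The two points above can be illustrated with a small pure-Python sketch. It assumes rows are plain dicts and covers only the matched-key case (unmatched rows from an outer join are ignored); the helper `equi_join` and the column names `k`, `x`, `y` are hypothetical.

```python
def equi_join(rows1, rows2, key):
    """Naive equi-join on `key`: every matching pair of rows is combined."""
    return [
        {**left, **right}
        for left in rows1
        for right in rows2
        if left[key] == right[key]
    ]


df1 = [{"k": "a", "x": i} for i in range(3)]  # n = 3 rows for key "a"
df2 = [{"k": "a", "y": j} for j in range(4)]  # m = 4 rows for key "a"

joined = equi_join(df1, df2, "k")
n_joined = len(joined)  # 3 * 4 = 12 rows for a single key

# Extra processing needed to get back the two original groups:
# de-duplicate each side's columns out of the n * m joined rows.
left_back = sorted({row["x"] for row in joined})
right_back = sorted({row["y"] for row in joined})
```

Here the de-duplication only works because the sample values are unique; with duplicate rows on either side, recovering the original groups from the joined output would need an extra row identifier.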
>>>>>>>>>>> I think full outer join is an inefficient way to implement
>>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for
>>>>>>>>>>> each key, why join them first and then unjoin them?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>>> alessandro.solima...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>> I fail to see how an equi-join on the key columns is different
>>>>>>>>>>>> from the cogroup you propose.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>>
>>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>>
>>>>>>>>>>>> Now you apply a udf on each iterable, one per key value
>>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>>
>>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are
>>>>>>>>>>>> not familiar with them
>>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>>
>>>>>>>>>>>> HTH,
>>>>>>>>>>>> Alessandro
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> We have been using PySpark's groupby().apply() quite a bit and
>>>>>>>>>>>>> it has been very helpful in integrating Spark with our existing
>>>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>>> groupby().apply() is not sufficient. In some cases, we want to
>>>>>>>>>>>>> group two dataframes by the same key, and apply a function which
>>>>>>>>>>>>> takes two pd.DataFrame (also returns a pd.DataFrame) for each
>>>>>>>>>>>>> key. This feels very much like the "cogroup" operation in the RDD
>>>>>>>>>>>>> API.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It would be great to be able to do something like this (not actual
>>>>>>>>>>>>> API, just to explain the use case):
>>>>>>>>>>>>>
>>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>>      # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>>>>      # that are associated with a particular key
>>>>>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>>>>>      return result
>>>>>>>>>>>>>
>>>>>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>>
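The semantics being asked for can be sketched locally in pure Python, with no Spark or pandas involved. This assumes rows are plain dicts; `cogroup`, `apply_cogrouped`, and `count_sides` are hypothetical names for illustration only, and `count_sides` stands in for the user function that receives both groups for one key.

```python
from collections import defaultdict


def cogroup(rows1, rows2, key):
    """Group two lists of dict rows by `key`; yield (key, group1, group2).

    A key present on only one side yields an empty list for the other side.
    """
    groups = defaultdict(lambda: ([], []))
    for row in rows1:
        groups[row[key]][0].append(row)
    for row in rows2:
        groups[row[key]][1].append(row)
    for k in sorted(groups):
        left, right = groups[k]
        yield k, left, right


def apply_cogrouped(rows1, rows2, key, func):
    """Call func(group1, group2) once per key and concatenate the results."""
    out = []
    for _, left, right in cogroup(rows1, rows2, key):
        out.extend(func(left, right))
    return out


def count_sides(left, right):
    # Stand-in user function: report how many rows each side has for this key.
    k = (left or right)[0]["some_key"]
    return [{"some_key": k, "n_left": len(left), "n_right": len(right)}]


df1 = [
    {"some_key": "a", "v": 1},
    {"some_key": "a", "v": 2},
    {"some_key": "b", "v": 3},
]
df2 = [{"some_key": "a", "w": 10}, {"some_key": "c", "w": 20}]
df3 = apply_cogrouped(df1, df2, "some_key", count_sides)
```

Note that each side's rows are passed through once, grouped but never paired with each other, which is the difference from the join-based approach.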
>>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>>> suggested joining the tables first. However, it's often not the
>>>>>>>>>>>>> same pattern, and it's hard to get it to work by using joins.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Li
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
