Thank you Chris, this looks great.

Would you mind share a google doc version of the proposal? I believe that's
the preferred way of discussing proposals (Other people please correct me
if I am wrong).

Li

On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:

> Hi,
>
>  As promised I’ve raised SPARK-27463 for this.
>
> All feedback welcome!
>
> Chris
>
> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>
> Thanks Bryan and Li, that is much appreciated.  Hopefully should have the
> SPIP ready in the next couple of days.
>
> thanks,
>
> Chris
>
>
>
>
> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>
>> Chirs, an SPIP sounds good to me. I agree with Li that it wouldn't be too
>> difficult to extend the currently functionality to transfer multiple
>> DataFrames.  For the SPIP, I would keep it more high-level and I don't
>> think it's necessary to include details of the Python worker, we can hash
>> that out after the SPIP is approved.
>>
>> Bryan
>>
>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>>> Thanks Chris, look forward to it.
>>>
>>> I think sending multiple dataframes to the python worker requires some
>>> changes but shouldn't be too difficult. We can probably sth like:
>>>
>>>
>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>
>>> In:
>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>
>>> And have ArrowPythonRunner take multiple input iterator/schema.
>>>
>>> Li
>>>
>>>
>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>
>>>> Hi,
>>>>
>>>> Just to say, I really do think this is useful and am currently working
>>>> on a SPIP to formally propose this. One concern I do have, however, is that
>>>> the current arrow serialization code is tied to passing through a single
>>>> dataframe as the udf parameter and so any modification to allow multiple
>>>> dataframes may not be straightforward.  If anyone has any ideas as to how
>>>> this might be achieved in an elegant manner I’d be happy to hear them!
>>>>
>>>> Thanks,
>>>>
>>>> Chris
>>>>
>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>
>>>> Thank you both for the reply. Chris and I have very similar use cases
>>>> for cogroup.
>>>>
>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>> feels very similar and can be an extension to the groupby apply + pandas
>>>> UDF functionality.
>>>>
>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>
>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>
>>>>> Just to add to this I’ve also implemented my own cogroup previously
>>>>> and would welcome a cogroup for datafame.
>>>>>
>>>>> My specific use case was that I had a large amount of time series
>>>>> data. Spark has very limited support for time series (specifically as-of
>>>>> joins), but pandas has good support.
>>>>>
>>>>> My solution was to take my two dataframes and perform a group by and
>>>>> collect list on each. The resulting arrays could be passed into a udf 
>>>>> where
>>>>> they could be marshaled into a couple of pandas dataframes and processed
>>>>> using pandas excellent time series functionality.
>>>>>
>>>>> If cogroup was available natively on dataframes this would have been a
>>>>> bit nicer. The ideal would have been some pandas udf version of cogroup
>>>>> that gave me a pandas dataframe for each spark dataframe in the cogroup!
>>>>>
>>>>> Chris
>>>>>
>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <jonathan.wina...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> For info, in our team have defined our own cogroup on dataframe in the
>>>>> past on different projects using different methods (rdd[row] based or 
>>>>> union
>>>>> all collect list based).
>>>>>
>>>>> I might be biased, but find the approach very useful in project to
>>>>> simplify and speed up transformations, and remove a lot of intermediate
>>>>> stages (distinct + join => just cogroup).
>>>>>
>>>>> Plus spark 2.4 introduced a lot of new operator for nested data.
>>>>> That's a win!
>>>>>
>>>>>
>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>
>>>>>> I am wondering do other people have opinion/use case on cogroup?
>>>>>>
>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>
>>>>>>> Alessandro,
>>>>>>>
>>>>>>> Thanks for the reply. I assume by "equi-join", you mean "equality
>>>>>>> full outer join" .
>>>>>>>
>>>>>>> Two issues I see with equity outer join is:
>>>>>>> (1) equity outer join will give n * m rows for each key (n and m
>>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>>> (2) User needs to do some extra processing to transform n * m back
>>>>>>> to the desired shape (two sub dataframes with n and m rows)
>>>>>>>
>>>>>>> I think full outer join is an inefficient way to implement cogroup.
>>>>>>> If the end goal is to have two separate dataframes for each key, why
>>>>>>> joining them first and then unjoin them?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>> alessandro.solima...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> I fail to see how an equi-join on the key columns is different than
>>>>>>>> the cogroup you propose.
>>>>>>>>
>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>
>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>
>>>>>>>> Now you apply an udf on each iterable, one per key value (obtained
>>>>>>>> with cogroup).
>>>>>>>>
>>>>>>>> You can achieve the same by:
>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>> 2) apply "groupby" on such key
>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>>>>>> familiar with them
>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>> that will process each group "in isolation".
>>>>>>>>
>>>>>>>> HTH,
>>>>>>>> Alessandro
>>>>>>>>
>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> We have been using Pyspark's groupby().apply() quite a bit and it
>>>>>>>>> has been very helpful in integrating Spark with our existing 
>>>>>>>>> pandas-heavy
>>>>>>>>> libraries.
>>>>>>>>>
>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>> groupby().apply() is not sufficient - In some cases, we want to group 
>>>>>>>>> two
>>>>>>>>> dataframes by the same key, and apply a function which takes two
>>>>>>>>> pd.DataFrame (also returns a pd.DataFrame) for each key. This feels 
>>>>>>>>> very
>>>>>>>>> much like the "cogroup" operation in the RDD API.
>>>>>>>>>
>>>>>>>>> It would be great to be able to do sth like this: (not actual API,
>>>>>>>>> just to explain the use case):
>>>>>>>>>
>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>> def my_udf(pdf1, pdf2)
>>>>>>>>>      # pdf1 and pdf2 are the subset of the original dataframes
>>>>>>>>> that is associated with a particular key
>>>>>>>>>      result = ... # some code that uses pdf1 and pdf2
>>>>>>>>>      return result
>>>>>>>>>
>>>>>>>>> df3  = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>
>>>>>>>>> I have searched around the problem and some people have suggested
>>>>>>>>> to join the tables first. However, it's often not the same pattern 
>>>>>>>>> and hard
>>>>>>>>> to get it to work by using joins.
>>>>>>>>>
>>>>>>>>> I wonder what are people's thought on this?
>>>>>>>>>
>>>>>>>>> Li
>>>>>>>>>
>>>>>>>>>

Reply via email to