Hi Chris,

Thanks! The sharing permissions on the Google doc may not be set up
properly: I cannot view the doc by default.

Li

On Mon, Apr 15, 2019 at 3:58 PM Chris Martin <ch...@cmartinit.co.uk> wrote:

> I've updated the Jira so that the main body is now inside a Google doc.
> Anyone should be able to comment; if you want/need write access please drop
> me a mail and I can add you.
>
> Ryan, on your specific point about why I'm not proposing to add this to
> the Scala API: the main reason is that Scala users can already use cogroup
> for Datasets. For Scala this is probably a better solution as (as far as I
> know) there is no Scala DataFrame library that could be used in place of
> pandas for manipulating local DataFrames. As a result you'd probably be
> left dealing with Iterators of Row objects, which almost certainly isn't
> what you'd want. This is similar to the existing grouped map pandas UDFs,
> for which there is no equivalent Scala API.
>
> I do think there might be a place for allowing a (Scala) Dataset cogroup
> to take some sort of grouping expression as the grouping key (this would
> mean that you wouldn't have to marshal the key into a JVM object and could
> possibly lend itself to some Catalyst optimisations), but I don't think
> that this should be done as part of this SPIP.
>
> thanks,
>
> Chris
>
> On Mon, Apr 15, 2019 at 6:27 PM Ryan Blue <rb...@netflix.com> wrote:
>
>> I agree, it would be great to have a document to comment on.
>>
>> The main thing that stands out right now is that this is only for PySpark
>> and states that it will not be added to the Scala API. Why not make this
>> available since most of the work would be done?
>>
>> On Mon, Apr 15, 2019 at 7:50 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>>> Thank you Chris, this looks great.
>>>
>>> Would you mind sharing a Google doc version of the proposal? I believe
>>> that's the preferred way of discussing proposals (other people, please
>>> correct me if I am wrong).
>>>
>>> Li
>>>
>>> On Mon, Apr 15, 2019 at 8:20 AM <ch...@cmartinit.co.uk> wrote:
>>>
>>>> Hi,
>>>>
>>>>  As promised I’ve raised SPARK-27463 for this.
>>>>
>>>> All feedback welcome!
>>>>
>>>> Chris
>>>>
>>>> On 9 Apr 2019, at 13:22, Chris Martin <ch...@cmartinit.co.uk> wrote:
>>>>
>>>> Thanks Bryan and Li, that is much appreciated. Hopefully I should have
>>>> the SPIP ready in the next couple of days.
>>>>
>>>> thanks,
>>>>
>>>> Chris
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 8, 2019 at 7:18 PM Bryan Cutler <cutl...@gmail.com> wrote:
>>>>
>>>>> Chris, an SPIP sounds good to me. I agree with Li that it wouldn't be
>>>>> too difficult to extend the current functionality to transfer multiple
>>>>> DataFrames. For the SPIP, I would keep it more high-level; I don't
>>>>> think it's necessary to include details of the Python worker. We can
>>>>> hash that out after the SPIP is approved.
>>>>>
>>>>> Bryan
>>>>>
>>>>> On Mon, Apr 8, 2019 at 10:43 AM Li Jin <ice.xell...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Chris, look forward to it.
>>>>>>
>>>>>> I think sending multiple dataframes to the Python worker requires
>>>>>> some changes but shouldn't be too difficult. We can probably do
>>>>>> something like:
>>>>>>
>>>>>>
>>>>>> [numberOfDataFrames][FirstDataFrameInArrowFormat][SecondDataFrameInArrowFormat]
>>>>>>
>>>>>> In:
>>>>>> https://github.com/apache/spark/blob/86d469aeaa492c0642db09b27bb0879ead5d7166/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala#L70
>>>>>>
>>>>>> and have ArrowPythonRunner take multiple input iterators/schemas.
>>>>>>
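The layout suggested above can be sketched with the standard library alone. This is a hedged illustration, not Spark's actual wire protocol: the payloads are opaque byte strings standing in for Arrow-encoded dataframes, the per-payload length prefix is an added assumption (Arrow streams carry their own end marker), and `write_frames`/`read_frames` are hypothetical names.

```python
import io
import struct


def write_frames(payloads):
    """Serialise as [count][len][bytes][len][bytes]... using big-endian ints."""
    out = io.BytesIO()
    out.write(struct.pack(">i", len(payloads)))  # [numberOfDataFrames]
    for payload in payloads:
        out.write(struct.pack(">i", len(payload)))  # assumed length prefix
        out.write(payload)  # stand-in for one dataframe in Arrow format
    return out.getvalue()


def read_frames(data):
    """Inverse of write_frames: read the count, then each payload in order."""
    stream = io.BytesIO(data)
    (count,) = struct.unpack(">i", stream.read(4))
    frames = []
    for _ in range(count):
        (length,) = struct.unpack(">i", stream.read(4))
        frames.append(stream.read(length))
    return frames
```

The reader only needs the leading count to know how many dataframes to expect, which is what would let the worker hand two (or more) inputs to a single UDF call.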
>>>>>> Li
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 8, 2019 at 5:55 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Just to say, I really do think this is useful and am currently
>>>>>>> working on a SPIP to formally propose this. One concern I do have,
>>>>>>> however, is that the current Arrow serialization code is tied to
>>>>>>> passing through a single dataframe as the UDF parameter, and so any
>>>>>>> modification to allow multiple dataframes may not be straightforward.
>>>>>>> If anyone has any ideas as to how this might be achieved in an elegant
>>>>>>> manner I’d be happy to hear them!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On 26 Feb 2019, at 14:55, Li Jin <ice.xell...@gmail.com> wrote:
>>>>>>>
>>>>>>> Thank you both for the reply. Chris and I have very similar use
>>>>>>> cases for cogroup.
>>>>>>>
>>>>>>> One of the goals for groupby apply + pandas UDF was to avoid things
>>>>>>> like collect list and reshaping data between Spark and Pandas. Cogroup
>>>>>>> feels very similar and can be an extension to the groupby apply + pandas
>>>>>>> UDF functionality.
>>>>>>>
>>>>>>> I wonder if any PMC/committers have any thoughts/opinions on this?
>>>>>>>
>>>>>>> On Tue, Feb 26, 2019 at 2:17 AM <ch...@cmartinit.co.uk> wrote:
>>>>>>>
>>>>>>>> Just to add to this, I’ve also implemented my own cogroup previously
>>>>>>>> and would welcome a cogroup for dataframes.
>>>>>>>>
>>>>>>>> My specific use case was that I had a large amount of time series
>>>>>>>> data. Spark has very limited support for time series (specifically
>>>>>>>> as-of joins), but pandas has good support.
>>>>>>>>
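For readers unfamiliar with the term, an as-of join matches each row on one side with the most recent row on the other side at or before its timestamp (pandas exposes this as `merge_asof`). A minimal plain-Python sketch of the backward-looking variant follows; the `asof_join` helper and the trade/quote sample data are illustrative assumptions, not taken from the thread.

```python
from bisect import bisect_right


def asof_join(left, right):
    """For each (time, value) in left, attach the latest right value whose
    time is at or before the left time, or None if there is none."""
    right = sorted(right)
    right_times = [t for t, _ in right]
    joined = []
    for t, value in sorted(left):
        idx = bisect_right(right_times, t)  # number of right rows with time <= t
        match = right[idx - 1][1] if idx > 0 else None
        joined.append((t, value, match))
    return joined


trades = [(1, "t1"), (5, "t2"), (10, "t3")]
quotes = [(2, "q1"), (5, "q2"), (7, "q3")]
result = asof_join(trades, quotes)
```

Here the trade at time 1 has no earlier quote, the trade at time 5 picks up the quote at time 5, and the trade at time 10 picks up the quote at time 7.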
>>>>>>>> My solution was to take my two dataframes and perform a group by
>>>>>>>> and collect list on each. The resulting arrays could be passed into a
>>>>>>>> UDF where they could be marshaled into a couple of pandas dataframes
>>>>>>>> and processed using pandas' excellent time series functionality.
>>>>>>>>
>>>>>>>> If cogroup was available natively on dataframes this would have
>>>>>>>> been a bit nicer. The ideal would have been some pandas udf version of
>>>>>>>> cogroup that gave me a pandas dataframe for each spark dataframe in the
>>>>>>>> cogroup!
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On 26 Feb 2019, at 00:38, Jonathan Winandy <
>>>>>>>> jonathan.wina...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> For info, our team has defined its own cogroup on dataframes in the
>>>>>>>> past, on different projects and using different methods (RDD[Row]
>>>>>>>> based, or union all + collect list based).
>>>>>>>>
>>>>>>>> I might be biased, but I find the approach very useful in projects to
>>>>>>>> simplify and speed up transformations, and to remove a lot of
>>>>>>>> intermediate stages (distinct + join => just cogroup).
>>>>>>>>
>>>>>>>> Plus, Spark 2.4 introduced a lot of new operators for nested data.
>>>>>>>> That's a win!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, 21 Feb 2019, 17:38 Li Jin, <ice.xell...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am wondering whether other people have opinions on, or use cases
>>>>>>>>> for, cogroup?
>>>>>>>>>
>>>>>>>>> On Wed, Feb 20, 2019 at 5:03 PM Li Jin <ice.xell...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Alessandro,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply. I assume by "equi-join", you mean
>>>>>>>>>> "equality full outer join".
>>>>>>>>>>
>>>>>>>>>> Two issues I see with an equality outer join are:
>>>>>>>>>> (1) an equality outer join will give n * m rows for each key (n and m
>>>>>>>>>> being the corresponding number of rows in df1 and df2 for each key)
>>>>>>>>>> (2) the user needs to do some extra processing to transform the n * m
>>>>>>>>>> rows back to the desired shape (two sub-dataframes with n and m rows)
>>>>>>>>>>
>>>>>>>>>> I think a full outer join is an inefficient way to implement
>>>>>>>>>> cogroup. If the end goal is to have two separate dataframes for each
>>>>>>>>>> key, why join them first and then unjoin them?
>>>>>>>>>>
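The two issues above can be made concrete with a small plain-Python sketch. It is only an illustration under stated assumptions: rows are dicts, the join is shown for a single key present on both sides (where a full outer join and an inner join produce the same pairs), and the hypothetical `unjoin` helper de-duplicates by object identity, which only works in memory; in Spark a row-id column would be needed instead.

```python
def equi_join(left, right, key):
    """Pair every left row with every right row that has the same key."""
    return [(l, r) for l in left for r in right if l[key] == r[key]]


def unjoin(pairs):
    """Recover the two original row sets from the joined pairs.

    De-duplicates by object identity; dicts keep first-seen order.
    """
    left_seen, right_seen = {}, {}
    for l, r in pairs:
        left_seen[id(l)] = l
        right_seen[id(r)] = r
    return list(left_seen.values()), list(right_seen.values())


left = [{"k": "a", "x": i} for i in range(3)]   # n = 3 rows for key "a"
right = [{"k": "a", "y": j} for j in range(4)]  # m = 4 rows for key "a"

joined = equi_join(left, right, "k")            # n * m = 12 pairs
left_back, right_back = unjoin(joined)          # extra pass to get 3 and 4 rows
```

The join materialises 12 pairs only for the extra pass to shrink them back to 3 and 4 rows, which is the round trip a native cogroup would avoid.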
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 20, 2019 at 5:52 AM Alessandro Solimando <
>>>>>>>>>> alessandro.solima...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>> I fail to see how an equi-join on the key columns is different
>>>>>>>>>>> than the cogroup you propose.
>>>>>>>>>>>
>>>>>>>>>>> I think the accepted answer can shed some light:
>>>>>>>>>>>
>>>>>>>>>>> https://stackoverflow.com/questions/43960583/whats-the-difference-between-join-and-cogroup-in-apache-spark
>>>>>>>>>>>
>>>>>>>>>>> Now you apply a UDF on each iterable, one per key value
>>>>>>>>>>> (obtained with cogroup).
>>>>>>>>>>>
>>>>>>>>>>> You can achieve the same by:
>>>>>>>>>>> 1) join df1 and df2 on the key you want,
>>>>>>>>>>> 2) apply "groupby" on such key
>>>>>>>>>>> 3) finally apply a udaf (you can have a look here if you are not
>>>>>>>>>>> familiar with them
>>>>>>>>>>> https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html),
>>>>>>>>>>> that will process each group "in isolation".
>>>>>>>>>>>
>>>>>>>>>>> HTH,
>>>>>>>>>>> Alessandro
>>>>>>>>>>>
>>>>>>>>>>> On Tue, 19 Feb 2019 at 23:30, Li Jin <ice.xell...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> We have been using PySpark's groupby().apply() quite a bit and
>>>>>>>>>>>> it has been very helpful in integrating Spark with our existing
>>>>>>>>>>>> pandas-heavy libraries.
>>>>>>>>>>>>
>>>>>>>>>>>> Recently, we have found more and more cases where
>>>>>>>>>>>> groupby().apply() is not sufficient. In some cases, we want to
>>>>>>>>>>>> group two dataframes by the same key, and apply a function which
>>>>>>>>>>>> takes two pd.DataFrames (and also returns a pd.DataFrame) for each
>>>>>>>>>>>> key. This feels very much like the "cogroup" operation in the RDD
>>>>>>>>>>>> API.
>>>>>>>>>>>>
>>>>>>>>>>>> It would be great to be able to do something like this (not an
>>>>>>>>>>>> actual API, just to explain the use case):
>>>>>>>>>>>>
>>>>>>>>>>>> @pandas_udf(return_schema, ...)
>>>>>>>>>>>> def my_udf(pdf1, pdf2):
>>>>>>>>>>>>     # pdf1 and pdf2 are the subsets of the original dataframes
>>>>>>>>>>>>     # that are associated with a particular key
>>>>>>>>>>>>     result = ...  # some code that uses pdf1 and pdf2
>>>>>>>>>>>>     return result
>>>>>>>>>>>>
>>>>>>>>>>>> df3 = cogroup(df1, df2, key='some_key').apply(my_udf)
>>>>>>>>>>>>
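To pin down the intended semantics of the snippet above, here is a minimal sketch in plain Python, with lists of dicts standing in for dataframes. It is not the proposed API: `cogroup_apply` and `count_rows` are hypothetical names, and passing an empty group for a key that appears on only one side is an assumption borrowed from how the RDD cogroup behaves.

```python
from collections import defaultdict


def cogroup_apply(left, right, key, func):
    """Group both inputs by key and call func once per key with both groups."""
    left_groups, right_groups = defaultdict(list), defaultdict(list)
    for row in left:
        left_groups[row[key]].append(row)
    for row in right:
        right_groups[row[key]].append(row)

    result = []
    for k in sorted(set(left_groups) | set(right_groups)):
        # A key missing on one side yields an empty group (assumption).
        result.extend(func(left_groups.get(k, []), right_groups.get(k, [])))
    return result


def count_rows(rows1, rows2):
    """Example per-key function: report how many rows each side contributed."""
    key = (rows1 or rows2)[0]["some_key"]
    return [{"some_key": key, "n_left": len(rows1), "n_right": len(rows2)}]


df1 = [{"some_key": "a", "v": 1}, {"some_key": "a", "v": 2}, {"some_key": "b", "v": 3}]
df2 = [{"some_key": "a", "w": 10}, {"some_key": "c", "w": 20}]
df3 = cogroup_apply(df1, df2, "some_key", count_rows)
```

Each call to the per-key function sees the two groups separately, so no n * m intermediate result is ever built.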
>>>>>>>>>>>> I have searched around the problem and some people have
>>>>>>>>>>>> suggested joining the tables first. However, it's often not the
>>>>>>>>>>>> same pattern, and it's hard to get it to work using joins.
>>>>>>>>>>>>
>>>>>>>>>>>> I wonder what people's thoughts are on this?
>>>>>>>>>>>>
>>>>>>>>>>>> Li
>>>>>>>>>>>>
>>>>>>>>>>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
