Paul Jones created SPARK-30477:
----------------------------------

             Summary: More KeyValueGroupedDataset methods should be composable
                 Key: SPARK-30477
                 URL: https://issues.apache.org/jira/browse/SPARK-30477
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.4
            Reporter: Paul Jones


Right now, many `KeyValueGroupedDataset` methods do not return a
`KeyValueGroupedDataset`. In some cases this means we have to call
`groupByKey` several times in order to express certain patterns.


Setup

{code:scala}
def f: T => K
def g: U => K
def h: V => K
val ds1: Dataset[T] = ???
val ds2: Dataset[U] = ???
val ds3: Dataset[V] = ??? 
val kvDs1: KeyValueGroupedDataset[K, T] = ds1.groupByKey(f)
val kvDs2: KeyValueGroupedDataset[K, U] = ds2.groupByKey(g)
val kvDs3: KeyValueGroupedDataset[K, V] = ds3.groupByKey(h)
{code}

Example one: combining multiple co-grouped Datasets.

{code:scala}
// Current
kvDs1
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: TraversableOnce[X])
  .groupByKey((x: X) => ???: K)
  .cogroup(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: TraversableOnce[Z])

// Wanted
trait KeyValueGroupedDataset[K, T] {
  def coGroupKeyValueGroupedDataset[U, X](r: KeyValueGroupedDataset[K, U])(
      f: (K, Iterator[T], Iterator[U]) => X): KeyValueGroupedDataset[K, X]
}

kvDs1
  .coGroupKeyValueGroupedDataset(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: X)
  .coGroupKeyValueGroupedDataset(kvDs3)((k: K, it1: Iterator[X], it2: Iterator[V]) => ???: Z)
{code}
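A minimal in-memory sketch of the shape Example one asks for, assuming a toy `Grouped` class (hypothetical, backed by a plain `Map`) in place of `KeyValueGroupedDataset`. `Grouped`, `cogroupGrouped` and `groupByKey` here are illustrative names, not Spark API, and encoders, partitioning and laziness are ignored. It only shows that a cogroup returning a keyed result can be chained straight into a second cogroup without an intermediate regrouping step.

{code:scala}
// Hypothetical in-memory stand-in for KeyValueGroupedDataset[K, V]:
// each key maps to the values grouped under it. This is NOT Spark's API.
final case class Grouped[K, V](groups: Map[K, Vector[V]]) {

  // Sketch of the proposed coGroupKeyValueGroupedDataset: combine both sides
  // per key into one X and stay keyed, so the result can be cogrouped again.
  def cogroupGrouped[U, X](other: Grouped[K, U])(
      f: (K, Iterator[V], Iterator[U]) => X): Grouped[K, X] = {
    val keys = groups.keySet ++ other.groups.keySet
    Grouped[K, X](keys.iterator.map { k =>
      val left  = groups.getOrElse(k, Vector.empty).iterator
      val right = other.groups.getOrElse(k, Vector.empty).iterator
      k -> Vector(f(k, left, right))
    }.toMap)
  }
}

// Hypothetical analogue of ds.groupByKey(key) for a local Seq.
def groupByKey[K, V](xs: Seq[V])(key: V => K): Grouped[K, V] =
  Grouped[K, V](xs.groupBy(key).map { case (k, vs) => k -> vs.toVector })

val kv1 = groupByKey(Seq("a1", "a2", "b1"))(_.head)                  // Grouped[Char, String]
val kv2 = groupByKey(Seq(10, 11, 20))(n => if (n < 20) 'a' else 'b') // Grouped[Char, Int]
val kv3 = groupByKey(Seq(1.5, 2.5))(d => if (d < 2) 'a' else 'b')    // Grouped[Char, Double]

// Two cogroups chained directly, with no groupByKey in between.
val result = kv1
  .cogroupGrouped(kv2)((_, ts, us) => ts.size + us.size)  // Grouped[Char, Int]
  .cogroupGrouped(kv3)((_, xs, vs) => xs.sum + vs.size)   // Grouped[Char, Int]
{code}

The second `cogroupGrouped` reuses the keys produced by the first one; the corresponding Spark chain under "Current" needs the extra `groupByKey` to get back to a keyed form.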

Example two: combining reduceGroups with cogroup
{code:scala}
// current
val newDs1: Dataset[X] = kvDs1
  .reduceGroups((l: T, r: T) => ???: T)
  .groupByKey { case (k, _) => k }.mapValues { case (_, v) => v }
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: TraversableOnce[X])

// wanted
trait KeyValueGroupedDataset[K, T] {
  def reduceGroupsKeyValueGroupedDataset(f: (T, T) => T): KeyValueGroupedDataset[K, T]
}

val newDs2: Dataset[X] = kvDs1
  .reduceGroupsKeyValueGroupedDataset((l: T, r: T) => ???: T)
  .cogroup(kvDs2)((k: K, it1: Iterator[T], it2: Iterator[U]) => ???: TraversableOnce[X])
{code}
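Example two can be modelled the same way. In this sketch, `Grouped` is again a hypothetical in-memory class rather than Spark's API: `reduceGroupsGrouped` reduces each group to one value but keeps it keyed, so it feeds straight into a `cogroup` whose function returns an `Iterator` (standing in for Spark's `TraversableOnce`) and whose results are flattened into a `Vector`.

{code:scala}
// Hypothetical in-memory stand-in for KeyValueGroupedDataset[K, V]; NOT Spark's API.
final case class Grouped[K, V](groups: Map[K, Vector[V]]) {

  // Sketch of the proposed reduceGroupsKeyValueGroupedDataset: one value per
  // key, still keyed. Assumes non-empty groups (true when built by groupByKey).
  def reduceGroupsGrouped(f: (V, V) => V): Grouped[K, V] =
    Grouped[K, V](groups.map { case (k, vs) => k -> Vector(vs.reduce(f)) })

  // Analogue of the existing cogroup: the per-key outputs are flattened
  // into a plain collection, like cogroup returning a Dataset[X].
  def cogroup[U, X](other: Grouped[K, U])(
      f: (K, Iterator[V], Iterator[U]) => Iterator[X]): Vector[X] = {
    val keys = (groups.keySet ++ other.groups.keySet).toVector
    keys.flatMap { k =>
      f(k, groups.getOrElse(k, Vector.empty).iterator,
        other.groups.getOrElse(k, Vector.empty).iterator)
    }
  }
}

// Hypothetical analogue of ds.groupByKey(key) for a local Seq.
def groupByKey[K, V](xs: Seq[V])(key: V => K): Grouped[K, V] =
  Grouped[K, V](xs.groupBy(key).map { case (k, vs) => k -> vs.toVector })

val kv1 = groupByKey(Seq(3, 1, 4, 1, 5))(n => n % 2)         // key: parity
val kv2 = groupByKey(Seq("x", "yy", "zzz"))(s => s.length % 2) // key: length parity

// reduce-then-cogroup with no groupByKey/mapValues round trip in between.
val newDs2 = kv1
  .reduceGroupsGrouped(_ max _)
  .cogroup(kv2) { (k, maxes, strs) =>
    val n = strs.size                // number of strings under this key
    maxes.map(m => (k, m, n))        // (key, group max, string count)
  }
{code}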

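In both cases, not only are the ergonomics better, but Spark would also be
better able to optimize the code, since it could skip the extra `groupByKey`
(and the shuffle it may introduce) on data that is already grouped by key.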

For almost every `KeyValueGroupedDataset` method that returns a `Dataset`, we
should have a matching method that returns a `KeyValueGroupedDataset`.

We can also add a `.toDs` method that converts a
`KeyValueGroupedDataset[K, V]` to a `Dataset[(K, V)]`.
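A `.toDs`-style conversion amounts to flattening grouped values back into one `(key, value)` pair per value. The sketch below assumes the same kind of hypothetical in-memory `Grouped` class (not Spark's API); its `toDs` returns a `Vector[(K, V)]` as a stand-in for `Dataset[(K, V)]`.

{code:scala}
// Hypothetical in-memory stand-in for KeyValueGroupedDataset[K, V]; NOT Spark's API.
final case class Grouped[K, V](groups: Map[K, Vector[V]]) {

  // Sketch of the proposed .toDs: emit one (key, value) row per grouped value.
  def toDs: Vector[(K, V)] =
    groups.toVector.flatMap { case (k, vs) => vs.map(v => (k, v)) }
}

// Hypothetical analogue of ds.groupByKey(key) for a local Seq.
def groupByKey[K, V](xs: Seq[V])(key: V => K): Grouped[K, V] =
  Grouped[K, V](xs.groupBy(key).map { case (k, vs) => k -> vs.toVector })

val rows = groupByKey(Seq("apple", "avocado", "banana"))(_.head).toDs
{code}

Row order follows the underlying `Map` and is therefore unspecified, much as row order in a `Dataset` is not guaranteed.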



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
