Hi all,

We were in the process of porting an RDD program to one that uses Datasets. Most things were easy to transition, but one gap in functionality we found was the ability to reduce a Dataset by key, something akin to PairRDDFunctions.reduceByKey. Our first attempt at adding the functionality ourselves involved creating a KeyValueGroupedDataset and calling reduceGroups to get the reduced Dataset:
    import scala.reflect.ClassTag
    import org.apache.spark.sql.{Dataset, Encoder}

    class RichPairDataset[K, V: ClassTag](val ds: Dataset[(K, V)]) {
      def reduceByKey(func: (V, V) => V)(implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
        ds.groupByKey(_._1)                                                  // KeyValueGroupedDataset[K, (K, V)]
          .reduceGroups { (tup1, tup2) => (tup1._1, func(tup1._2, tup2._2)) } // Dataset[(K, (K, V))]
          .map { case (k, (_, v)) => (k, v) }                                // drop the duplicated key
    }

Note that the function passed to .reduceGroups takes in the whole key-value pair. It would be nicer to pass in a function that operates on just the values, i.e. reduceGroups(func). That would require the ability to transform the values of the KeyValueGroupedDataset returned by the .groupByKey call on a Dataset, and no such function (e.g. KeyValueGroupedDataset.mapValues(func: V => U)) currently exists.

The more important issue, however, is the inefficiency of .reduceGroups. It does not support partial aggregation (reducing map-side), so it has to shuffle all of the data in the Dataset.

A more efficient alternative that we explored was to build an Aggregator and pass it as a TypedColumn to KeyValueGroupedDataset's .agg function. Unfortunately, an Aggregator must supply a zero so that it forms a valid monoid, and the right zero depends on the reduce function: the zero for addition over Ints is different from the zero for taking the minimum over Ints, for example. Because the Aggregator requires that reduce(a, zero) == a, we had to give it a buffer type that stores the value along with a null flag. The zero element sets this flag to signal that it should not participate in the reduce function. (Using Scala's nicer Option syntax for this yielded some mysterious errors that, unfortunately, I haven't worked through yet.)

-Andy
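For illustration, the flagged-buffer idea can be modelled in plain Scala without Spark. This is a sketch under assumptions, not Spark code: Buf, FlaggedReducer and reduceByKeyModel are hypothetical names, and the methods zero/reduce/merge/finish only mirror the contract of org.apache.spark.sql.expressions.Aggregator. Each inner Seq stands in for one partition, so the model also shows what partial (map-side) aggregation buys: at most one buffer per key per partition crosses the "shuffle", instead of every record.

```scala
// Plain-Scala model of the flagged-buffer trick (no Spark dependency).
// Buf, FlaggedReducer and reduceByKeyModel are hypothetical names; the method
// names mirror Spark's Aggregator (zero / reduce / merge / finish).

// isSet == false marks the artificial zero; its `value` is never used.
final case class Buf[V](isSet: Boolean, value: V)

final class FlaggedReducer[V](func: (V, V) => V) {
  // The zero carries no real value: null is only a placeholder and is never
  // passed to `func`, because isSet is false. Works for any reduce function.
  def zero: Buf[V] = Buf(isSet = false, null.asInstanceOf[V])

  // Fold one input value into a buffer (map side, within a partition).
  def reduce(b: Buf[V], a: V): Buf[V] =
    if (b.isSet) Buf(isSet = true, func(b.value, a)) else Buf(isSet = true, a)

  // Combine two partial buffers (reduce side, after the shuffle).
  // An unset buffer acts as the identity, so merge(b, zero) == b.
  def merge(b1: Buf[V], b2: Buf[V]): Buf[V] =
    if (!b1.isSet) b2
    else if (!b2.isSet) b1
    else Buf(isSet = true, func(b1.value, b2.value))

  // Only called for keys that received at least one value.
  def finish(b: Buf[V]): V = b.value
}

object ReduceByKeyModel {
  // Each inner Seq stands in for one partition of a Dataset[(K, V)].
  def reduceByKeyModel[K, V](partitions: Seq[Seq[(K, V)]], func: (V, V) => V): Map[K, V] = {
    val agg = new FlaggedReducer[V](func)
    // Map side: partially aggregate each partition into one buffer per key.
    val partials: Seq[Map[K, Buf[V]]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, Buf[V]]) { case (acc, (k, v)) =>
        acc.updated(k, agg.reduce(acc.getOrElse(k, agg.zero), v))
      }
    }
    // Reduce side: merge the per-partition buffers for each key, then finish.
    partials.flatten
      .groupBy(_._1)
      .map { case (k, kbs) => k -> agg.finish(kbs.map(_._2).foldLeft(agg.zero)(agg.merge)) }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("a" -> 3, "b" -> 5, "a" -> 1), Seq("a" -> 7, "b" -> 2))
    println(reduceByKeyModel[String, Int](parts, (x, y) => math.min(x, y)))
    println(reduceByKeyModel[String, Int](parts, _ + _))
  }
}
```

The same FlaggedReducer works for both min and sum, which is the point: no per-function zero is needed. Turning this into a real Aggregator[(K, V), BUF, V] would additionally require bufferEncoder and outputEncoder (for example a tuple encoder over (Boolean, V)); that wiring is not shown here and is untested.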