In 1.5, we will most likely either rewrite distinct in SQL to use the Aggregate operator, which will benefit from all the Tungsten optimizations, or add a Tungsten version of distinct for SQL/DataFrame.
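To make the Aggregate route concrete: a distinct is equivalent to a group-by on every column with no aggregate functions, run as a partial step per partition, a shuffle that co-locates equal rows, and a final step. The following is only a rough sketch on plain Scala collections, not Spark code and not the planned implementation. The `Row` alias, the object name, and the helpers `partialDistinct` and `shuffleByRow` are hypothetical names made up for illustration.

```scala
object DistinctAsAggregate {
  // Hypothetical stand-in for a SQL row: just a sequence of column values.
  type Row = Seq[Any]

  // Group by the whole row and keep only the keys, i.e. an aggregation
  // with grouping columns but no aggregate functions.
  def partialDistinct(partition: Seq[Row]): Seq[Row] =
    partition.groupBy(identity).keys.toSeq

  // Stand-in for the shuffle: route each row to a partition chosen by its
  // hash, so that equal rows always end up in the same partition.
  def shuffleByRow(partitions: Seq[Seq[Row]], numPartitions: Int): Seq[Seq[Row]] = {
    val routed = partitions.flatten.groupBy(row => Math.floorMod(row.hashCode, numPartitions))
    (0 until numPartitions).map(i => routed.getOrElse(i, Seq.empty))
  }

  // Partial aggregation, shuffle, then final aggregation.
  def distinct(partitions: Seq[Seq[Row]], numPartitions: Int): Seq[Seq[Row]] = {
    val partial = partitions.map(partialDistinct)
    shuffleByRow(partial, numPartitions).map(partialDistinct)
  }

  def main(args: Array[String]): Unit = {
    val input: Seq[Seq[Row]] = Seq(
      Seq(Seq(1, "a"), Seq(1, "a"), Seq(2, "b")),
      Seq(Seq(2, "b"), Seq(3, "c"))
    )
    // Print the distinct rows; partition layout and ordering are not guaranteed.
    println(distinct(input, numPartitions = 2).flatten.toSet)
  }
}
```

The partial step plays the same role as the `partial = true` case of the current `Distinct` operator: it shrinks each partition before the shuffle so that fewer rows are exchanged.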
On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot <o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> there seems to be different implementations of the "distinct" feature in
> DataFrames and RDD and some performance issue with the DataFrame distinct
> API.
>
> In RDD.scala:
>
>   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
>     withScope {
>       map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>     }
>
> And in DataFrame:
>
>   case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
>     override def output: Seq[Attribute] = child.output
>
>     override def requiredChildDistribution: Seq[Distribution] =
>       if (partial) UnspecifiedDistribution :: Nil
>       else ClusteredDistribution(child.output) :: Nil
>
>     override def execute(): RDD[Row] = {
>       child.execute().mapPartitions { iter =>
>         val hashSet = new scala.collection.mutable.HashSet[Row]()
>
>         var currentRow: Row = null
>         while (iter.hasNext) {
>           currentRow = iter.next()
>           if (!hashSet.contains(currentRow)) {
>             hashSet.add(currentRow.copy())
>           }
>         }
>
>         hashSet.iterator
>       }
>     }
>   }
>
> I can try to reproduce more clearly the performance issue, but do you have
> any insights into why we can't have the same distinct strategy between
> DataFrame and RDD?
>
> Regards,
>
> Olivier.