I'll first try to reproduce what has been reported to me :) and I'll let
you know. Thanks!
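
For reference, here is a minimal sketch of the two strategies quoted below, written against plain Scala collections rather than Spark so that it runs on its own. The names DistinctStrategies, distinctByKeyReduce and distinctByHashSet are made up for illustration, and groupBy only stands in for the shuffle that reduceByKey performs; this is not how either Spark implementation actually executes.

```scala
import scala.collection.mutable

// Hypothetical helper object, not part of Spark.
object DistinctStrategies {

  // RDD-style: pair each element with a dummy value, merge equal keys,
  // then keep only the keys. groupBy is a stand-in for the shuffle.
  def distinctByKeyReduce[T](partitions: Seq[Seq[T]]): Set[T] =
    partitions.flatten
      .map(x => (x, null))
      .groupBy(_._1)
      .map { case (_, pairs) => pairs.reduce((x, _) => x)._1 }
      .toSet

  // DataFrame-style (per partition): collect unseen elements in an
  // in-memory hash set, then return the set's iterator.
  def distinctByHashSet[T](partition: Iterator[T]): Iterator[T] = {
    val seen = mutable.HashSet.empty[T]
    while (partition.hasNext) {
      val current = partition.next()
      if (!seen.contains(current)) seen.add(current)
    }
    seen.iterator
  }
}
```

The hash-set variant keeps every distinct element of a partition in memory until the input iterator is exhausted, so memory use per partition is probably the first thing worth measuring.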

On Thu, May 7, 2015 at 9:16 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I'd happily merge a PR that changes the distinct implementation to be more
> like Spark core, assuming it includes benchmarks that show better
> performance for both the "fits in memory case" and the "too big for memory
> case".
>
> On Thu, May 7, 2015 at 2:23 AM, Olivier Girardot <
> o.girar...@lateral-thoughts.com> wrote:
>
>> OK, but for the moment this seems to be killing performance on some
>> computations...
>> I'll try to give you precise figures comparing RDD and DataFrame.
>>
>> Olivier.
>>
>> On Thu, May 7, 2015 at 10:08 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> > In 1.5, we will most likely just rewrite distinct in SQL to either use the
>> > Aggregate operator which will benefit from all the Tungsten optimizations,
>> > or have a Tungsten version of distinct for SQL/DataFrame.
>> >
>> > On Thu, May 7, 2015 at 1:32 AM, Olivier Girardot <
>> > o.girar...@lateral-thoughts.com> wrote:
>> >
>> >> Hi everyone,
>> >> there seem to be different implementations of the "distinct" feature in
>> >> DataFrames and RDDs, and some performance issues with the DataFrame
>> >> distinct API.
>> >>
>> >> In RDD.scala:
>> >>
>> >> def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
>> >>   withScope {
>> >>     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
>> >>   }
>> >>
>> >> And in DataFrame:
>> >>
>> >> case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
>> >>   override def output: Seq[Attribute] = child.output
>> >>
>> >>   override def requiredChildDistribution: Seq[Distribution] =
>> >>     if (partial) UnspecifiedDistribution :: Nil
>> >>     else ClusteredDistribution(child.output) :: Nil
>> >>
>> >>   override def execute(): RDD[Row] = {
>> >>     child.execute().mapPartitions { iter =>
>> >>       val hashSet = new scala.collection.mutable.HashSet[Row]()
>> >>
>> >>       var currentRow: Row = null
>> >>       while (iter.hasNext) {
>> >>         currentRow = iter.next()
>> >>         if (!hashSet.contains(currentRow)) {
>> >>           hashSet.add(currentRow.copy())
>> >>         }
>> >>       }
>> >>
>> >>       hashSet.iterator
>> >>     }
>> >>   }
>> >> }
>> >>
>> >> I can try to reproduce the performance issue more clearly, but do you
>> >> have any insight into why we can't have the same distinct strategy for
>> >> DataFrame and RDD?
>> >>
>> >> Regards,
>> >>
>> >> Olivier.
>> >>
>> >
>>
>
>
