Thanks everyone for your contributions on this topic. I wanted to check in to see if anyone has discovered a different approach, or has an opinion on a better way to deduplicate data using PySpark. I would really appreciate any further insight on this.
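In case it helps the discussion, one alternative I have seen suggested for this kind of "keep the row with the highest score" dedup is a struct-max aggregation, which should avoid the Sort and the extra range-partitioning Exchange in the plan quoted below. A minimal, untested sketch in Scala, assuming the same df as in the quoted example:

import org.apache.spark.sql.functions._

// "argmax" pattern: wrap the ordering column in a struct and take max.
// Structs compare field by field, so the entry with the highest score
// wins. The plan should need only a single hash-partitioning Exchange
// for the aggregation, with no global Sort.
val deduped = df
  .groupBy($"id")
  .agg(max(struct($"score")).as("best"))
  .select($"id", $"best.score".as("score"))

deduped.show()

With only the id and score columns, a plain groupBy("id").agg(max("score")) would give the same result; the struct version generalizes to the case where other columns need to ride along with the winning row.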
Thanks,
-Rishi

On Wed, Jun 12, 2019 at 4:21 PM Yeikel <em...@yeikel.com> wrote:

> Nicholas, thank you for your explanation.
>
> I am also interested in the example that Rishi is asking for. I am sure
> mapPartitions may work, but as Vladimir suggests it may not be the best
> option in terms of performance.
>
> @Vladimir Prus, are you aware of any example of writing a "custom
> physical exec operator"?
>
> If anyone needs a further explanation of the follow-up question Rishi
> posted, please see the example below:
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
>
> val someData = Seq(
>   Row(1, 10),
>   Row(1, 20),
>   Row(1, 11)
> )
>
> val schema = List(
>   StructField("id", IntegerType, true),
>   StructField("score", IntegerType, true)
> )
>
> val df = spark.createDataFrame(
>   spark.sparkContext.parallelize(someData),
>   StructType(schema)
> )
>
> // Goal: drop duplicates using "id" as the primary key and keep the
> // highest "score".
>
> df.sort($"score".desc).dropDuplicates("id").show
>
> == Physical Plan ==
> *(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
> +- Exchange hashpartitioning(id#191, 200)
>    +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
>       +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
>          +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
>             +- Scan ExistingRDD[id#191,score#192]
>
> This seems to work, but I don't know what the implications are if we use
> this approach on a bigger dataset, or what the alternatives are. From the
> explain output I can see the two Exchanges, so it may not be the best
> approach?

--
Regards,
Rishi Shah