Specifying a custom Partitioner on RDD creation in Spark 2
Hi, I'm currently creating RDDs using a pattern like the following:

val rdd: RDD[String] = session.sparkContext.parallelize(longkeys).flatMap(
  key => {
    logInfo(s"job at key: ${key}")
    Source.fromBytes(S3Util.getBytes(S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")), bucket, key))
      .getLines()
  }
)

We've been using this pattern, or something similar, to work around issues with S3 and our Hadoop version. The same pattern might also be applied to other types of data sources that don't have a connector. This method has been working out fairly well, but I'd like more control over how the data is partitioned from the start. I tried to manually partition the data without a partitioner, but got JVM exceptions about my arrays being too large for the JVM.

val keyList = groupedKeys.keys.toList
val rdd: RDD[String] = session.sparkContext.parallelize(keyList, keyList.length).flatMap { key =>
  logInfo(s"job at day: ${key}")
  val byteArrayBuffer = new ArrayBuffer[Byte]()
  val objectKeyList: List[String] = groupedKeys(key)
  objectKeyList.foreach(
    objectKey => {
      logInfo(s"working on object: ${objectKey}")
      byteArrayBuffer.appendAll(S3Util.getBytes(S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")), bucket, objectKey))
    }
  )
  Source.fromBytes(byteArrayBuffer.toArray[Byte]).getLines()
}

Then I've defined a custom partitioner based on my source data:

class dayPartitioner(keys: List[String]) extends Partitioner with Logger {

  val keyMap: Map[String, List[String]] = keys.groupBy(_.substring(0, 10))
  val partitions = keyMap.keySet.size
  val partitionMap: Map[String, Int] = keyMap.keys.zipWithIndex.toMap

  override def getPartition(key: Any): Int = {
    val keyString = key.asInstanceOf[String]
    val partitionKey = keyString.substring(0, 10)
    partitionMap(partitionKey)
  }

  override def numPartitions: Int = partitions
}

I'd like to know: do I have to create a custom RDD class to specify my partitioner and use it in the pattern above? If so, I'd also like a reference on doing this, to hopefully save me some headaches and gotchas from a naive approach. I've found one such example, https://stackoverflow.com/a/25204589, but it's from an older version of Spark. I'm hoping there is something more recent and more in-depth. I don't mind references to books or otherwise. Best, Colin Williams
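One way to get that control without writing a custom RDD class is to parallelize (key, placeholder) pairs, route them with partitionBy, and only fetch the bytes once the keys are in place. A minimal sketch, assuming the dayPartitioner, keyList, S3Util helpers, region, bucket, and session from the post above:

import scala.io.Source
import org.apache.spark.rdd.RDD

// Pair each key with a placeholder so partitionBy can route it.
val keyed: RDD[(String, Unit)] =
  session.sparkContext.parallelize(keyList).map(key => (key, ()))

// partitionBy shuffles the keys to the partitions chosen by the custom
// partitioner; mapPartitions then streams the S3 objects for each key.
val rdd: RDD[String] = keyed
  .partitionBy(new dayPartitioner(keyList))
  .mapPartitions { iter =>
    iter.flatMap { case (key, _) =>
      Source.fromBytes(
        S3Util.getBytes(
          S3Util.getClient(region, S3Util.getCredentialsProvider("INSTANCE", "")),
          bucket, key))
        .getLines()
    }
  }

Only the small key strings move in the shuffle; the S3 bytes are read after each key has already landed in its target partition, so no oversized arrays need to be buffered up front.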
Spark Custom Partitioner not picked
Hi All, when I submit a Spark job on YARN with a custom partitioner, it is not picked up by the executors; the executors are still using the default HashPartitioner. I added logs to both HashPartitioner (org/apache/spark/Partitioner.scala) and the custom partitioner, and the completed executor logs show HashPartitioner. Below is the Spark application code with the custom partitioner, and the log line I added to the HashPartitioner class of Partitioner.scala:

log.info("HashPartitioner=" + key + "---" + numPartitions + "" + Utils.nonNegativeMod(key.hashCode, numPartitions))

The executor logs have:

16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42
16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42

How do I make sure the executors are picking up the right partitioner?

Code:

package org.apache.spark

class ExactPartitioner(partitions: Int) extends Partitioner with Logging {

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    log.info("ExactPartitioner=" + key)
    key match {
      case "INFO" => 0
      case "DEBUG" => 1
      case "ERROR" => 2
      case "WARN" => 3
      case "FATAL" => 4
    }
  }
}

object GroupByCLDB {

  def main(args: Array[String]) {

    val logFile = "/DATA"
    val sparkConf = new SparkConf().setAppName("GroupBy")
    sparkConf.set("spark.executor.memory", "4g");
    sparkConf.set("spark.executor.cores", "2");
    sparkConf.set("spark.executor.instances", "2");
    val sc = new SparkContext(sparkConf)
    val logData = sc.textFile(logFile)

    case class LogClass(one: String, two: String)

    def parse(line: String) = {
      val pieces = line.split(' ')
      val level = pieces(2).toString
      val one = pieces(0).toString
      val two = pieces(1).toString
      (level, LogClass(one, two))
    }

    val output = logData.map(x => parse(x))
    val partitioned = output.partitionBy(new ExactPartitioner(5)).persist()
    val groups = partitioned.groupByKey(new ExactPartitioner(5))

    groups.count()
    output.partitions.size
    partitioned.partitions.size
  }
}

Thanks, Prabhu Joseph
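Independent of why the HashPartitioner lines appear, two things in this ExactPartitioner are worth hardening: getPartition throws a MatchError for any key outside the five log levels, and without an equals override Spark cannot tell that the instance passed to partitionBy and the fresh instance passed to groupByKey are equivalent. A hedged, defensive variant (assuming the same five log levels):

import org.apache.spark.Partitioner

class ExactPartitioner(partitions: Int) extends Partitioner {

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case "INFO"  => 0
    case "DEBUG" => 1
    case "ERROR" => 2
    case "WARN"  => 3
    case "FATAL" => 4
    case _       => 0 // fall back instead of throwing a MatchError on unexpected keys
  }

  // With equals/hashCode defined, two ExactPartitioner(5) instances compare
  // equal, so groupByKey(new ExactPartitioner(5)) on an RDD already
  // partitioned by ExactPartitioner(5) does not trigger another shuffle.
  override def equals(other: Any): Boolean = other match {
    case p: ExactPartitioner => p.numPartitions == numPartitions
    case _                   => false
  }

  override def hashCode: Int = numPartitions
}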
Re: map operation clears custom partitioner
You can use mapValues to ensure partitioning is not lost. From: Brian London <brianmlon...@gmail.com> Date: Monday, February 22, 2016 at 1:21 PM To: user <user@spark.apache.org> Subject: map operation clears custom partitioner It appears that when a custom partitioner is applied in a groupBy operation, it is not propagated through subsequent non-shuffle operations. Is this intentional? Is there any way to carry custom partitioning through maps? I've uploaded a gist that exhibits the behavior. https://gist.github.com/BrianLondon/c3c3355d1971971f3ec6
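A minimal sketch of the difference, assuming an existing SparkContext sc: map discards the partitioner because Spark cannot know the keys were left unchanged, while mapValues cannot touch the keys and therefore carries the partitioner through.

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  .partitionBy(new HashPartitioner(4))

// map may change the keys, so Spark drops the partitioner.
val mapped = pairs.map { case (k, v) => (k, v + 1) }
println(mapped.partitioner)        // None

// mapValues leaves keys untouched, so the partitioner survives.
val incremented = pairs.mapValues(_ + 1)
println(incremented.partitioner)   // Some(org.apache.spark.HashPartitioner@...)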
Re: map operation clears custom partitioner
The problem is that your new mapped values may be in the wrong partition according to your partitioner. Look for methods that have a preservesPartitioning flag, which is a way to indicate that you know the partitioning remains correct (for example, you partitioned by key and the mapping did not change the keys). On Mon, Feb 22, 2016 at 6:21 PM, Brian London <brianmlon...@gmail.com> wrote: > It appears that when a custom partitioner is applied in a groupBy operation, > it is not propagated through subsequent non-shuffle operations. Is this > intentional? Is there any way to carry custom partitioning through maps? > > I've uploaded a gist that exhibits the behavior. > https://gist.github.com/BrianLondon/c3c3355d1971971f3ec6
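For instance, mapPartitions takes such a flag. A self-contained sketch, again assuming an existing SparkContext sc:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
  .partitionBy(new HashPartitioner(4))

// preservesPartitioning = true asserts that the function does not move any
// key to a different partition, so the partitioner is carried through.
val doubled = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 2) },
  preservesPartitioning = true)
println(doubled.partitioner)   // Some(org.apache.spark.HashPartitioner@...)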
map operation clears custom partitioner
It appears that when a custom partitioner is applied in a groupBy operation, it is not propagated through subsequent non-shuffle operations. Is this intentional? Is there any way to carry custom partitioning through maps? I've uploaded a gist that exhibits the behavior. https://gist.github.com/BrianLondon/c3c3355d1971971f3ec6
Re: How to use a custom partitioner in a dataframe in Spark
Although it is not a bad idea to write data out partitioned and then use a merge join when reading it back in, this currently isn't easily doable even with RDDs, because when you read an RDD from disk the partitioning info is lost. Re-introducing a partitioner at that point causes a shuffle, defeating the purpose. On Thu, Feb 18, 2016 at 1:49 PM, Rishi Mishra <rmis...@snappydata.io> wrote:
> Michael,
> Is there any specific reason why DataFrames do not have partitioners
> like RDDs? This will be very useful if one is writing custom datasources,
> which keep data in partitions. While storing data one can pre-partition
> the data at Spark level rather than at the datasource.
>
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
Re: How to use a custom partitioner in a dataframe in Spark
Michael, Is there any specific reason why DataFrames do not have partitioners like RDDs? This will be very useful if one is writing custom datasources, which keep data in partitions. While storing data one can pre-partition the data at Spark level rather than at the datasource. Regards, Rishitesh Mishra, SnappyData . (http://www.snappydata.io/) https://in.linkedin.com/in/rishiteshmishra On Thu, Feb 18, 2016 at 3:50 AM, swetha kasireddy <swethakasire...@gmail.com> wrote:
> So suppose I have a bunch of userIds and I need to save them as Parquet in a
> database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId, so that I don't have to deal with any performance issues because of
> a number of small files, and also to be able to scan faster.
>
> Something like ...df.write.format("parquet").partitionBy("userIdHash", "userId").mode(SaveMode.Append).save("userRecords");
Re: How to use a custom partitioner in a dataframe in Spark
So suppose I have a bunch of userIds and I need to save them as Parquet in a database. I also need to load them back and need to be able to do a join on userId. My idea is to partition by userId hashcode first and then on userId, so that I don't have to deal with any performance issues because of a number of small files, and also to be able to scan faster. Something like ...df.write.format("parquet").partitionBy("userIdHash", "userId").mode(SaveMode.Append).save("userRecords"); On Wed, Feb 17, 2016 at 2:16 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
> So suppose I have a bunch of userIds and I need to save them as Parquet
> in a database. I also need to load them back and need to be able to do a join
> on userId. My idea is to partition by userId hashcode first and then on
> userId.
Re: How to use a custom partitioner in a dataframe in Spark
So suppose I have a bunch of userIds and I need to save them as Parquet in a database. I also need to load them back and need to be able to do a join on userId. My idea is to partition by userId hashcode first and then on userId. On Wed, Feb 17, 2016 at 11:51 AM, Michael Armbrust <mich...@databricks.com> wrote:
> Can you describe what you are trying to accomplish? What would the custom
> partitioner be?
Re: How to use a custom partitioner in a dataframe in Spark
Unfortunately there is not any, at least up to 1.5; I have not gone through the new Dataset API in 1.6. There is some basic support for Parquet, like partitioning by column. If you want to partition your dataset in a certain way, you have to use an RDD to partition the data and convert that into a DataFrame before storing it in a table. Regards, Rishitesh Mishra, SnappyData . (http://www.snappydata.io/) https://in.linkedin.com/in/rishiteshmishra On Tue, Feb 16, 2016 at 11:51 PM, SRK <swethakasire...@gmail.com> wrote:
> Hi,
>
> How do I use a custom partitioner when I do a saveAsTable in a dataframe.
>
> Thanks,
> Swetha
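A minimal sketch of that workaround, assuming a DataFrame df with a userId column, an existing sqlContext, and a plain HashPartitioner standing in for whatever custom Partitioner you have:

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SaveMode

// Key each row by userId, apply the desired partitioner, then drop the keys.
val partitionedRows = df.rdd
  .map(row => (row.getAs[String]("userId"), row))
  .partitionBy(new HashPartitioner(64)) // or any custom Partitioner
  .values

// Rebuild a DataFrame with the original schema and save it.
val partitionedDf = sqlContext.createDataFrame(partitionedRows, df.schema)
partitionedDf.write.mode(SaveMode.Append).saveAsTable("userRecords")

Note that, per the earlier reply in this thread, the custom partitioning is not recorded on disk; reading the table back will not restore the partitioner.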
How to use a custom partitioner in a dataframe in Spark
Hi, How do I use a custom partitioner when I do a saveAsTable in a dataframe. Thanks, Swetha
Re: python rdd.partionBy(): any examples of a custom partitioner?
Refer to Example 4-27, "Python custom partitioner", in chapter 4 of Learning Spark: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
> On Dec 8, 2015, at 10:07 AM, Keith Freeman <8fo...@gmail.com> wrote:
>
> I'm not a python expert, so I'm wondering if anybody has a working example of
> a partitioner for the "partitionFunc" argument (default "portable_hash") to
> rdd.partitionBy()?
python rdd.partionBy(): any examples of a custom partitioner?
I'm not a python expert, so I'm wondering if anybody has a working example of a partitioner for the "partitionFunc" argument (default "portable_hash") to rdd.partitionBy()?
Does using Custom Partitioner before calling reduceByKey improve performance?
Hi, We currently use reduceByKey to reduce by a particular metric name in our Streaming/Batch job. It seems to be doing a lot of shuffles, and that has an impact on performance. Does using a custom partitioner before calling reduceByKey improve performance? Thanks, Swetha
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
If you just want to control the number of reducers, then setting numPartitions is sufficient. If you want to control the exact partitioning scheme (that is, some scheme other than hash-based), then you need to implement a custom partitioner. It can be used to improve data skew, etc., which ultimately improves performance. On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> wrote:
> Hi,
>
> We currently use reduceByKey to reduce by a particular metric name in our
> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
> impact on performance. Does using a custom partitioner before calling
> reduceByKey improve performance?
>
> Thanks,
> Swetha
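For the second case, reduceByKey has an overload that takes a Partitioner directly, so the custom scheme governs the shuffle. A hedged sketch with a hypothetical skew-aware partitioner; the hot-key name, partition count, and RDD type are illustrative, not from the thread:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical skew-aware partitioner: one known hot key gets a partition
// to itself; all other keys are hashed across the remaining partitions.
class SkewAwarePartitioner(partitions: Int, hotKey: String) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int =
    if (key == hotKey) 0
    else 1 + (((key.hashCode % (partitions - 1)) + (partitions - 1)) % (partitions - 1))
}

// reduceByKey(partitioner, func) uses the custom scheme for the shuffle.
def reduceMetrics(metrics: RDD[(String, Long)]): RDD[(String, Long)] =
  metrics.reduceByKey(new SkewAwarePartitioner(16, "hotMetric"), _ + _)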
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
So, wouldn't using a custom partitioner on the RDD upon which the groupByKey or reduceByKey is performed avoid shuffles and improve performance? My code does groupByAndSort and reduceByKey on different datasets, as shown below. Would using a custom partitioner on those datasets before the groupByKey or reduceByKey improve performance? My idea is to avoid shuffles and improve performance. Also, right now I see a lot of spills when there is a very large dataset for groupByKey and reduceByKey; I think the memory is not sufficient. We need to group by sessionId and then sort the JSONs based on the timestamp, as shown in the code below.

What is the alternative to using groupByKey for better performance? And in the case of reduceByKey, would using a custom partitioner on the RDD upon which the reduceByKey is performed reduce the shuffles and improve the performance?

rdd.partitionBy(customPartitioner)

def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
  val grpdRecs = rdd.groupByKey()
  val srtdRecs = grpdRecs.mapValues[(List[(Long, String)])](iter => iter.toList.sortBy(_._1))
  srtdRecs
}

rdd.reduceByKey((a, b) => {
  (Math.max(a._1, b._1), (a._2 ++ b._2))
})

On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com> wrote:
> If you just want to control the number of reducers, then setting the
> numPartitions is sufficient. If you want to control the exact partitioning
> scheme (that is, some scheme other than hash-based) then you need to
> implement a custom partitioner. It can be used to improve data skews, etc.
> which ultimately improves performance.
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
If you specify the same partitioner (custom or otherwise) for both partitionBy and groupBy, then maybe it will help. The fundamental problem is groupByKey, which takes a lot of working memory.
1. Try to avoid groupByKey. What is it that you want to do after sorting the list of grouped events? Can you do that operation with a reduceByKey?
2. If not, use more partitions. That would mean less data in each partition, so less spilling.
3. You can control the amount of memory allocated for shuffles by changing the configuration spark.shuffle.memoryFraction. A larger fraction would cause less spilling.
On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
> So, wouldn't using a custom partitioner on the rdd upon which the
> groupByKey or reduceByKey is performed avoid shuffles and improve
> performance? My code does groupByAndSort and reduceByKey on different
> datasets as shown below. Would using a custom partitioner on those datasets
> before using a groupByKey or reduceByKey improve performance? My idea is
> to avoid shuffles and improve performance. Also, right now I see a lot of
> spills when there is a very large dataset for groupByKey and reduceByKey. I
> think the memory is not sufficient. We need to group by sessionId and then
> sort the Jsons based on the timeStamp as shown in the below code.
>
> rdd.partitionBy(customPartitioner)
>
> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
>   val grpdRecs = rdd.groupByKey()
>   val srtdRecs = grpdRecs.mapValues[(List[(Long, String)])](iter => iter.toList.sortBy(_._1))
>   srtdRecs
> }
>
> rdd.reduceByKey((a, b) => {
>   (Math.max(a._1, b._1), (a._2 ++ b._2))
> })
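On point 1: if each key's result must stay a time-sorted list, one hedged alternative to groupByKey is aggregateByKey, sketched here against the RDD[(String, (Long, String))] from earlier in the thread. It still shuffles every event, but as per-partition lists combined map-side rather than as individual records:

import org.apache.spark.rdd.RDD

// Build each session's event list with aggregateByKey: values are folded
// into a partition-local list, partial lists are concatenated across
// partitions, and each key's list is sorted once at the end.
def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] =
  rdd.aggregateByKey(List.empty[(Long, String)])(
      (acc, v) => v :: acc,   // fold one event into the partition-local list
      (a, b) => a ::: b       // merge partial lists from different partitions
    )
    .mapValues(_.sortBy(_._1)) // sort each session's events by timestamp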
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
If it is streaming, you can look at updateStateByKey for maintaining active sessions, but that won't work for batch. And I answered that before: it can improve performance if you change the partitioning scheme from hash-based to something else. It's hard to say anything beyond that without understanding the data skew and other details of your application. Before jumping into that, you should simply change the number of partitions and see if the performance improves.
On Tue, Oct 27, 2015 at 7:10 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:
> After sorting the list of grouped events I would need to have an RDD that
> has a key which is nothing but the sessionId and a list of values that are
> sorted by timeStamp for each input Json. So basically the return type would
> be RDD[(String, List[(Long, String)])] where the key is the sessionId and
> a list of tuples that has a timeStamp and Json as the values. I will need
> to use groupByKey to do a groupBy sessionId and secondary sort by timeStamp
> and then get the list of JsonValues in a sorted order. Is there any
> alternative for that? Please find the code below that I used for the same.
>
> Also, does using a customPartitioner for a reduceByKey improve performance?
>
> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
>   val grpdRecs = rdd.groupByKey()
>   val srtdRecs = grpdRecs.mapValues[(List[(Long, String)])](iter => iter.toList.sortBy(_._1))
>   srtdRecs
> }
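A hedged sketch of the updateStateByKey suggestion, assuming a DStream[(String, (Long, String))] named eventsBySessionId of (sessionId, (timestamp, json)) pairs; the name and the expiry policy are illustrative, and updateStateByKey requires a checkpoint directory to be set:

// Carry each session's time-ordered events across streaming batches,
// instead of re-grouping the whole dataset in every batch.
val updateSession = (newEvents: Seq[(Long, String)], state: Option[List[(Long, String)]]) =>
  // Real code would return None for idle sessions so their state expires.
  Some((state.getOrElse(Nil) ++ newEvents).sortBy(_._1))

val sessions = eventsBySessionId.updateStateByKey(updateSession)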
Re: Does using Custom Partitioner before calling reduceByKey improve performance?
After sorting the list of grouped events I would need to have an RDD that has a key which is nothing but the sessionId, and a list of values that are sorted by timeStamp for each input JSON. So basically the return type would be RDD[(String, List[(Long, String)])], where the key is the sessionId and the value is a list of tuples with a timeStamp and the JSON. I would need to use groupByKey to group by sessionId, secondary sort by timeStamp, and then get the list of JSON values in sorted order. Is there any alternative for that? Please find the code below that I used for the same.

Also, does using a custom partitioner for a reduceByKey improve performance?

def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] = {
  val grpdRecs = rdd.groupByKey()
  val srtdRecs = grpdRecs.mapValues[(List[(Long, String)])](iter => iter.toList.sortBy(_._1))
  srtdRecs
}

On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> wrote:
> if you specify the same partitioner (custom or otherwise) for both
> partitionBy and groupBy, then may be it will help. The fundamental problem
> is groupByKey, that takes a lot of working memory.
> 1. Try to avoid groupByKey. What is it that you want to after sorting the
> list of grouped events? can you do that operation with a reduceByKey?
> 2. If not, use more partitions. That would cause lesser data in each
> partition, so less spilling.
> 3. You can control the amount memory allocated for shuffles by changing
> the configuration spark.shuffle.memoryFraction . More fraction would cause
> less spilling.
Re: Custom Partitioner
Could you alter the range partitioner such that it skews the partitioning and assigns more partitions to the heavier-weighted keys? To do this you will have to know the weighting before you start.
On Wed, Sep 2, 2015 at 8:02 AM shahid ashraf <sha...@trialx.com> wrote:
> yes i can take as an example, but my actual use case is that i need to
> resolve a data skew, when i do grouping based on key(A-Z) the resulting
> partitions are skewed like
> (partition no., no_of_keys, total elements with given key)
> << partition: [(0, 0, 0), (1, 15, 17395), (2, 0, 0), (3, 0, 0), (4, 13,
> 18196), (5, 0, 0), (6, 0, 0), (7, 0, 0), (8, 1, 1), (9, 0, 0)] and
> elements: >>
> the data has been skewed to partition 1 and 4, i need to split the
> partition, and do processing on split partitions and i should be able to
> combine splitted partition back also.
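A sketch of that idea in Scala for concreteness (the thread is PySpark, but the mechanics carry over). The safe version salts the heavy keys in the data itself so getPartition stays deterministic; the weights map, key names, and salting scheme are all illustrative:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import scala.util.Random

// e.g. two heavy keys get 3 and 2 partitions; unlisted keys share slot 0.
val weights = Map("B" -> 3, "E" -> 2)

class WeightedPartitioner(weights: Map[String, Int]) extends Partitioner {
  private val keys = weights.keys.toSeq.sorted
  // Running sum of weights gives each key its first partition index;
  // index 0 is reserved for keys without an explicit weight.
  private val starts: Map[String, Int] =
    keys.zip(keys.map(weights).scanLeft(1)(_ + _)).toMap

  override def numPartitions: Int = weights.values.sum + 1

  override def getPartition(key: Any): Int = key match {
    case (k: String, salt: Int) => starts.getOrElse(k, 0) + salt // deterministic
    case _ => 0
  }
}

// Salt each record of a heavy key into one of its slots, then partition.
def spreadSkew(rdd: RDD[(String, String)]): RDD[((String, Int), String)] =
  rdd.map { case (k, v) => ((k, Random.nextInt(weights.getOrElse(k, 1))), v) }
     .partitionBy(new WeightedPartitioner(weights))

After the per-slot processing, the original key can be recovered by stripping the salt, e.g. rdd.map { case ((k, _), v) => (k, v) }, which matches the "combine the split partitions back" step.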
Re: Custom Partitioner
Yes, I can take that as an example, but my actual use case is that I need to resolve a data skew. When I do grouping based on key (A-Z), the resulting partitions are skewed like (partition no., no_of_keys, total elements with given key):
<< partition: [(0, 0, 0), (1, 15, 17395), (2, 0, 0), (3, 0, 0), (4, 13, 18196), (5, 0, 0), (6, 0, 0), (7, 0, 0), (8, 1, 1), (9, 0, 0)] and elements: >>
The data has been skewed to partitions 1 and 4. I need to split those partitions, do processing on the split partitions, and then be able to combine the split partitions back together.
On Tue, Sep 1, 2015 at 10:42 PM, Davies Liu <dav...@databricks.com> wrote:
> You can take the sortByKey as an example:
> https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642
-- with Regards Shahid Ashraf
Re: Custom Partitioner
You can take the sortByKey as an example:
https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L642
On Tue, Sep 1, 2015 at 3:48 AM, Jem Tucker <jem.tuc...@gmail.com> wrote:
> something like...
>
> class RangePartitioner(Partitioner):
>     def __init__(self, numParts):
>         self.numPartitions = numParts
>         self.partitionFunction = rangePartition
>     def rangePartition(key):
>         # Logic to turn key into a partition id
>         return id
Re: Custom Partitioner
Hi, You just need to extend Partitioner and override the numPartitions and getPartition methods, see below:

class MyPartitioner extends Partitioner {
  override def numPartitions: Int = ??? // return the number of partitions
  override def getPartition(key: Any): Int = ??? // return the partition for a given key
}

On Tue, Sep 1, 2015 at 10:15 AM shahid qadri <shahidashr...@icloud.com> wrote:
> Hi Sparkians
>
> How can we create a custom partitioner in pyspark
Re: Custom Partitioner
Ah sorry, I misread your question. In pyspark it looks like you just need to instantiate the Partitioner class with numPartitions and partitionFunc.
On Tue, Sep 1, 2015 at 11:13 AM shahid ashraf <sha...@trialx.com> wrote:
> Hi
>
> I did not get this, e.g. if I need to create a custom partitioner like
> a range partitioner.
Custom Partitioner
Hi Sparkians, How can we create a custom partitioner in pyspark?
Re: Custom Partitioner
Hi, I did not get this; e.g. if I need to create a custom partitioner like a range partitioner, how should I do it?
On Tue, Sep 1, 2015 at 3:22 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
> Hi,
>
> You just need to extend Partitioner and override the numPartitions and
> getPartition methods, see below
>
> class MyPartitioner extends partitioner {
>   def numPartitions: Int = // Return the number of partitions
>   def getPartition(key Any): Int = // Return the partition for a given key
> }
-- with Regards Shahid Ashraf
Re: Custom Partitioner
Hi, I think a range partitioner is not available in pyspark, so if we want one we have to create it ourselves. How should we create it? That is my question.
On Tue, Sep 1, 2015 at 3:57 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:
> Ah sorry I miss read your question. In pyspark it looks like you just need
> to instantiate the Partitioner class with numPartitions and partitionFunc.
-- with Regards Shahid Ashraf
Re: Custom Partitioner
Something like the following (a sketch: the boundary logic is illustrative and still has to match your real keys):

class RangePartitioner(Partitioner):
    def __init__(self, numParts, boundaries):
        # boundaries: sorted upper bounds between ranges (numParts - 1 of them)
        self.numPartitions = numParts
        self.boundaries = sorted(boundaries)
        self.partitionFunc = self.rangePartition

    def rangePartition(self, key):
        # binary-search the key into its range to get the partition id
        from bisect import bisect_left
        return bisect_left(self.boundaries, key)

On Tue, Sep 1, 2015 at 11:38 AM shahid ashraf <sha...@trialx.com> wrote:
> Hi
>
> I think range partitioner is not available in pyspark, so if we want
> create one. how should we create that. my question is that.
Custom partitioner
Hi, I have CSV data in which I have a column of date-time. I want to partition my data into 12 partitions, with each partition containing data of one month only. I am not sure how to write such a partitioner, nor how to use it to read and write data. Kindly help me in this regard. Thanks
Re: Custom partitioner
You can write a subclass of Partitioner whose getPartition() returns the partition number corresponding to the given key. Take a look at core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala for an example. Cheers
On Sun, Jul 26, 2015 at 1:43 PM, Hafiz Mujadid <hafizmujadi...@gmail.com> wrote:
Hi, I have CSV data in which I have a column of date-time. I want to partition my data into 12 partitions, with each partition containing data of one month only. I am not sure how to write such a partitioner, nor how to use it to read and write data. Kindly help me in this regard. Thanks
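A minimal sketch of such a partitioner, assuming the RDD has been keyed by a parsed java.time.LocalDate (the keying step and parseDate helper are illustrative):

import java.time.LocalDate
import org.apache.spark.Partitioner

// Twelve partitions, one per calendar month of the date-time column.
class MonthPartitioner extends Partitioner {
  override def numPartitions: Int = 12
  override def getPartition(key: Any): Int = key match {
    case d: LocalDate => d.getMonthValue - 1 // months 1-12 map to partitions 0-11
    case _ => 0                              // fallback for unexpected key types
  }
}

// Usage sketch: key each CSV line by its parsed date, route by month,
// then write one output per partition.
// csvLines.map(line => (parseDate(line), line))
//         .partitionBy(new MonthPartitioner)
//         .values
//         .saveAsTextFile("/out/by-month")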
Python Custom Partitioner
Hi, Can someone share some working code for a custom partitioner in Python? I am trying to understand it better. Here is the documentation:

partitionBy(numPartitions, partitionFunc=portable_hash)
https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy
Return a copy of the RDD partitioned using the specified partitioner.

What I am trying to do:
1. Create a dataframe
2. Partition it using one specific column
3. Create another dataframe
4. Partition it on the same column
5. Join (to enforce a map-side join)

My questions: a) Am I on the right path? b) How can I do the partitionBy? Specifically, when I call DF.rdd.partitionBy, what gets passed to the custom function? A tuple? A row? How do I access, say, the 3rd column of a tuple inside the partitioner function? -- Best Regards, Ayan Guha
Re: Python Custom Partitioner
I have implemented a map-side join with broadcast variables, and the code is on the mailing list (Scala).
On Mon, May 4, 2015 at 8:38 PM, ayan guha <guha.a...@gmail.com> wrote:
Hi, Can someone share some working code for a custom partitioner in Python? I am trying to understand it better. Here is the documentation: partitionBy(numPartitions, partitionFunc=portable_hash) https://spark.apache.org/docs/1.3.1/api/python/pyspark.html#pyspark.RDD.partitionBy Return a copy of the RDD partitioned using the specified partitioner. What I am trying to do: 1. Create a dataframe 2. Partition it using one specific column 3. Create another dataframe 4. Partition it on the same column 5. Join (to enforce a map-side join) My questions: a) Am I on the right path? b) How can I do the partitionBy? Specifically, when I call DF.rdd.partitionBy, what gets passed to the custom function? A tuple? A row? How do I access, say, the 3rd column of a tuple inside the partitioner function? -- Best Regards, Ayan Guha
-- Deepak
Re: Python Custom Partitioner
Thanks, but is there a non-broadcast solution? On 5 May 2015 01:34, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote: I have implemented a map-side join with broadcast variables, and the code is on the mailing list (Scala). On Mon, May 4, 2015 at 8:38 PM, ayan guha <guha.a...@gmail.com> wrote: Hi, Can someone share some working code for a custom partitioner in Python? I am trying to understand it better.