Re: Assigning input files to spark partitions
Hi Daniel, Yes that should work also. However, is it possible to setup so that each RDD has exactly one partition, without repartitioning (and thus incurring extra cost)? Is there a mechanism similar to MR where we can ensure each partition is assigned some amount of data by size, by setting some block size parameter? On Thu, Nov 13, 2014 at 1:05 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote No i don't want separate RDD because each of these partitions are being processed the same way (in my case, each partition corresponds to HBase keys belonging to one region server, and i will do HBase lookups). After that i have aggregations too, hence all these partitions should be in the same RDD. The reason to follow the partition structure is to limit concurrent HBase lookups targeting a single region server. Neither of these is necessarily a barrier to using separate RDDs. You can define the function you want to use and then pass it to multiple map methods. Then you could union all the RDDs to do your aggregations. For example, it might look something like this: val paths: String = ... // the paths to the files you want to load def myFunc(t: T) = ... // the function to apply to every RDD val rdds = paths.map { path = sc.textFile(path).map(myFunc) } val completeRdd = sc.union(rdds) Does that make any sense? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
Re: Assigning input files to spark partitions
I'm not aware of any such mechanism. On Mon, Nov 17, 2014 at 2:55 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi Daniel, Yes that should work also. However, is it possible to setup so that each RDD has exactly one partition, without repartitioning (and thus incurring extra cost)? Is there a mechanism similar to MR where we can ensure each partition is assigned some amount of data by size, by setting some block size parameter? On Thu, Nov 13, 2014 at 1:05 PM, Daniel Siegmann daniel.siegm...@velos.io wrote: On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote No i don't want separate RDD because each of these partitions are being processed the same way (in my case, each partition corresponds to HBase keys belonging to one region server, and i will do HBase lookups). After that i have aggregations too, hence all these partitions should be in the same RDD. The reason to follow the partition structure is to limit concurrent HBase lookups targeting a single region server. Neither of these is necessarily a barrier to using separate RDDs. You can define the function you want to use and then pass it to multiple map methods. Then you could union all the RDDs to do your aggregations. For example, it might look something like this: val paths: String = ... // the paths to the files you want to load def myFunc(t: T) = ... // the function to apply to every RDD val rdds = paths.map { path = sc.textFile(path).map(myFunc) } val completeRdd = sc.union(rdds) Does that make any sense? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
Re: Assigning input files to spark partitions
Would it make sense to read each file in as a separate RDD? This way you would be guaranteed the data is partitioned as you expected. Possibly you could then repartition each of those RDDs into a single partition and then union them. I think that would achieve what you expect. But it would be easy to accidentally screw this up (have some operation that causes a shuffle), so I think you're better off just leaving them as separate RDDs. On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I have a set of input files for a spark program, with each file corresponding to a logical data partition. What is the API/mechanism to assign each input file (or a set of files) to a spark partition, when initializing RDDs? When i create a spark RDD pointing to the directory of files, my understanding is it's not guaranteed that each input file will be treated as separate partition. My job semantics require that the data is partitioned, and i want to leverage the partitioning that has already been done, rather than repartitioning again in the spark job. I tried to lookup online but haven't found any pointers so far. Thanks pala -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
Re: Assigning input files to spark partitions
If your data is in hdfs and you are reading as textFile and each file is less than block size, my understanding is it would always have one partition per file. On Thursday, November 13, 2014, Daniel Siegmann daniel.siegm...@velos.io wrote: Would it make sense to read each file in as a separate RDD? This way you would be guaranteed the data is partitioned as you expected. Possibly you could then repartition each of those RDDs into a single partition and then union them. I think that would achieve what you expect. But it would be easy to accidentally screw this up (have some operation that causes a shuffle), so I think you're better off just leaving them as separate RDDs. On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia mchett...@rocketfuelinc.com javascript:_e(%7B%7D,'cvml','mchett...@rocketfuelinc.com'); wrote: Hi, I have a set of input files for a spark program, with each file corresponding to a logical data partition. What is the API/mechanism to assign each input file (or a set of files) to a spark partition, when initializing RDDs? When i create a spark RDD pointing to the directory of files, my understanding is it's not guaranteed that each input file will be treated as separate partition. My job semantics require that the data is partitioned, and i want to leverage the partitioning that has already been done, rather than repartitioning again in the spark job. I tried to lookup online but haven't found any pointers so far. Thanks pala -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io javascript:_e(%7B%7D,'cvml','daniel.siegm...@velos.io'); W: www.velos.io -- - Rishi
Re: Assigning input files to spark partitions
I believe Rishi is correct. I wouldn't rely on that though - all it would take is for one file to exceed the block size and you'd be setting yourself up for pain. Also, if your files are small - small enough to fit in a single record - you could use SparkContext.wholeTextFile. On Thu, Nov 13, 2014 at 10:11 AM, Rishi Yadav ri...@infoobjects.com wrote: If your data is in hdfs and you are reading as textFile and each file is less than block size, my understanding is it would always have one partition per file. On Thursday, November 13, 2014, Daniel Siegmann daniel.siegm...@velos.io wrote: Would it make sense to read each file in as a separate RDD? This way you would be guaranteed the data is partitioned as you expected. Possibly you could then repartition each of those RDDs into a single partition and then union them. I think that would achieve what you expect. But it would be easy to accidentally screw this up (have some operation that causes a shuffle), so I think you're better off just leaving them as separate RDDs. On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I have a set of input files for a spark program, with each file corresponding to a logical data partition. What is the API/mechanism to assign each input file (or a set of files) to a spark partition, when initializing RDDs? When i create a spark RDD pointing to the directory of files, my understanding is it's not guaranteed that each input file will be treated as separate partition. My job semantics require that the data is partitioned, and i want to leverage the partitioning that has already been done, rather than repartitioning again in the spark job. I tried to lookup online but haven't found any pointers so far. Thanks pala -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io -- - Rishi -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
Re: Assigning input files to spark partitions
Thanks for the responses Daniel and Rishi. No i don't want separate RDD because each of these partitions are being processed the same way (in my case, each partition corresponds to HBase keys belonging to one region server, and i will do HBase lookups). After that i have aggregations too, hence all these partitions should be in the same RDD. The reason to follow the partition structure is to limit concurrent HBase lookups targeting a single region server. Not sure what the block size is here (HDFS block size?), but my files may get large over time, so cannot depend on block size assumption. That said, from your description, it seems like i don't have to worry too much because Spark does assign files to partitions while maintaining 'locality' (i.e. a given file's data would fit in ceil(filesize/blocksize) partitions, as opposed to spread across numerous partitions). Yes, i saw the wholeTextFile(), it won't apply in my case because input file size can be quite large. On Thu, Nov 13, 2014 at 8:04 AM, Daniel Siegmann daniel.siegm...@velos.io wrote: I believe Rishi is correct. I wouldn't rely on that though - all it would take is for one file to exceed the block size and you'd be setting yourself up for pain. Also, if your files are small - small enough to fit in a single record - you could use SparkContext.wholeTextFile. On Thu, Nov 13, 2014 at 10:11 AM, Rishi Yadav ri...@infoobjects.com wrote: If your data is in hdfs and you are reading as textFile and each file is less than block size, my understanding is it would always have one partition per file. On Thursday, November 13, 2014, Daniel Siegmann daniel.siegm...@velos.io wrote: Would it make sense to read each file in as a separate RDD? This way you would be guaranteed the data is partitioned as you expected. Possibly you could then repartition each of those RDDs into a single partition and then union them. I think that would achieve what you expect. But it would be easy to accidentally screw this up (have some operation that causes a shuffle), so I think you're better off just leaving them as separate RDDs. On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote: Hi, I have a set of input files for a spark program, with each file corresponding to a logical data partition. What is the API/mechanism to assign each input file (or a set of files) to a spark partition, when initializing RDDs? When i create a spark RDD pointing to the directory of files, my understanding is it's not guaranteed that each input file will be treated as separate partition. My job semantics require that the data is partitioned, and i want to leverage the partitioning that has already been done, rather than repartitioning again in the spark job. I tried to lookup online but haven't found any pointers so far. Thanks pala -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io -- - Rishi -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io
Re: Assigning input files to spark partitions
On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote No i don't want separate RDD because each of these partitions are being processed the same way (in my case, each partition corresponds to HBase keys belonging to one region server, and i will do HBase lookups). After that i have aggregations too, hence all these partitions should be in the same RDD. The reason to follow the partition structure is to limit concurrent HBase lookups targeting a single region server. Neither of these is necessarily a barrier to using separate RDDs. You can define the function you want to use and then pass it to multiple map methods. Then you could union all the RDDs to do your aggregations. For example, it might look something like this: val paths: String = ... // the paths to the files you want to load def myFunc(t: T) = ... // the function to apply to every RDD val rdds = paths.map { path = sc.textFile(path).map(myFunc) } val completeRdd = sc.union(rdds) Does that make any sense? -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io