Re: RDD.aggregate versus accumulables...
We use Algebird for calculating things like min/max, stddev, variance, etc. https://github.com/twitter/algebird/wiki

-Suren

On Mon, Nov 17, 2014 at 11:32 AM, Daniel Siegmann daniel.siegm...@velos.io wrote:

You should *never* use accumulators for this purpose because you may get incorrect answers. Accumulators can count the same thing multiple times - you cannot rely upon the correctness of the values they compute. See SPARK-732 https://issues.apache.org/jira/browse/SPARK-732 for more info.

On Sun, Nov 16, 2014 at 10:06 PM, Segerlind, Nathan L nathan.l.segerl...@intel.com wrote:

Hi All. I am trying to get my head around why using accumulators and accumulables seems to be the most recommended method for accumulating running sums, averages, variances and the like, whereas the aggregate method seems to me to be the right one. I have no performance measurements as of yet, but it seems that aggregate is simpler and more intuitive (and it does what one might expect an accumulator to do), whereas the accumulators and accumulables seem to have some extra complications and overhead.

So… What's the real difference between an accumulator/accumulable and aggregating an RDD? When is one method of aggregation preferred over the other?

Thanks, Nate

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning
54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io
W: www.velos.io

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
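[To make Nate's point concrete, here is a minimal sketch of a running mean via RDD.aggregate; the RDD contents and app name are illustrative:]

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("agg-demo"))
    val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))

    // zero value is (sum, count); seqOp folds values within a partition,
    // combOp merges the per-partition results
    val (sum, count) = nums.aggregate((0.0, 0L))(
      (acc, x) => (acc._1 + x, acc._2 + 1L),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    val mean = sum / count

Because the result is computed inside the job itself, a re-executed task cannot double-count, which is exactly the failure mode Daniel warns about for accumulators (SPARK-732).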
Re: Play framework
Mohammed,

Jumping in for Daniel, we actually address the configuration issue by pulling values from environment variables or command line options. Maybe that can handle at least some of your needs.

For the akka issue, here is the akka version we include in build.sbt:

    "com.typesafe.akka" %% "akka-actor" % "2.2.1"

-Suren

On Thu, Oct 16, 2014 at 12:23 PM, Mohammed Guller moham...@glassbeam.com wrote:

Daniel,

Thanks for sharing this. It is very helpful. The reason I want to use spark-submit is that it provides more flexibility. For example, with spark-submit, I don't need to hard code the master info in the code. I can easily change the config without having to change and recompile code.

Do you mind sharing the sbt build file for your Play app? I tried to build an uber jar using sbt-assembly. It gets built, but when I run it, it throws all sorts of exceptions. I have seen some blog posts saying that Spark and Play use different versions of the Akka library. So I included Akka in my build.scala file, but still cannot get rid of the Akka-related exceptions. I suspect that the settings in the build.scala file for my Play project are incorrect.

Mohammed

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io]
Sent: Thursday, October 16, 2014 7:15 AM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: Play framework

We execute Spark jobs from a Play application but we don't use spark-submit. I don't know if you really want to use spark-submit, but if not you can just create a SparkContext programmatically in your app.

In development I typically run Spark locally. Creating the Spark context is pretty trivial:

    val conf = new SparkConf().setMaster("local[*]").setAppName("My Awesome App")
    // call conf.set for any other configuration you want
    val sc = new SparkContext(conf)

It is important to keep in mind you cannot have multiple local contexts (you can create them but you'll get odd errors), so if you are running things in parallel within your app (even unit tests) you'd need to share a context in this case. If you are running sequentially you can create a new local context each time, but you must make sure to call SparkContext.stop() when you're done.

Running against a cluster is a bit more complicated because you need to add all your dependency jars. I'm not sure how to get this to work with play run. I stick to building the app with play dist and then running against the packaged application, because it very conveniently provides all the dependencies in a lib folder. Here is some code to load all the paths you need from the dist:

    def libs: Seq[String] = {
      val libDir = play.api.Play.application.getFile("lib")
      logger.info(s"SparkContext will be initialized with libraries from directory $libDir")
      if (libDir.exists) {
        libDir.listFiles().map(_.getCanonicalFile.getAbsolutePath).filter(_.endsWith(".jar"))
      } else {
        throw new IllegalStateException(s"lib dir is missing: $libDir")
      }
    }

Creating the context is similar to above, but with this extra line:

    conf.setJars(libs)

I hope this helps. I should note that I don't use play run very much, at least not for when I'm actually executing Spark jobs. So I'm not sure if this integrates properly with that. I have unit tests which execute on Spark and have executed the dist package both locally and on a cluster. To make working with the dist locally easier, I wrote myself a little shell script to unzip and run the dist.
On Wed, Oct 15, 2014 at 10:51 PM, Mohammed Guller moham...@glassbeam.com wrote:

Hi – Has anybody figured out how to integrate a Play application with Spark and run it on a Spark cluster using the spark-submit script? I have seen some blogs about creating a simple Play app and running it locally on a dev machine with the sbt run command. However, those steps don't work for spark-submit. If you have figured out how to build and run a Play app with spark-submit, I would appreciate it if you could share the steps and the sbt settings for your Play app.

Thanks, Mohammed

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io
W: www.velos.io

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
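[For readers hitting the same Akka conflict, a build.sbt sketch along the lines Suren describes; the Spark version below is illustrative, not a tested combination:]

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.1.0", // illustrative Spark version
      "com.typesafe.akka" %% "akka-actor" % "2.2.1" // the pin Suren mentions; match it to what your Play version expects
    )

The point is simply to force a single Akka version onto the classpath, rather than letting Play and Spark each pull in their own.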
Re: Spark And Mapr
As Sungwook said, the classpath pointing to the MapR jar is the key for that error. MapR has a Spark install that hopefully makes it easier. I don't have the instructions handy but you can ask their support about it.

-Suren

On Wed, Oct 1, 2014 at 7:18 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

It should just work in PySpark, the same way it does in Java / Scala apps. Matei

On Oct 1, 2014, at 4:12 PM, Sungwook Yoon sy...@maprtech.com wrote:

Yes.. you should use maprfs:// I personally haven't used pyspark, I just used the scala shell or standalone with MapR. I think you need to set the classpath right, adding a jar like /opt/mapr/hadoop/hadoop-0.20.2/lib/hadoop-0.20.2-dev-core.jar to the classpath.

Sungwook

On Wed, Oct 1, 2014 at 4:09 PM, Addanki, Santosh Kumar santosh.kumar.adda...@sap.com wrote:

Hi, We would like to do this in the PySpark environment, i.e. something like

    test = sc.textFile("maprfs:///user/root/test") or
    test = sc.textFile("hdfs:///user/root/test")

Currently when we try test = sc.textFile("maprfs:///user/root/test"), it throws the error: No File-System for scheme: maprfs

Best Regards, Santosh

From: Vladimir Rodionov [mailto:vrodio...@splicemachine.com]
Sent: Wednesday, October 01, 2014 3:59 PM
To: Addanki, Santosh Kumar
Cc: user@spark.apache.org
Subject: Re: Spark And Mapr

There is doc on MapR: http://doc.mapr.com/display/MapR/Accessing+MapR-FS+in+Java+Applications

-Vladimir Rodionov

On Wed, Oct 1, 2014 at 3:00 PM, Addanki, Santosh Kumar santosh.kumar.adda...@sap.com wrote:

Hi, We were using Horton 2.4.1 as our Hadoop distribution and now switched to MapR. Previously, to read a text file we would use:

    test = sc.textFile("hdfs://10.48.101.111:8020/user/hdfs/test")

What would be the equivalent of the same for MapR?

Best Regards, Santosh

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
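[A sketch (in Scala; the PySpark conf calls are analogous) of wiring the jar Sungwook mentions onto the executor classpath - this is not MapR's official recipe, and the jar path will vary by install:]

    import org.apache.spark.{SparkConf, SparkContext}

    // The driver JVM must already have the MapR jar on its own classpath at launch
    // (e.g. via --driver-class-path); setting it here only reaches the executors.
    val conf = new SparkConf()
      .setAppName("maprfs-read")
      .set("spark.executor.extraClassPath",
        "/opt/mapr/hadoop/hadoop-0.20.2/lib/hadoop-0.20.2-dev-core.jar")
    val sc = new SparkContext(conf)
    val test = sc.textFile("maprfs:///user/root/test")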
Re: All of the tasks have been completed but the Stage is still shown as Active?
The History Server is also very helpful.

On Thu, Jul 10, 2014 at 7:37 AM, Haopu Wang hw...@qilinsoft.com wrote:

I didn't keep the driver's log. It's a lesson. I will try to run it again to see if it happens again.

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: July 10, 2014 17:29
To: user@spark.apache.org
Subject: Re: All of the tasks have been completed but the Stage is still shown as Active?

Do you see any errors in the logs of the driver?

On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang hw...@qilinsoft.com wrote:

I'm running an app for hours in a standalone cluster. From the data injector and the Streaming tab of the web UI, it's running well. However, I see quite a lot of Active stages in the web UI, even though some of them have all of their tasks completed. I attach a screenshot for your reference. Do you ever see this kind of behavior?

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
Re: Purpose of spark-submit?
Are there any gaps beyond convenience and code/config separation in using spark-submit versus SparkConf/SparkContext, if you are willing to set your own config? If there are any gaps, +1 on having parity within SparkConf/SparkContext where possible.

In my use case, we launch our jobs programmatically. In theory, we could shell out to spark-submit but it's not the best option for us. So far, we are only using Standalone Cluster mode, so I'm not knowledgeable on the complexities of other modes, though.

-Suren

On Wed, Jul 9, 2014 at 8:20 AM, Koert Kuipers ko...@tresata.com wrote:

not sure I understand why unifying how you submit an app for different platforms and dynamic configuration cannot be part of SparkConf and SparkContext? for the classpath, a simple script similar to "hadoop classpath" that shows what needs to be added should be sufficient.

on spark standalone I can launch a program just fine with just SparkConf and SparkContext. not on yarn, so the spark-submit script must be doing a few things extra there that I am missing... which makes things more difficult, because I am not sure it's realistic to expect every application that needs to run something on spark to be launched using spark-submit.

On Jul 9, 2014 3:45 AM, Patrick Wendell pwend...@gmail.com wrote:

It fulfills a few different functions. The main one is giving users a way to inject Spark as a runtime dependency separately from their program and make sure they get exactly the right version of Spark. So a user can bundle an application and then use spark-submit to send it to different types of clusters (or using different versions of Spark).

It also unifies the way you bundle and submit an app for Yarn, Mesos, etc... this was something that became very fragmented over time before this was added.

Another feature is allowing users to set configuration values dynamically rather than compile them inside of their program. That's the one you mention here. You can choose to use this feature or not. If you know your configs are not going to change, then you don't need to set them with spark-submit.

On Wed, Jul 9, 2014 at 10:22 AM, Robert James srobertja...@gmail.com wrote:

What is the purpose of spark-submit? Does it do anything outside of the standard val conf = new SparkConf ... val sc = new SparkContext ...?

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
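[For standalone mode, a minimal sketch of what launching programmatically, without spark-submit, can look like; the master URL, jar path, and memory setting are illustrative:]

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://master-host:7077") // hypothetical standalone master URL
      .setAppName("MyApp")
      .setJars(Seq("/path/to/my-app-assembly.jar")) // jars spark-submit would otherwise ship for you
      .set("spark.executor.memory", "4g") // config you would otherwise pass on the command line
    val sc = new SparkContext(conf)

This reproduces spark-submit's config and jar-shipping duties by hand, which is roughly the trade-off Patrick describes.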
Re: Comparative study
I'll respond for Dan.

Our test dataset was a total of 10 GB of input data (the full production dataset for this particular dataflow would be roughly 60 GB). I'm not sure what the size of the final output data was, but I think it was on the order of 20 GB for the given 10 GB of input data. Also, I can say that when we were experimenting with persist(DISK_ONLY), the size of all RDDs on disk was around 200 GB, which gives a sense of overall transient memory usage with no persistence.

In terms of our test cluster, we had 15 nodes. Each node had 24 cores and 2 workers. Each executor got 14 GB of memory.

-Suren

On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com wrote:

When you say large data sets, how large? Thanks

On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

From a development perspective, I vastly prefer Spark to MapReduce. The MapReduce API is very constrained; Spark's API feels much more natural to me. Testing and local development is also very easy - creating a local Spark context is trivial and it reads local files. For your unit tests you can just have them create a local context and execute your flow with some test data. Even better, you can do ad-hoc work in the Spark shell and if you want that in your production code it will look exactly the same.

Unfortunately, the picture isn't so rosy when it gets to production. In my experience, Spark simply doesn't scale to the volumes that MapReduce will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN would be better, but I haven't had the opportunity to try them. I find jobs tend to just hang forever for no apparent reason on large data sets (but smaller than what I push through MapReduce). I am hopeful the situation will improve - Spark is developing quickly - but if you have large amounts of data you should proceed with caution.

Keep in mind there are some frameworks for Hadoop which can hide the ugly MapReduce with something very similar in form to Spark's API; e.g. Apache Crunch. So you might consider those as well. (Note: the above is with Spark 1.0.0.)

On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com wrote:

Hello Experts,

I am doing some comparative study on the below:
Spark vs Impala
Spark vs MapReduce

Is it worth migrating from an existing MR implementation to Spark? Please share your thoughts and expertise.

Thanks, Santosh

--
This message is for the designated recipient only and may contain privileged, proprietary, or otherwise confidential information. If you have received it in error, please notify the sender immediately and delete the original. Any other use of the e-mail by you is prohibited. Where allowed by local law, electronic communications with Accenture and its affiliates, including e-mail and instant messaging (including content), may be scanned by our systems for the purposes of information security and assessment of internal compliance with Accenture policy.
www.accenture.com

--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io
W: www.velos.io

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
Re: Comparative study
To clarify, we are not persisting to disk. That was just one of the experiments we did because of some issues we had along the way. At this time, we are NOT using persist but cannot get the flow to complete in Standalone Cluster mode. We do not have a YARN-capable cluster at this time.

We agree with what you're saying. Your results are what we were hoping for and expecting. :-) Unfortunately we still haven't gotten the flow to run end to end on this relatively small dataset. It must be something related to our cluster, standalone mode or our flow, but as far as we can tell, we are not doing anything unusual. Did you do any custom configuration? Any advice would be appreciated.

-Suren

On Tue, Jul 8, 2014 at 1:54 PM, Kevin Markey kevin.mar...@oracle.com wrote:

It seems to me that you're not taking full advantage of the lazy evaluation, especially persisting to disk only. While it might be true that the cumulative size of the RDDs looks like it's 300GB, only a small portion of that should be resident at any one time.

We've evaluated data sets much greater than 10GB in Spark using the Spark master and Spark with Yarn (cluster -- formerly standalone -- mode). A nice thing about using Yarn is that it reports the actual memory *demand*, not just the memory requested for driver and workers. Processing a 60GB data set through thousands of stages in a rather complex set of analytics and transformations consumed a total cluster resource (divided among all workers and driver) of only 9GB. We were somewhat startled at first by this result, thinking that it would be much greater, but realized that it is a consequence of Spark's lazy evaluation model. This is even with several intermediate computations being cached as input to multiple evaluation paths.

Good luck. Kevin
Re: Comparative study
How wide are the rows of data, either the raw input data or any generated intermediate data?

We are at a loss as to why our flow doesn't complete. We banged our heads against it for a few weeks.

-Suren

On Tue, Jul 8, 2014 at 2:12 PM, Kevin Markey kevin.mar...@oracle.com wrote:

Nothing particularly custom. We've tested with small (4 node) development clusters, single-node pseudoclusters, and bigger, using plain-vanilla Hadoop 2.2 or 2.3 or CDH5 (beta and beyond), in Spark master, Spark local, and Spark Yarn (client and cluster) modes, with total memory resources ranging from 4GB to 256GB+.

K
Re: Comparative study
Also, our exact same flow but with 1 GB of input data completed fine.

-Suren
Re: Comparative study
We kind of hijacked Santosh's original thread, so apologies for that; let me try to get back to his original question on Map/Reduce versus Spark. I would say it's worth migrating from M/R, with the following thoughts.

Just my opinion, but I would summarize the latest emails in this thread as: Spark can scale to datasets in the 10s and 100s of GBs. I've seen some companies talk about TBs of data but I'm unclear if that is for a single flow.

At the same time, some folks (like my team) that I've seen on the user group have a lot of difficulty with the same sized datasets, which points to either environmental issues (machines, cluster mode, etc.), the nature of the data, or the nature of the transforms/flow complexity (though Kevin's experience runs counter to the latter, which is very positive), or we are just doing something subtle wrong.

My overall opinion right now is that Map/Reduce is easier to get working in general on very large, heterogeneous datasets, but the programming model for Spark is the right way to go and worth the effort. Libraries like Scoobi, Scrunch and Scalding (and their associated Java versions) provide a Spark-like wrapper around Map/Reduce, but my guess is that, since they are limited to Map/Reduce under the covers, they cannot do some of the optimizations that Spark can, such as collapsing several transforms into a single stage.

In addition, my company believes that having batch, streaming and SQL (ad hoc querying) on a single platform has worthwhile benefits.

We're still relatively new with Spark (a few months), so would also love to hear more from others in the community.

-Suren
Re: Comparative study
Aaron,

I don't think anyone was saying Spark can't handle this data size, given testimony from the Spark team, Bizo, etc., on large datasets. This has kept us trying different things to get our flow to work over the course of several weeks.

Agreed that the first instinct should be "what did I do wrong". I believe that is what every person facing this issue has done, in reaching out to the user group repeatedly over the course of the few months that I've been active here. I also know other companies (all experienced with large production datasets on other platforms) facing the same types of issues - flows that run on subsets of data but not the whole production set.

So I think, as you are saying, it points to the need for further diagnostics. And maybe also some type of guidance on typical issues with different types of datasets (wide rows, narrow rows, etc.), flow topologies, etc.? Hard to tell where we are going wrong right now. We've tried many things over the course of 6 weeks or so.

I tried to look for the professional services link on databricks.com but didn't find it. ;-) (jk)

-Suren

On Tue, Jul 8, 2014 at 4:16 PM, Aaron Davidson ilike...@gmail.com wrote:

"Not sure exactly what is happening but perhaps there are ways to restructure your program for it to work better. Spark is definitely able to handle much, much larger workloads."

+1 @Reynold. Spark can handle big big data. There are known issues with informing the user about what went wrong and how to fix it that we're actively working on, but the first impulse when a job fails should be "what did I do wrong" rather than "Spark can't handle this workload". Messaging is a huge part in making this clear -- getting things like a job hanging or an out of memory error can be very difficult to debug, and improving this is one of our highest priorities.

On Tue, Jul 8, 2014 at 12:47 PM, Reynold Xin r...@databricks.com wrote:

Not sure exactly what is happening but perhaps there are ways to restructure your program for it to work better. Spark is definitely able to handle much, much larger workloads. I've personally run a workload that shuffled 300 TB of data. I've also run something that shuffled 5TB/node and stuffed my disks fairly full, such that the file system was close to breaking. We can definitely do a better job in Spark to make it output more meaningful diagnosis and be more robust with partitions of data that don't fit in memory though. A lot of the work in the next few releases will be on that.
Re: Spark memory optimization
Using persist() is a sort of a hack or a hint (depending on your perspective :-)) to make the RDD use disk, not memory. As I mentioned though, the disk I/O has consequences, mainly (I think) making sure you have enough disks to not let I/O be a bottleneck. Increasing partitions is, I think, the other common approach people take, from what I've read.

For alternatives, if your data is in HDFS or you just want to stick with Map/Reduce, then the higher level abstractions on top of M/R you might want to look at include the following, which have both Scala and Java implementations in some cases:

- Scalding (Scala API on top of Cascading; it seems to be the most active of such projects, at least on the surface)
- Scoobi
- Scrunch (Scala wrapper around Crunch)

There are other parallel distributed frameworks outside of the Hadoop ecosystem, of course.

-Suren

On Mon, Jul 7, 2014 at 7:31 AM, Igor Pernek i...@pernek.net wrote:

Thanks guys! Actually, I'm not doing any caching (at least I'm not calling cache/persist) - do I still need to use the DISK_ONLY storage level? However, I do use reduceByKey and sortByKey. Mayur, you mentioned that sortByKey requires data to fit in memory. Is there any way to work around this (maybe by increasing the number of partitions or something similar)? What alternative would you suggest if Spark is not the way to go with this kind of scenario? As mentioned, what I like about Spark is its high level of abstraction of parallelization. I'm ready to sacrifice speed (if the slowdown is not too big - I'm doing batch processing, nothing real-time) for code simplicity and readability.
Re: Spark memory optimization
When using DISK_ONLY, keep in mind that disk I/O is pretty high. Make sure you are writing to multiple disks for best operation. And even with DISK_ONLY, we've found that there is a minimum threshold for executor RAM (spark.executor.memory), which for us seemed to be around 8 GB.

If you find that, with enough disks, you still have errors/exceptions getting the flow to finish, first check iostat to see if disk is the bottleneck. Then, you may want to try tuning some or all of the following, which affect buffers and timeouts. For us, because we did not have enough disks to start out, the I/O bottleneck caused timeouts and other errors. In the end, IMHO, it's probably best to solve the problem by adding disks rather than by tuning the parameters, because it seemed that the I/O bottlenecks eventually backed up the processing.

    //conf.set("spark.shuffle.consolidateFiles", "true")
    //conf.set("spark.shuffle.file.buffer.kb", "200") // does doubling this help? should increase in-memory buffer to decrease disk writes
    //conf.set("spark.reducer.maxMbInFlight", "96") // does doubling this help? should allow for more simultaneous shuffle data to be read from remotes
    // because we use disk-only, we should be able to reverse the default memory usage settings
    //conf.set("spark.shuffle.memoryFraction", "0.6") // default 0.3
    //conf.set("spark.storage.memoryFraction", "0.3") // default 0.6
    //conf.set("spark.worker.timeout", "180")
    // akka settings
    //conf.set("spark.akka.threads", "300") // number of akka actors
    //conf.set("spark.akka.timeout", "180") // we saw a problem with smaller numbers
    //conf.set("spark.akka.frameSize", "100") // not sure if we need to up this. Default is 10.
    //conf.set("spark.akka.batchSize", "30")
    //conf.set("spark.akka.askTimeout", "30") // supposedly this is important for high cpu/io load
    // block manager
    //conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
    //conf.set("spark.blockManagerHeartBeatMs", "8")

On Fri, Jul 4, 2014 at 8:52 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

I would go with Spark only if you are certain that you are going to scale out in the near future. You can change the default storage of RDD to DISK_ONLY; that might remove issues around any RDD leveraging memory. There are some functions, particularly sortByKey, that require data to fit in memory to work, so you may be hitting some of those walls too.

Regards, Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek i...@pernek.net wrote:

Hi all! I have a folder with 150 G of txt files (around 700 files, on average each 200 MB). I'm using Scala to process the files and calculate some aggregate statistics in the end. I see two possible approaches to do that:
- manually loop through all the files, do the calculations per file and merge the results in the end
- read the whole folder into one RDD, do all the operations on this single RDD and let Spark do all the parallelization

I'm leaning towards the second approach as it seems cleaner (no need for parallelization-specific code), but I'm wondering if my scenario will fit the constraints imposed by my hardware and data. I have one workstation with 16 threads and 64 GB of RAM available (so the parallelization will be strictly local between different processor cores). I might scale the infrastructure with more machines later on, but for now I would just like to focus on tuning the settings for this one-workstation scenario.
The code I'm using:
- reads TSV files, and extracts meaningful data to (String, String, String) triplets
- afterwards some filtering, mapping and grouping is performed
- finally, the data is reduced and some aggregates are calculated

I've been able to run this code with a single file (~200 MB of data), however I get a java.lang.OutOfMemoryError: GC overhead limit exceeded and/or a Java out of heap exception when adding more data (the application breaks with 6GB of data but I would like to use it with 150 GB of data). I guess I would have to tune some parameters to make this work. I would appreciate any tips on how to approach this problem (how to debug for memory demands). I've tried increasing the 'spark.executor.memory' and using a smaller number of cores (the rationale being that each core needs some heap space), but this didn't solve my problems. I don't need the solution to be very fast (it can easily run for a few hours, even days, if needed). I'm also not caching any data, but just saving them to the file system in the end. If you think it would be more feasible to just go with the manual parallelization approach, I could do that as well.

Thanks, Igor

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
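[A minimal sketch of the two knobs discussed above - spilling an intermediate RDD to disk and raising the partition count; the path, partition count, and parser are illustrative:]

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext(new SparkConf().setAppName("tsv-stats"))

    // hypothetical parser for the (String, String, String) triplets Igor describes
    def parseTriplet(line: String): (String, String, String) = {
      val f = line.split("\t"); (f(0), f(1), f(2))
    }

    // ask for more partitions than the default so each task holds a smaller slice in memory
    val lines = sc.textFile("/data/tsv-folder", 512) // 512 is illustrative
    val triplets = lines.map(parseTriplet)
      .persist(StorageLevel.DISK_ONLY) // spill this intermediate to disk instead of RAM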
Re: Enable Parsing Failed or Incompleted jobs on HistoryServer (YARN mode)
I've had some odd behavior with jobs showing up in the history server in 1.0.0. Failed jobs do show up, but it seems they can show up minutes or hours later. I see messages in the history server logs about bad task IDs. But then eventually the jobs show up. This might be your situation.

Anecdotally, if you click on the job in the Spark Master GUI after it is done, this may help it show up in the history server faster. I haven't reliably tested this though. It may just be a coincidence of timing.

-Suren

On Wed, Jul 2, 2014 at 8:01 PM, Andrew Lee alee...@hotmail.com wrote:

Hi All,

I have the HistoryServer up and running, and it is great. Is it possible to also have the HistoryServer parse failed jobs' events by default as well? I get "No Completed Applications Found" if a job fails:

    Event Log Location: hdfs:///user/test01/spark/logs/
    No Completed Applications Found

The reason is that it is good to run the HistoryServer to keep track of performance and resource usage for each completed job, but I found it more useful when a job fails: I can identify at which stage it failed, etc., instead of sifting through the logs from the Resource Manager. The same event log is only available while the Application Master is still active; once the job fails, the Application Master is killed and I lose the GUI access, and even though I have the event log in JSON format, I can't open it with the HistoryServer. This is very helpful especially for long running jobs that last for 2-18 hours and generate gigabytes of logs.

So I have 2 questions:

1. Any reason why we only render completed jobs? Why can't we bring in all jobs and choose from the GUI, like a time machine to restore the status from the Application Master?

./core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

    val logInfos = logDirs
      .sortBy { dir => getModificationTime(dir) }
      .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
      .filter { case (dir, info) => info.applicationComplete }

2. If I force-touch a file APPLICATION_COMPLETE in the failed job's event log folder, will this cause any problem?

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
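[For context, a sketch of the event-log settings the HistoryServer reads from; the HDFS path is the one from Andrew's setup:]

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("logged-app")
      .set("spark.eventLog.enabled", "true") // write an event log for this run
      .set("spark.eventLog.dir", "hdfs:///user/test01/spark/logs") // the directory the HistoryServer watches
    val sc = new SparkContext(conf)

The HistoryServer is then started against that same directory (sbin/start-history-server.sh hdfs:///user/test01/spark/logs). On question 2: as far as I can tell, in 1.0 the applicationComplete flag the quoted filter checks is derived from the presence of the APPLICATION_COMPLETE marker file, so touching that file in a failed run's log directory should make the run render; the main side effect is that the UI will then present an unfinished job as complete.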
Re: Changing log level of spark
One thing we ran into was that there was another log4j.properties earlier in the classpath. For us, it was in our MapR/Hadoop conf. If that is the case, something like the following could help you track it down. The only thing to watch out for is that you might have to walk up the classloader hierarchy.

    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    // note: ClassLoader.getResource() paths take no leading slash
    URL loc = cl.getResource("log4j.properties");
    System.out.println(loc);

-Suren

On Tue, Jul 1, 2014 at 9:20 AM, Philip Limbeck philiplimb...@gmail.com wrote:

We changed the log level to DEBUG by replacing every INFO with DEBUG in /root/ephemeral-hdfs/conf/log4j.properties and propagating it to the cluster. There is some DEBUG output visible in both master and worker, but nothing really interesting regarding stages or scheduling. Since we expected a little more than that, there could be 2 possibilities:

a) There is still some other unknown way to set the log level to debug
b) There is not that much log output to be expected in this direction. I looked for logDebug (the log wrapper in Spark) in the GitHub sources and found 84 results, which makes me doubt that this is the case.

We actually just want to have a little more insight into the system behavior, especially when using Shark, since we ran into some serious concurrency issues with blocking queries. So much for the background of why this is important to us.

On Thu, Jun 26, 2014 at 3:30 AM, Aaron Davidson ilike...@gmail.com wrote:

If you're using the spark-ec2 scripts, you may have to change /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that is added to the classpath before Spark's own conf.

On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp wrote:

I have a log4j.xml in src/main/resources with

    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
    <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
      [...]
      <root>
        <priority value="warn" />
        <appender-ref ref="Console" />
      </root>
    </log4j:configuration>

and that is included in the jar I package with `sbt assembly`. That works fine for me, at least on the driver.

Tobias

On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com wrote:

Hi! According to https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging, changing the log level is just a matter of creating a log4j.properties (which is in the classpath of Spark) and changing the log level there for the root logger. I did these steps on every node in the cluster (master and worker nodes). However, after restart there is still no debug output as desired, but only the default info log level.

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
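[For anyone who just needs a known-good override, a minimal log4j.properties sketch in standard log4j 1.x syntax; per Aaron's and Suren's notes, it only takes effect if it is found before any other log4j.properties on the classpath:]

    # send everything at DEBUG and above to the console
    log4j.rootCategory=DEBUG, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n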
Re: Spark executor error
I unfortunately haven't seen this directly. But some typical things I try when debugging are as follows. Do you see a corresponding error on the other side of that connection (alpinenode7.alpinenow.local)? Or is that the same machine? Also, do the driver logs show any longer stack trace, and have you enabled the history server, so you can see some more details about execution? That helps me tremendously.

-Suren

On Wed, Jun 25, 2014 at 11:08 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote:

I'm seeing the following message in the log of an executor. Anyone seen this error? After this, the executor seems to lose the cache, but besides that the whole thing slows down drastically - i.e., it gets stuck in a reduce phase for 40+ minutes, whereas before it was finishing reduces in 2~3 seconds.

    14/06/25 19:22:31 WARN SendingConnection: Error writing in connection to ConnectionManagerId(alpinenode7.alpinenow.local,46251)
    java.lang.NullPointerException
        at org.apache.spark.network.MessageChunkHeader.buffer$lzycompute(MessageChunkHeader.scala:35)
        at org.apache.spark.network.MessageChunkHeader.buffer(MessageChunkHeader.scala:32)
        at org.apache.spark.network.MessageChunk.buffers$lzycompute(MessageChunk.scala:31)
        at org.apache.spark.network.MessageChunk.buffers(MessageChunk.scala:29)
        at org.apache.spark.network.SendingConnection.write(Connection.scala:349)
        at org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:142)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
Re: MLLib inside Storm : silly or not ?
I can't speak for MLlib either. But I can say the model of training in Hadoop M/R or Spark and production scoring in Storm works very well. My team has done online learning (the Sofia-ML library, I think) in Storm as well. I would be interested in this answer as well.

-Suren

On Thu, Jun 19, 2014 at 7:35 AM, Eustache DIEMERT eusta...@diemert.fr wrote:

Well, yes, VW is an appealing option but I only found experimental integrations so far. Also, early experiments suggest decision tree ensembles (RF, GBT) perform better than generalized linear models on our data. Hence the interest in MLlib :)

Any other comments / suggestions welcome :)

E/

2014-06-19 12:37 GMT+02:00 Charles Earl charles.ce...@gmail.com:

While I can't definitively speak to MLlib online learning, I'm sure you're evaluating Vowpal Wabbit, for which there have been some Storm integrations contributed. Also you might look at factorie, http://factorie.cs.umass.edu, which at least provides an online LDA.

C

On Thursday, June 19, 2014, Eustache DIEMERT eusta...@diemert.fr wrote:

Hi Sparkers,

We have a Storm cluster and are looking for a decent execution engine for machine-learned models. What I've seen from MLlib is extremely positive, but we can't just throw away our Storm-based stack. So my question is: is it feasible/recommended to train models in Spark/MLlib and execute them in another Java environment (Storm in this case)?

Thanks for any insights :)

Eustache

--
- Charles

--
SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
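[To make the train-in-Spark / score-elsewhere split concrete, a minimal sketch assuming Spark 1.x MLlib and a hypothetical RDD of labeled points; logistic regression is used here only because its parameters are trivially portable, not because it is what Eustache should use:]

    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    // offline training in Spark; trainingData is a hypothetical RDD[LabeledPoint]
    def train(trainingData: RDD[LabeledPoint]): (Array[Double], Double) = {
      val model = LogisticRegressionWithSGD.train(trainingData, 100) // 100 iterations, illustrative
      (model.weights.toArray, model.intercept) // plain doubles - easy to serialize for the Storm side
    }

    // online scoring (e.g., inside a Storm bolt) needs no Spark dependency at all
    def score(features: Array[Double], weights: Array[Double], intercept: Double): Double = {
      val margin = features.zip(weights).map { case (x, w) => x * w }.sum + intercept
      1.0 / (1.0 + math.exp(-margin)) // logistic probability
    }

The design point is that the learned parameters, not the Spark model object, cross the boundary, so the Storm topology stays free of Spark jars. Tree ensembles would need their structure serialized instead of a weight vector, but the same pattern applies.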
Re: Trailing Tasks Saving to HDFS
I've created an issue for this, but if anyone has any advice, please let me know. Basically, on about 10 GB of data, saveAsTextFile() to HDFS hangs on two remaining tasks (out of 320). Those tasks seem to be waiting on data from another task on another node. Eventually (about 2 hours later) they time out with a connection reset by peer. All the data actually seems to be on HDFS as the expected part files. It just seems like the remaining tasks have corrupted metadata, so that they do not realize that they are done. Just a guess though. https://issues.apache.org/jira/browse/SPARK-2202 -Suren On Wed, Jun 18, 2014 at 8:35 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: It looks like eventually there was some type of reset or timeout and the tasks have been reassigned. I'm guessing they'll keep failing until max failure count. The machine it disconnected from was a remote machine, though I've seen such failures from connections to itself with other problems. The log lines from the remote machine are also below. Any thoughts or guesses would be appreciated! *HUNG WORKER* 14/06/18 19:41:18 WARN network.ReceivingConnection: Error reading from connection to ConnectionManagerId(172.16.25.103,57626) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251) at sun.nio.ch.IOUtil.read(IOUtil.java:224) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254) at org.apache.spark.network.ReceivingConnection.read(Connection.scala:496) at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:679) 14/06/18 19:41:18 INFO network.ConnectionManager: Handling connection error on connection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found *REMOTE WORKER* 14/06/18 19:41:18 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.124,55610) 14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found On Wed, Jun 18, 2014 at 7:16 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: I have a flow that ends with saveAsTextFile() to HDFS. It seems all the expected files per partition have been written out, based on the number of part files and the file sizes. But the driver logs show 2 tasks still not completed, with no activity, and the worker logs show no activity for those two tasks for a while now. Has anyone run into this situation? It's happened to me a couple of times now. Thanks. -- Suren SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext.
105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: Java IO Stream Corrupted - Invalid Type AC?
Patrick, My team is using shuffle consolidation but not speculation. We are also using persist(DISK_ONLY) for caching. Here are some config changes that are in our work-in-progress. We've been trying for 2 weeks to get our production flow (maybe around 50-70 stages, a few forks and joins with up to 20 branches in the forks) to run end to end without any success, running into other problems besides this one as well. For example, we have run into situations where saving to HDFS just hangs on a couple of tasks, which are printing out nothing in their logs and not taking any CPU. For testing, our input data is 10 GB across 320 input splits and generates maybe around 200-300 GB of intermediate and final data. conf.set("spark.executor.memory", "14g") // TODO make this configurable // shuffle configs conf.set("spark.default.parallelism", "320") // TODO make this configurable conf.set("spark.shuffle.consolidateFiles", "true") conf.set("spark.shuffle.file.buffer.kb", "200") conf.set("spark.reducer.maxMbInFlight", "96") conf.set("spark.rdd.compress", "true") // we ran into a problem with the default timeout of 60 seconds // this is also being set in the master's spark-env.sh. Not sure if it needs to be in both places conf.set("spark.worker.timeout", "180") // akka settings conf.set("spark.akka.threads", "300") conf.set("spark.akka.timeout", "180") conf.set("spark.akka.frameSize", "100") conf.set("spark.akka.batchSize", "30") conf.set("spark.akka.askTimeout", "30") // block manager conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18") conf.set("spark.blockManagerHeartBeatMs", "8") -Suren On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell pwend...@gmail.com wrote: Out of curiosity - are you guys using speculation, shuffle consolidation, or any other non-default option? If so that would help narrow down what's causing this corruption. On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matt/Ryan, Did you make any headway on this? My team is running into this also. It doesn't happen on smaller datasets. Our input set is about 10 GB but we generate 100s of GBs in the flow itself. -Suren On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton compton.r...@gmail.com wrote: Just ran into this today myself. I'm on branch-1.0 using a CDH3 cluster (no modifications to Spark or its dependencies). The error appeared trying to run GraphX's .connectedComponents() on a ~200GB edge list (GraphX worked beautifully on smaller data). Here's the stacktrace (it's quite similar to yours https://imgur.com/7iBA4nJ).
14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355) java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192) org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51
Re: GroupByKey results in OOM - Any other alternative
Vivek, If the foldByKey solution doesn't work for you, my team uses RDD.persist(DISK_ONLY) to avoid OOM errors. It's slower, of course, and requires tuning other config parameters. It can also be a problem if you do not have enough disk space, meaning that you have to unpersist at the right points if you are running long flows. For us, even though the disk writes are a performance hit, we prefer the Spark programming model to Hadoop M/R. But we are still working on getting this to work end to end on 100s of GB of data on our 16-node cluster. Suren On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS vivek...@gmail.com wrote: Thanks for the input. I will give foldByKey a shot. The way I am doing it is: the data is partitioned hourly, so I am computing distinct values hourly. Then I use unionRDD to merge them and compute distinct on the overall data. Is there a way to know which (key, value) pair is resulting in the OOM? Is there a way to set parallelism in the map stage so that each worker will process one key at a time? I didn't realise countApproxDistinctByKey is using HyperLogLogPlus. This should be interesting. --Vivek On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote: Grouping by key is always problematic since a key might have a huge number of values. You can do a little better than grouping *all* values and *then* finding distinct values by using foldByKey, putting values into a Set. At least you end up with only distinct values in memory. (You don't need two maps either, right?) If the number of distinct values is still huge for some keys, consider the experimental method countApproxDistinctByKey: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285 This should be much more performant at the cost of some accuracy. On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS vivek...@gmail.com wrote: Hi, For the last couple of days I have been trying hard to get around this problem. Please share any insights on solving this problem. Problem: There is a huge list of (key, value) pairs. I want to transform this to (key, distinct values) and then eventually to (key, distinct values count). On a small dataset: groupByKey().map(x => (x._1, x._2.distinct)) ... map(x => (x._1, x._2.distinct.count)) On a large data set I am getting OOM. Is there a way to represent the Seq of values from groupByKey as an RDD and then perform distinct over it? Thanks Vivek -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
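To make the foldByKey suggestion concrete, here is a minimal sketch (Spark 1.x API, assuming an existing SparkContext sc; the sample data is illustrative). Mapping each value into a one-element Set first lets foldByKey union sets per key, so only distinct values are ever held, and the persist/unpersist calls show the DISK_ONLY pattern described above:

import org.apache.spark.SparkContext._ // pair-RDD implicits in Spark 1.x
import org.apache.spark.storage.StorageLevel

val pairs = sc.parallelize(Seq("a" -> "x", "a" -> "x", "a" -> "y", "b" -> "z"))

// Fold each key's values into a Set, so duplicates collapse as they arrive.
val distinctPerKey = pairs
  .mapValues(v => Set(v))
  .foldByKey(Set.empty[String])(_ ++ _)

// Keep the (potentially large) sets on local disk rather than the heap.
distinctPerKey.persist(StorageLevel.DISK_ONLY)

val distinctCounts = distinctPerKey.mapValues(_.size)
distinctCounts.collect().foreach(println) // (a,2), (b,1)

distinctPerKey.unpersist() // free the disk space once downstream work is done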
Re: long GC pause during file.cache()
Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0? On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote: SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you don't mind the WARNING in the logs. You can set spark.executor.extraJavaOptions in your SparkConf obj. Best, -- Nan Zhu On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote: Hi, Wei You may try to set JVM opts in spark-env.sh as follows to prevent or mitigate GC pauses: export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m" There are more options you could add, please just Google :) Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240 Email: wh.s...@gmail.com On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote: Hi, I have a single node (192G RAM) stand-alone Spark, with memory configuration like this in spark-env.sh: SPARK_WORKER_MEMORY=180g SPARK_MEM=180g In spark-shell I have a program like this: val file = sc.textFile("/localpath") // file size is 40G file.cache() val output = file.map(line => extract something from line) output.saveAsTextFile(...) When I run this program again and again, or keep trying file.unpersist() -> file.cache() -> output.saveAsTextFile(), the run time varies a lot, from 1 min to 3 min to 50+ min. Whenever the run time is more than 1 min, from the stage monitoring GUI I observe big GC pauses (some can be 10+ min). Of course when the run time is normal, say ~1 min, no significant GC is observed. The behavior seems somewhat random. Is there any JVM tuning I should do to prevent this long GC pause from happening? I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something like this: root 10994 1.7 0.6 196378000 1361496 pts/51 Sl+ 22:06 0:12 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
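For completeness, here is what the 1.0-style approach described above might look like in code, a small sketch using the spark.executor.extraJavaOptions key from the Spark 1.0 configuration docs (the flags mirror the GC suggestions in this thread):

import org.apache.spark.{SparkConf, SparkContext}

// Pass GC flags to executors without the deprecated SPARK_JAVA_OPTS.
val conf = new SparkConf()
  .setAppName("gc-tuning-example")
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit -XX:MaxPermSize=256m")
val sc = new SparkContext(conf)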
Re: Error During ReceivingConnection
It looks like this was due to another executor on a different node closing the connection on its side. I found the entries below in the remote side's logs. Can anyone comment on why one ConnectionManager would close its connection to another node and what could be tuned to avoid this? It did not have any errors on its side. This is from the ConnectionManager on the side shutting down the connection, not the ConnectionManager that had the Connection Reset By Peer. 14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.125,45610) 14/06/10 18:51:14 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.125,45610) On Wed, Jun 11, 2014 at 8:38 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: I have a somewhat large job (10 GB input data but generates about 500 GB of data after many stages). Most tasks completed but a few stragglers on the same node/executor are still active (but doing nothing) after about 16 hours. At about 3 to 4 hours in, the tasks that are hanging have the following in the work logs. Any idea what config to tweak for this? 14/06/10 18:51:10 WARN network.ReceivingConnection: Error reading from connection to ConnectionManagerId(172.16.25.108,37693) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251) at sun.nio.ch.IOUtil.read(IOUtil.java:224) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254) at org.apache.spark.network.ReceivingConnection.read(Connection.scala:534) at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:679) 14/06/10 18:51:10 INFO network.ConnectionManager: Handling connection error on connection to ConnectionManagerId(172.16.25.108,37693) 14/06/10 18:51:10 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.108,37693) 14/06/10 18:51:10 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.108,37693) 14/06/10 18:51:10 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.108,37693) 14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found 14/06/10 18:51:10 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.108,37693) 14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found 14/06/10 18:51:14 WARN network.ReceivingConnection: Error reading from connection to ConnectionManagerId(172.16.25.97,54918) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251) at sun.nio.ch.IOUtil.read(IOUtil.java:224) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254) at org.apache.spark.network.ReceivingConnection.read(Connection.scala:534) at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:679) 14/06/10 18:51:14 INFO network.ConnectionManager: Handling connection error on connection to ConnectionManagerId(172.16.25.97,54918) 14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.97,54918) 14/06/10 18:51:14 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.97,54918) 14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.97,54918) 14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found 14/06/10 18:51:14 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.97,54918) 14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: Spark Logging
Event logs are different from writing using a logger like log4j. The event logs are the type of data that shows up in the history server. For my team, we use com.typesafe.scalalogging.slf4j.Logging. Our logs show up in /etc/spark/work/app-id/executor-id/stderr and stdout. All of our logging seems to show up in stderr. -Suren On Tue, Jun 10, 2014 at 2:56 PM, coderxiang shuoxiang...@gmail.com wrote: By default, the logs are available at `/tmp/spark-events`. You can specify the log directory via spark.eventLog.dir; see the configuration page http://spark.apache.org/docs/latest/configuration.html. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Logging-tp7340p7343.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
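As a small sketch of the scalalogging usage mentioned above (assuming the scalalogging-slf4j 1.x artifact, where a class mixes in the Logging trait, and an slf4j backend such as log4j on the classpath):

import com.typesafe.scalalogging.slf4j.Logging

// Anything logged here ends up wherever the slf4j backend routes it --
// for executors, typically the per-app stderr files noted above.
class WordCounter extends Logging {
  def count(lines: Seq[String]): Int = {
    val n = lines.map(_.split("\\s+").length).sum
    logger.info(s"counted $n words across ${lines.size} lines")
    n
  }
}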
FileNotFoundException when using persist(DISK_ONLY)
I have a dataset of about 10GB. I am using persist(DISK_ONLY) to avoid out of memory issues when running my job. When I run with a dataset of about 1 GB, the job is able to complete. But when I run with the larger dataset of 10 GB, I get the following error/stacktrace, which seems to be happening when the RDD is writing out to disk. Anyone have any ideas as to what is going on or if there is a setting I can tune? 14/06/09 21:33:55 ERROR executor.Executor: Exception in task ID 560 java.io.FileNotFoundException: /tmp/spark-local-20140609210741-0bb8/14/rdd_331_175 (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:209) at java.io.FileOutputStream.init(FileOutputStream.java:160) at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:79) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:698) at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:95) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:679) -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: FileNotFoundException when using persist(DISK_ONLY)
I don't know if this is related but a little earlier in stderr, I also have the following stacktrace. But this stacktrace seems to be when the code is grabbing RDD data from a remote node, which is different from the above. 14/06/09 21:33:26 ERROR executor.ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-16,5,main] java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:329) at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:94) at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176) at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63) at org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109) at org.apache.spark.storage.BlockManagerWorker$.syncGetBlock(BlockManagerWorker.scala:128) at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:489) at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:487) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:487) at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:473) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:513) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) On Mon, Jun 9, 2014 at 10:05 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: I have a dataset of about 10GB. I am using persist(DISK_ONLY) to avoid out of memory issues when running my job. When I run with a dataset of about 1 GB, the job is able to complete. But when I run with the larger dataset of 10 GB, I get the following error/stacktrace, which seems to be happening when the RDD is writing out to disk. Anyone have any ideas as to what is going on or if there is a setting I can tune? 
14/06/09 21:33:55 ERROR executor.Executor: Exception in task ID 560 java.io.FileNotFoundException: /tmp/spark-local-20140609210741-0bb8/14/rdd_331_175 (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:209) at java.io.FileOutputStream.init(FileOutputStream.java:160) at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:79) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:698) at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:95) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:679) -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@v suren.hira...@sociocast.comelos.io W: www.velos.io -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F
Re: FileNotFoundException when using persist(DISK_ONLY)
Sorry for the stream of consciousness, but after thinking about this a bit more, I'm thinking that the FileNotFoundExceptions are due to tasks being cancelled/restarted and the root cause is the OutOfMemoryError. If anyone has any insights on how to debug this more deeply or relevant config settings, that would be much appreciated. Otherwise, I figure next steps would be to enable more debugging levels in the Spark code to see how much memory the code is trying to allocate. At this point, I'm wondering if the block could be in the GB range. -Suren On Mon, Jun 9, 2014 at 10:27 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: I don't know if this is related but a little earlier in stderr, I also have the following stacktrace. But this stacktrace seems to be when the code is grabbing RDD data from a remote node, which is different from the above. 14/06/09 21:33:26 ERROR executor.ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-16,5,main] java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:329) at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:94) at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176) at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63) at org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109) at org.apache.spark.storage.BlockManagerWorker$.syncGetBlock(BlockManagerWorker.scala:128) at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:489) at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:487) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:487) at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:473) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:513) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) On Mon, Jun 9, 2014 at 10:05 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: I have a dataset of about 10GB. I am using persist(DISK_ONLY) to avoid out of memory issues when running my job. When I run with a dataset of about 1 GB, the job is able to complete.
But when I run with the larger dataset of 10 GB, I get the following error/stacktrace, which seems to be happening when the RDD is writing out to disk. Anyone have any ideas as to what is going on or if there is a setting I can tune? 14/06/09 21:33:55 ERROR executor.Executor: Exception in task ID 560 java.io.FileNotFoundException: /tmp/spark-local-20140609210741-0bb8/14/rdd_331_175 (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:209) at java.io.FileOutputStream.init(FileOutputStream.java:160) at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:79) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:698) at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:95) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99
Re: Spark 1.0.0 - Java 8
With respect to virtual hosts, my team uses Vagrant/VirtualBox. We have 3 CentOS VMs with 4 GB RAM each - 2 worker nodes and a master node. Everything works fine, though if you are using MapR, you have to make sure they are all on the same subnet. -Suren On Fri, May 30, 2014 at 12:20 PM, Upender Nimbekar upent...@gmail.com wrote: Great News! I've been awaiting this release to start doing some coding with Spark using Java 8. Can I run Spark 1.0 examples on a virtual host with 16 GB RAM and a fairly decent amount of hard disk? Or do I really need to use a cluster of machines? Second, are there any good examples of using MLlib on Spark? Please point me in the right direction. Thanks Upender On Fri, May 30, 2014 at 6:12 AM, Patrick Wendell pwend...@gmail.com wrote: I'm thrilled to announce the availability of Spark 1.0.0! Spark 1.0.0 is a milestone release as the first in the 1.0 line of releases, providing API stability for Spark's core interfaces. Spark 1.0.0 is Spark's largest release ever, with contributions from 117 developers. I'd like to thank everyone involved in this release - it was truly a community effort with fixes, features, and optimizations contributed from dozens of organizations. This release expands Spark's standard libraries, introducing a new SQL package (SparkSQL) which lets users integrate SQL queries into existing Spark workflows. MLlib, Spark's machine learning library, is expanded with sparse vector support and several new algorithms. The GraphX and Streaming libraries also introduce new features and optimizations. Spark's core engine adds support for secured YARN clusters, a unified tool for submitting Spark applications, and several performance and stability improvements. Finally, Spark adds support for Java 8 lambda syntax and improves coverage of the Java and Python APIs. Those features only scratch the surface - check out the release notes here: http://spark.apache.org/releases/spark-release-1-0-0.html Note that since release artifacts were posted recently, certain mirrors may not have working downloads for a few hours. - Patrick -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: maprfs and spark libraries
My team is successfully running Spark on MapR. However, we add the MapR jars to the SPARK_CLASSPATH on the workers, as well as making sure they are on the classpath of the driver. I'm not sure if we need every jar that we currently add, but below is what we currently use. The important file in the conf directory is mapr-clusters.conf. We do not add the jars to the classpath via addJars; we prefer to reference them from the MapR installation. This is partly because we colocate our workers on our MapR worker nodes. export SPARK_CLASSPATH=/opt/mapr/hadoop/hadoop-0.20.2/conf:/opt/mapr/hadoop/hadoop-0.20.2/lib/hadoop-0.20.2-dev-core.jar:/opt/mapr/lib/maprfs-1.0.3-mapr-3.0.2.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/commons-logging-1.0.4.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/maprfs-1.0.3-mapr-3.0.2.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/zookeeper-3.3.6.jar On Mon, May 26, 2014 at 10:50 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: Did you try it in standalone mode? You may not see serialization issues in local threaded mode. Serialization errors are unlikely to be caused by the MapR hadoop version. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Mon, May 26, 2014 at 3:18 PM, nelson nelson.verd...@ysance.com wrote: Hi all, I am trying to run Spark over a MapR cluster. I successfully ran several custom applications on a previous non-MapR hadoop cluster but I can't get them working on the MapR one. To be more specific, I am not able to read or write on mfs without running into a serialization error from Java. Note that everything works fine when I am running the app in local mode, which makes me think of a dependency error. The test application is built using sbt with the following dependency: - org.apache.spark spark-core 0.9.1 In my test_app/lib directory I have: - hadoop-core-1.0.3-mapr-3.0.2.jar - json-20140107.jar - maprfs-1.0.3-mapr-3.0.2.jar Finally, I add those jars with conf.setJars so that they are distributed on the cluster. Am I compiling with the wrong dependencies? Should I get a MapR version of spark-core? Regards, Nelson -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: maprfs and spark libraries
We use the MapR rpm and have successfully read and written hdfs data. Are you using custom readers/writers? Maybe the relevant stacktrace would help. Maybe also try a standard text reader and writer to see if there is a basic issue with accessing mfs? -Suren On Mon, May 26, 2014 at 11:31 AM, nelson nelson.verd...@ysance.com wrote: Thanks for replying, guys. Mayur: Indeed, I tried local mode (sparkmaster: local[5]) before and the application runs well, no serialization problem. The problem arises as soon as I try to run the app over the cluster. Surendranauth: I just double checked my spark_classpath from spark_env.sh and we have a similar one. Is your team running Spark with the MapR rpm [1]? Or did you compile it yourself? [1]: http://doc.mapr.com/display/MapR/Installing+Spark+and+Shark Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392p6400.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: maprfs and spark libraries
When I have stack traces, I usually see the MapR versions of the various hadoop classes, though maybe that's at a deeper level of the stack trace. If my memory is right, though, this may point to the classpath having the regular hadoop jars before the MapR jars. My guess is that this is on the driver side, if so. I had to make sure to put the MapR jars as the very first entries on my driver classpath. -Suren On Mon, May 26, 2014 at 12:11 PM, nelson nelson.verd...@ysance.com wrote: I am using standard readers and writers, I believe. When I locally run the app, Spark is able to write on hdfs. Then I assume accessing and reading mfs is doable. Here is the piece of code I use for testing: val list = List("dad", "mum", "brother", "sister") val mlist = sc.parallelize(list) mlist.saveAsTextFile("maprfs:///user/nelson/test") and the stack trace: 14/05/26 16:02:54 WARN scheduler.TaskSetManager: Loss was due to java.lang.NullPointerException java.lang.NullPointerException at org.apache.hadoop.fs.FileSystem.fixName(FileSystem.java:187) at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:123) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:115) at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:617) at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:439) at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:412) at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:391) at org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:391) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$1.apply(HadoopRDD.scala:111) at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$1.apply(HadoopRDD.scala:111) at scala.Option.map(Option.scala:145) at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111) at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:154) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) The URI name seems to be the issue now, as I happened to get rid of the serialization issue.
Regards, Nelson -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392p6402.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: Anyone using value classes in RDDs?
If the purpose is only aliasing, rather than adding additional methods and avoiding runtime allocation, what about type aliases? type ID = String type Name = String On Sat, Apr 19, 2014 at 9:26 PM, kamatsuoka ken...@gmail.com wrote: No, you can wrap other types in value classes as well. You can try it in the REPL: scala> case class ID(val id: String) extends AnyVal defined class ID scala> val i = ID("foo") i: ID = ID(foo) On Fri, Apr 18, 2014 at 4:14 PM, Koert Kuipers [via Apache Spark User List] [hidden email] wrote: Aren't value classes for primitives (AnyVal) only? That doesn't apply to String, which is an object (AnyRef). On Fri, Apr 18, 2014 at 2:51 PM, kamatsuoka [hidden email] wrote: I'm wondering if anyone has tried using value classes in RDDs? My use case is that I have a number of RDDs containing strings, e.g. val r1: RDD[(String, (String, Int))] = ... val r2: RDD[(String, (String, Int))] = ... and it might be clearer if I wrote case class ID(val id: String) extends AnyVal case class Name(val id: String) extends AnyVal val r1: RDD[(ID, (Name, Int))] = ... val r2: RDD[(Name, (ID, Int))] = ... This seems like a pretty typical use case for value classes, but I haven't noticed anyone talking about it. Although, I think you'd have to read through all of the Spark code paths to know whether allocation is required (http://docs.scala-lang.org/overviews/core/value-classes.html), so some comparative performance testing would be called for. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Kenji -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464p4494.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
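A small sketch contrasting the two options in this thread (plain Scala, no Spark needed): type aliases are pure renames with zero overhead but no added type safety, while value classes give distinct compile-time types and usually avoid runtime allocation:

object WithAliases {
  type ID = String
  type Name = String

  val id: ID = "user-1"
  val name: Name = id // compiles: aliases are interchangeable with String
}

object WithValueClasses {
  case class ID(id: String) extends AnyVal
  case class Name(name: String) extends AnyVal

  val id = ID("user-1")
  // val name: Name = id // does not compile: ID and Name are distinct types
}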
Re: Anyone using value classes in RDDs?
Oh, sorry, I think your point was probably that you wouldn't need runtime allocation. I guess that is the key question. I would be interested to hear if this works for you. -Suren On Sun, Apr 20, 2014 at 9:18 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: If the purpose is only aliasing, rather than adding additional methods and avoiding runtime allocation, what about type aliases? type ID = String type Name = String -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: standalone vs YARN
Prashant, In another email thread several weeks ago, it was mentioned that YARN support is considered beta until Spark 1.0. Is that not the case? -Suren On Tue, Apr 15, 2014 at 8:38 AM, Prashant Sharma scrapco...@gmail.com wrote: Hi Ishaaq, answers inline from what I know; I'd like to be corrected though. On Tue, Apr 15, 2014 at 5:58 PM, ishaaq ish...@gmail.com wrote: Hi all, I am evaluating Spark to use here at my work. We have an existing Hadoop 1.x install which I am planning to upgrade to Hadoop 2.3. This is not really a requirement for Spark; if you are doing it for some other reason, great! I am trying to work out whether I should install YARN or simply just set up a Spark standalone cluster. We already use ZooKeeper so it isn't a problem to set up HA. I am puzzled however as to how the Spark nodes can coordinate on data locality - i.e., assuming I install the nodes on the same machines as the DFS data nodes, I don't understand how Spark can work out which nodes should get which splits of the jobs? This happens exactly the same way Hadoop's MapReduce figures out data locality, since Spark supports Hadoop's InputFormats (which also carry the information on how data is partitioned). So having Spark workers share the same nodes as your DFS is a good idea. Anyway, my bigger question remains: YARN or standalone? Which is the more stable option currently? Which is the more future-proof option? Well, I think standalone is stable enough for all purposes, and Spark's YARN support has been keeping up with the latest Hadoop versions too. If you are already using YARN and don't want the hassle of setting up another cluster manager, you may prefer YARN. Thanks, Ishaaq -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/standalone-vs-YARN-tp4271.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: Spark - ready for prime time?
Excellent, thank you. On Fri, Apr 11, 2014 at 12:09 PM, Matei Zaharia matei.zaha...@gmail.com wrote: It's not a new API, it just happens underneath the current one if you have spark.shuffle.spill set to true (which it is by default). Take a look at the config settings that mention spill in http://spark.incubator.apache.org/docs/latest/configuration.html. Matei On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matei, Where is the functionality in 0.9 to spill data within a task (separately from persist)? My apologies if this is something obvious but I don't see it in the api docs. -Suren On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote: To add onto the discussion about memory working space, 0.9 introduced the ability to spill data within a task to disk, and in 1.0 we're also changing the interface to allow spilling data within the same *group* to disk (e.g. when you do groupBy and get a key with lots of values). The main reason these weren't there was that for a lot of workloads (everything except the same key having lots of values), simply launching more reduce tasks was also a good solution, because it results in an external sort across the cluster similar to what would happen within a task. Overall, expect to see more work to both explain how things execute ( http://spark.incubator.apache.org/docs/latest/tuning.html is one example, the monitoring UI is another) and try to make things require no configuration out of the box. We're doing a lot of this based on user feedback, so that's definitely appreciated. Matei On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote: On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote: The biggest issue I've come across is that the cluster is somewhat unstable when under memory pressure. Meaning that if you attempt to persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll often still get OOMs. I had to carefully modify some of the space tuning parameters and GC settings to get some jobs to even finish. The other issue I've observed is if you group on a key that is highly skewed, with a few massively-common keys and a long tail of rare keys, the one massive key can be too big for a single machine and again cause OOMs. My take on it -- Spark doesn't believe in sort-and-spill things to enable super long groups, and IMO for a good reason. Here are my thoughts: (1) In my work I don't need sort in 99% of the cases; I only need group, which absolutely doesn't need the spill that makes things slow down to a crawl. (2) If that's an aggregate (such as a group count), use combine(), not groupByKey -- this will do tons of good on memory use. (3) If you really need groups that don't fit into memory, that is always because you want to do something other than aggregation with them. E.g. build an index of that grouped data. We actually had a case just like that. In this case your friend is really not groupBy, but rather partitionBy. I.e. what happens there is you build a quick count sketch, perhaps on downsampled data, to figure out which keys have a sufficiently big count -- and then you build a partitioner that redirects large groups to a dedicated map(). Assuming this map doesn't try to load things in memory but rather does something like a streaming BTree build, that should be fine. In certain situations such processing may require splitting a super large group even into smaller sub groups (e.g.
a partitioned B-tree structure), at which point you should be fine even from a uniform-load point of view. It takes a little jiu-jitsu to do it all, but it is not Spark's fault here; it did not promise to do all this for you in the groupBy contract. I'm hopeful that off-heap caching (Tachyon) could fix some of these issues. Just my personal experience, but I've observed significant improvements in stability since even the 0.7.x days, so I'm confident that things will continue to get better as long as people report what they're seeing so it can get fixed. Andrew On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert alex.boisv...@gmail.com wrote: I'll provide answers from our own experience at Bizo. We've been using Spark for 1+ year now and have found it generally better than previous approaches (Hadoop + Hive mostly). On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: I. Is it too much magic? Lots of things just work right in Spark, and it's extremely convenient and efficient when it indeed works. But should we be worried that customization is hard if the built-in behavior is not quite right for us? Are we to expect hard-to-track-down issues originating from the black box behind the magic? I think this goes back to understanding Spark's architecture, its design constraints, and the problems it explicitly set out to address. If the solution
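To ground Matei's point about spilling in the thread above, here is a minimal sketch (assuming a local master; the memory fraction shown is illustrative, not a recommendation) of the spill-related settings he refers to - spark.shuffle.spill is on by default:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("spill-config-example")
  // On by default: allows aggregations within a task to spill
  // to disk rather than holding everything in memory.
  .set("spark.shuffle.spill", "true")
  // Fraction of the heap usable for in-memory shuffle data
  // before spilling kicks in (illustrative value).
  .set("spark.shuffle.memoryFraction", "0.3")
val sc = new SparkContext(conf)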
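And to ground Dmitriy's point (2): for a plain aggregate such as a group count, a combining operation like reduceByKey merges values map-side, so only partial results cross the network, whereas groupByKey ships and materializes every value for a key. A minimal sketch with toy data, assuming the SparkContext sc above:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// Heavy on memory for a simple count: materializes every value per key.
val countsViaGroup = pairs.groupByKey().mapValues(_.size)

// Combines on the map side; only partial sums are shuffled.
val countsViaReduce = pairs.reduceByKey(_ + _)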
Re: Spark Disk Usage
Hi, Any thoughts on this? Thanks. -Suren On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Hi, I know if we call persist with the right options, we can have Spark persist an RDD's data on disk. I am wondering what happens in intermediate operations that could conceivably create large collections/sequences, like groupBy and shuffling. Basically, one part of the question is: when is disk used internally? And is calling persist() on the RDD returned by such transformations what lets it know to use disk in those situations? I am trying to understand whether persist() is applied during the transformation or after it. Thank you. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
PySpark SocketConnect Issue in Cluster
Hi, We have a situation where a PySpark script works fine as a local process (local URL) on the master and the worker nodes, which would indicate that all Python dependencies are set up properly on each machine. But when we try to run the script at the cluster level (using the master's URL), it fails partway through the flow on a groupBy with a SocketConnect error, and Python crashes. This is on EC2 using the AMI. This doesn't seem to be an issue of the master not seeing the workers, since they show up in the web UI. Also, we can see the job running on the cluster until it reaches the groupBy transform step, which is when we get the SocketConnect error. Any ideas? -Suren SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: Spark Disk Usage
It might help if I clarify my questions. :-) 1. Is persist() applied during the transformation right before the persist() call in the graph? Or is it applied after the transform's processing is complete? In the case of things like groupBy, is the Seq backed by disk as it is being created? We're trying to get a sense of how the processing is handled behind the scenes with respect to disk. 2. When else is disk used internally? Any pointers are appreciated. -Suren On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Hi, Any thoughts on this? Thanks. -Suren On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Hi, I know if we call persist with the right options, we can have Spark persist an RDD's data on disk. I am wondering what happens in intermediate operations that could conceivably create large collections/sequences, like groupBy and shuffling. Basically, one part of the question is: when is disk used internally? And is calling persist() on the RDD returned by such transformations what lets it know to use disk in those situations? I am trying to understand whether persist() is applied during the transformation or after it. Thank you. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
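For reference, a minimal sketch of the persist() call under discussion (assuming an existing SparkContext sc; the storage level and data are chosen for illustration). Note that persist() only marks the RDD: the data is materialized, and spilled to disk if it doesn't fit in memory, when an action first computes it, not retroactively during the upstream transformation.

import org.apache.spark.storage.StorageLevel

val grouped = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).groupByKey()
// Marks 'grouped' for persistence; takes effect when an action first
// materializes it, with partitions that don't fit in memory written to disk.
grouped.persist(StorageLevel.MEMORY_AND_DISK)
grouped.count() // first action computes and persists the partitions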
Spark Disk Usage
Hi, I know if we call persist with the right options, we can have Spark persist an RDD's data on disk. I am wondering what happens in intermediate operations that could conceivably create large collections/sequences, like groupBy and shuffling. Basically, one part of the question is: when is disk used internally? And is calling persist() on the RDD returned by such transformations what lets it know to use disk in those situations? I am trying to understand whether persist() is applied during the transformation or after it. Thank you. SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
Re: Accessing the reduce key
Mayur, To be a little clearer: for creating the Bloom filters, I don't think broadcast variables are the way to go, though that would definitely work for using the Bloom filters to filter data. The reason is that the creation needs to happen in a single thread. Otherwise, some type of locking/distributed locking is needed on the individual Bloom filter itself, with a performance impact. Agreed? -Suren On Thu, Mar 20, 2014 at 3:40 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: Mayur, Thanks. This step is for creating the Bloom filter, not using it to filter data, actually. But your answer still stands. Partitioning by key, having the Bloom filters as a broadcast variable, and then doing mapPartitions makes sense. Are there performance implications for this approach, such as with using the broadcast variable, versus the approach we used, in which the Bloom filter (again, for creating it) is only referenced by the single map application? -Suren On Thu, Mar 20, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Why are you trying to reduceByKey? Are you looking to work on the data sequentially? If I understand correctly, you are looking to filter your data using the Bloom filter, and each Bloom filter is tied to the key that instantiates it. Following are some of the options: *partition* your data by key, then use the mapPartitions operator to run a function on each partition independently; the same function will be applied to each partition. If your Bloom filter is large, you can bundle all of them in as a broadcast variable and use it to apply the transformation on your data with a simple map operation; basically, you look up the right Bloom filter for each key and apply the filter with it. Again, if deserializing the Bloom filter is time-consuming, you can partition the data on key, then use the broadcast variable to look up the Bloom filter for each key and apply the filter on all the data serially. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: We ended up going with: map() - set the group_id as the key in a tuple; reduceByKey() - end up with (K, Seq[V]); map() - create the Bloom filter, loop through the Seq, and persist the Bloom filter. This seems to be fine. I guess Spark cannot optimize the reduceByKey and map steps to occur together, since the fact that we are looping through the Seq is outside Spark's control. -Suren On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Hi, My team is trying to replicate an existing MapReduce process in Spark. Basically, we are creating Bloom filters for quick set-membership tests within our processing pipeline. We have a single column (call it group_id) that we use to partition into sets. As you would expect, in the map phase we emit the group_id as the key, and in the reduce phase we instantiate the Bloom filter for a given key in the setup() method and persist that Bloom filter in the cleanup() method. In Spark, we can do something similar with map() and reduceByKey(), but we have the following questions. 1. Accessing the reduce key In reduceByKey(), how do we get access to the specific key within the reduce function? 2. Equivalent of setup/cleanup Where should we instantiate and persist each Bloom filter by key? In the driver, and then pass in the references to the reduce function?
But if so, how does the reduce function know which set's Bloom filter it should be writing to (question 1 above)? It seems that if we use groupByKey and then reduceByKey, that gives us access to all of the values in one go. I assume Spark will manage things there if those values don't all fit in memory in one go. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
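A minimal sketch of the map -> groupByKey -> map flow described in this thread, assuming an existing SparkContext sc. The ToyBloomFilter below is a deliberately simplistic stand-in just to show the shape of the pipeline; in practice you would use a real Bloom filter implementation, and the final (groupId, filter) pairs would be persisted however your job requires:

import scala.collection.mutable.BitSet

// Toy stand-in for a real Bloom filter (two hash probes into a bit set).
class ToyBloomFilter(size: Int) extends Serializable {
  private val bits = new BitSet(size)
  private def idx(v: String, salt: String) = ((v + salt).hashCode & Int.MaxValue) % size
  def add(v: String): Unit = { bits += idx(v, ""); bits += idx(v, "#") }
  def mightContain(v: String): Boolean = bits(idx(v, "")) && bits(idx(v, "#"))
}

val records = sc.parallelize(Seq(("g1", "x"), ("g1", "y"), ("g2", "z")))

val filters = records
  .groupByKey()                       // (group_id, all values for that group)
  .map { case (groupId, values) =>
    val bf = new ToyBloomFilter(1 << 16)
    values.foreach(bf.add)            // built in a single task: no locking needed
    (groupId, bf)                     // persist/serialize bf here in a real job
  }
filters.count()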
Re: Accessing the reduce key
Grouped by the group_id but not sorted. -Suren On Thu, Mar 20, 2014 at 5:52 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: You are using the data grouped (sorted?) to create the Bloom filter? On Mar 20, 2014 4:35 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: Mayur, To be a little clearer: for creating the Bloom filters, I don't think broadcast variables are the way to go, though that would definitely work for using the Bloom filters to filter data. The reason is that the creation needs to happen in a single thread. Otherwise, some type of locking/distributed locking is needed on the individual Bloom filter itself, with a performance impact. Agreed? -Suren On Thu, Mar 20, 2014 at 3:40 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: Mayur, Thanks. This step is for creating the Bloom filter, not using it to filter data, actually. But your answer still stands. Partitioning by key, having the Bloom filters as a broadcast variable, and then doing mapPartitions makes sense. Are there performance implications for this approach, such as with using the broadcast variable, versus the approach we used, in which the Bloom filter (again, for creating it) is only referenced by the single map application? -Suren On Thu, Mar 20, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com wrote: Why are you trying to reduceByKey? Are you looking to work on the data sequentially? If I understand correctly, you are looking to filter your data using the Bloom filter, and each Bloom filter is tied to the key that instantiates it. Following are some of the options: *partition* your data by key, then use the mapPartitions operator to run a function on each partition independently; the same function will be applied to each partition. If your Bloom filter is large, you can bundle all of them in as a broadcast variable and use it to apply the transformation on your data with a simple map operation; basically, you look up the right Bloom filter for each key and apply the filter with it. Again, if deserializing the Bloom filter is time-consuming, you can partition the data on key, then use the broadcast variable to look up the Bloom filter for each key and apply the filter on all the data serially. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: We ended up going with: map() - set the group_id as the key in a tuple; reduceByKey() - end up with (K, Seq[V]); map() - create the Bloom filter, loop through the Seq, and persist the Bloom filter. This seems to be fine. I guess Spark cannot optimize the reduceByKey and map steps to occur together, since the fact that we are looping through the Seq is outside Spark's control. -Suren On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Hi, My team is trying to replicate an existing MapReduce process in Spark. Basically, we are creating Bloom filters for quick set-membership tests within our processing pipeline. We have a single column (call it group_id) that we use to partition into sets. As you would expect, in the map phase we emit the group_id as the key, and in the reduce phase we instantiate the Bloom filter for a given key in the setup() method and persist that Bloom filter in the cleanup() method. In Spark, we can do something similar with map() and reduceByKey(), but we have the following questions. 1.
Accessing the reduce key In reduceByKey(), how do we get access to the specific key within the reduce function? 2. Equivalent of setup/cleanup Where should we instantiate and persist each Bloom filter by key? In the driver, and then pass in the references to the reduce function? But if so, how does the reduce function know which set's Bloom filter it should be writing to (question 1 above)? It seems that if we use groupByKey and then reduceByKey, that gives us access to all of the values in one go. I assume Spark will manage things there if those values don't all fit in memory in one go. -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@velos.io W: www.velos.io
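For completeness, here is a sketch of the partition-then-mapPartitions alternative suggested in this thread, reusing the records RDD and ToyBloomFilter stand-in from the earlier sketch (the partition count is arbitrary). Partitioning by key routes all values for a key to one partition, and the partition function sees the keys directly, which also answers the "accessing the reduce key" question. Note that toSeq buffers the whole partition in memory, echoing the memory discussion above:

import org.apache.spark.HashPartitioner

val partitioned = records.partitionBy(new HashPartitioner(8))

val perKeyFilters = partitioned.mapPartitions { iter =>
  // Each partition is processed by a single task, so building one
  // filter per key here needs no locking or synchronization.
  val byKey = iter.toSeq.groupBy(_._1) // buffers the partition in memory
  byKey.iterator.map { case (groupId, kvs) =>
    val bf = new ToyBloomFilter(1 << 16)
    kvs.foreach { case (_, v) => bf.add(v) }
    (groupId, bf)
  }
}
perKeyFilters.count()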