Re: RDD.aggregate versus accumulables...

2014-11-17 Thread Surendranauth Hiraman
We use Algebird for calculating things like min/max, stddev, variance, etc.

https://github.com/twitter/algebird/wiki

-Suren


On Mon, Nov 17, 2014 at 11:32 AM, Daniel Siegmann daniel.siegm...@velos.io
wrote:

 You should *never* use accumulators for this purpose because you may get
 incorrect answers. Accumulators can count the same thing multiple times -
 you cannot rely upon the correctness of the values they compute. See
 SPARK-732 https://issues.apache.org/jira/browse/SPARK-732 for more info.
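 For comparison, here is a minimal sketch (not from the original thread) of doing this
 with RDD.aggregate: a single pass that folds count, sum and sum of squares, from which
 mean and variance follow. All names and values are illustrative.

 import org.apache.spark.{SparkConf, SparkContext}

 val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("AggregateSketch"))
 val data = sc.parallelize(1 to 1000).map(_.toDouble)

 // zero value is (count, sum, sumOfSquares)
 val (count, sum, sumSq) = data.aggregate((0L, 0.0, 0.0))(
   (acc, x) => (acc._1 + 1, acc._2 + x, acc._3 + x * x),   // fold one element into a partition's accumulator
   (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3)       // merge accumulators from different partitions
 )

 val mean = sum / count
 val variance = sumSq / count - mean * mean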

 On Sun, Nov 16, 2014 at 10:06 PM, Segerlind, Nathan L 
 nathan.l.segerl...@intel.com wrote:

  Hi All.



 I am trying to get my head around why using accumulators and accumulables
 seems to be the most recommended method for accumulating running sums,
 averages, variances and the like, whereas the aggregate method seems to me
 to be the right one. I have no performance measurements as of yet, but it
 seems that aggregate is simpler and more intuitive (And it does what one
 might expect an accumulator to do) whereas the accumulators and
 accumulables seem to have some extra complications and overhead.



 So…



 What’s the real difference between an accumulator/accumulable and
 aggregating an RDD? When is one method of aggregation preferred over the
 other?



 Thanks,

 Nate




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Play framework

2014-10-16 Thread Surendranauth Hiraman
Mohammed,

Jumping in for Daniel, we actually address the configuration issue by
pulling values from environment variables or command line options. Maybe
that may handle at least some of your needs.

For the akka issue, here is the akka version we include in build.sbt:
"com.typesafe.akka" %% "akka-actor" % "2.2.1"
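As context, a minimal build.sbt sketch (not from the original thread; the Spark version
and the rest of the project setup are illustrative) showing the pinned Akka dependency
alongside Spark:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",    // illustrative Spark version
  "com.typesafe.akka" %% "akka-actor" % "2.2.1"    // Akka pinned to the version noted above
)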

-Suren


On Thu, Oct 16, 2014 at 12:23 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Daniel,

 Thanks for sharing this. It is very helpful.



 The reason I want to use Spark submit is that it provides more
 flexibility. For example, with spark-submit, I don’t need to hard code the
 master info in the code. I can easily change the config without having to
 change and recompile code.



 Do you mind sharing the sbt build file for your play app? I tried to build
 an uber jar using sbt-assembly. It gets built, but when I run it, it throws
 all sorts of exceptions. I have seen some blog posts saying that Spark and Play use
 different versions of the Akka library. So I included Akka in my build.scala
 file, but still cannot get rid of Akka-related exceptions. I suspect that
 the settings in the build.scala file for my play project are incorrect.



 Mohammed



 *From:* Daniel Siegmann [mailto:daniel.siegm...@velos.io]
 *Sent:* Thursday, October 16, 2014 7:15 AM
 *To:* Mohammed Guller
 *Cc:* user@spark.apache.org
 *Subject:* Re: Play framework



 We execute Spark jobs from a Play application but we don't use
 spark-submit. I don't know if you really want to use spark-submit, but if
 not you can just create a SparkContext programmatically in your app.

 In development I typically run Spark locally. Creating the Spark context
 is pretty trivial:

 val conf = new SparkConf().setMaster("local[*]").setAppName("My Awesome App")

 // call conf.set for any other configuration you want

 val sc = new SparkContext(conf)

 It is important to keep in mind you cannot have multiple local contexts
 (you can create them but you'll get odd errors), so if you are running
 things in parallel within your app (even unit tests) you'd need to share a
 context in this case. If you are running sequentially you can create a new
 local context each time, but you must make sure to call SparkContext.stop()
 when you're done.

 Running against a cluster is a bit more complicated because you need to
 add all your dependency jars. I'm not sure how to get this to work with play
 run. I stick to building the app with play dist and then running against
 the packaged application, because it very conveniently provides all the
 dependencies in a lib folder. Here is some code to load all the paths you
 need from the dist:


 def libs: Seq[String] = {
   val libDir = play.api.Play.application.getFile("lib")

   logger.info(s"SparkContext will be initialized with libraries from directory $libDir")

   if (libDir.exists) {
     libDir.listFiles().map(_.getCanonicalFile().getAbsolutePath()).filter(_.endsWith(".jar"))
   } else {
     throw new IllegalStateException(s"lib dir is missing: $libDir")
   }
 }

 Creating the context is similar to above, but with this extra line:


 conf.setJars(libs)
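 Put together, a minimal sketch (not from the thread) of the cluster-mode context
 creation; the master URL is a placeholder:

 val conf = new SparkConf()
   .setMaster("spark://master:7077")   // placeholder cluster master URL
   .setAppName("My Awesome App")
 conf.setJars(libs)                    // ship the dist's lib/ jars to the executors
 val sc = new SparkContext(conf)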

 I hope this helps. I should note that I don't use play run very much, at
 least not for when I'm actually executing Spark jobs. So I'm not sure if
 this integrates properly with that. I have unit tests which execute on
 Spark and have executed the dist package both locally and on a cluster. To
 make working with the dist locally easier, I wrote myself a little shell
 script to unzip and run the dist.





 On Wed, Oct 15, 2014 at 10:51 PM, Mohammed Guller moham...@glassbeam.com
 wrote:

 Hi –



 Has anybody figured out how to integrate a Play application with Spark and
 run it on a Spark cluster using spark-submit script? I have seen some blogs
 about creating a simple Play app and running it locally on a dev machine
 with sbt run command. However, those steps don’t work for Spark-submit.



 If you have figured out how to build and run a Play app with Spark-submit,
 I would appreciate if you could share the steps and the sbt settings for
 your Play app.



 Thanks,

 Mohammed






 --

 Daniel Siegmann, Software Developer
 Velos

 Accelerating Machine Learning


 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Spark And Mapr

2014-10-01 Thread Surendranauth Hiraman
As Sungwook said, the classpath pointing to the mapr jar is the key for
that error.

MapR has a Spark install that hopefully makes it easier. I don't have the
instructions handy but you can ask their support about it.

-Suren

On Wed, Oct 1, 2014 at 7:18 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 It should just work in PySpark, the same way it does in Java / Scala apps.

 Matei

 On Oct 1, 2014, at 4:12 PM, Sungwook Yoon sy...@maprtech.com wrote:


 Yes.. you should use maprfs://

 I personally haven't used pyspark, I just used scala shell or standalone
 with MapR.

 I think you need to set the classpath right, adding a jar like
 /opt/mapr/hadoop/hadoop-0.20.2/lib/hadoop-0.20.2-dev-core.jar to the
 classpath.

 Sungwook

 On Wed, Oct 1, 2014 at 4:09 PM, Addanki, Santosh Kumar 
 santosh.kumar.adda...@sap.com wrote:

  Hi



 We would like to do this in PySpark Environment



 i.e. something like



 test = sc.textFile("maprfs:///user/root/test") or

 test = sc.textFile("hdfs:///user/root/test") or





 Currently when we try

 test = sc.textFile("maprfs:///user/root/test")



 It throws the error

 No FileSystem for scheme: maprfs





 Best Regards

 Santosh







 *From:* Vladimir Rodionov [mailto:vrodio...@splicemachine.com]
 *Sent:* Wednesday, October 01, 2014 3:59 PM
 *To:* Addanki, Santosh Kumar
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark And Mapr



 There is doc on MapR:



 http://doc.mapr.com/display/MapR/Accessing+MapR-FS+in+Java+Applications



 -Vladimir Rodionov



 On Wed, Oct 1, 2014 at 3:00 PM, Addanki, Santosh Kumar 
 santosh.kumar.adda...@sap.com wrote:

 Hi



 We were using Horton 2.4.1 as our Hadoop distribution and now switched to
 MapR



 Previously to read a text file  we would use :



 test = sc.textFile("hdfs://10.48.101.111:8020/user/hdfs/test")





 What would be the equivalent for MapR?



 Best Regards

 Santosh








-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: All of the tasks have been completed but the Stage is still shown as Active?

2014-07-10 Thread Surendranauth Hiraman
History Server is also very helpful.



On Thu, Jul 10, 2014 at 7:37 AM, Haopu Wang hw...@qilinsoft.com wrote:

  I didn't keep the driver's log. It's a lesson.

 I will try to run it again to see if it happens again.


  --

 *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
 *Sent:* July 10, 2014 17:29
 *To:* user@spark.apache.org
 *Subject:* Re: All of the tasks have been completed but the Stage is
 still shown as Active?



 Do you see any errors in the logs of the driver?



 On Thu, Jul 10, 2014 at 1:21 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I'm running an App for hours in a standalone cluster. From the data
 injector and Streaming tab of web ui, it's running well.

 However, I see quite a lot of Active stages in the web UI, even though some of them
 have all of their tasks completed.

 I attach a screenshot for your reference.

 Do you ever see this kind of behavior?






-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Purpose of spark-submit?

2014-07-09 Thread Surendranauth Hiraman
Are there any gaps beyond convenience and code/config separation in using
spark-submit versus SparkConf/SparkContext if you are willing to set your
own config?

If there are any gaps, +1 on having parity within SparkConf/SparkContext
where possible. In my use case, we launch our jobs programmatically. In
theory, we could shell out to spark-submit but it's not the best option for
us.

So far, we are only using Standalone Cluster mode, so I'm not knowledgeable
on the complexities of other modes, though.

-Suren



On Wed, Jul 9, 2014 at 8:20 AM, Koert Kuipers ko...@tresata.com wrote:

 not sure I understand why unifying how you submit app for different
 platforms and dynamic configuration cannot be part of SparkConf and
 SparkContext?

 for classpath a simple script similar to hadoop classpath that shows
 what needs to be added should be sufficient.

 on spark standalone I can launch a program just fine with just SparkConf
 and SparkContext. not on yarn, so the spark-launch script must be doing a
 few things extra there I am missing... which makes things more difficult
 because I am not sure it's realistic to expect every application that needs
 to run something on spark to be launched using spark-submit.
  On Jul 9, 2014 3:45 AM, Patrick Wendell pwend...@gmail.com wrote:

 It fulfills a few different functions. The main one is giving users a
 way to inject Spark as a runtime dependency separately from their
 program and make sure they get exactly the right version of Spark. So
 a user can bundle an application and then use spark-submit to send it
 to different types of clusters (or using different versions of Spark).

 It also unifies the way you bundle and submit an app for Yarn, Mesos,
 etc... this was something that became very fragmented over time before
 this was added.

 Another feature is allowing users to set configuration values
 dynamically rather than compile them inside of their program. That's
 the one you mention here. You can choose to use this feature or not.
 If you know your configs are not going to change, then you don't need
 to set them with spark-submit.
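 For illustration, a minimal sketch (not from the thread) of the programmatic
 alternative being described: compiling the configuration into the application
 instead of passing it at submit time. The master URL and memory value are placeholders.

 import org.apache.spark.{SparkConf, SparkContext}

 val conf = new SparkConf()
   .setMaster("spark://master:7077")         // hard-coded here; spark-submit would supply --master instead
   .setAppName("MyJob")
   .set("spark.executor.memory", "4g")       // likewise settable at submit time with --conf / --executor-memory
 val sc = new SparkContext(conf)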


 On Wed, Jul 9, 2014 at 10:22 AM, Robert James srobertja...@gmail.com
 wrote:
  What is the purpose of spark-submit? Does it do anything outside of
  the standard val conf = new SparkConf ... val sc = new SparkContext
  ... ?




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Comparative study

2014-07-08 Thread Surendranauth Hiraman
I'll respond for Dan.

Our test dataset was a total of 10 GB of input data (full production
dataset for this particular dataflow would be 60 GB roughly).

I'm not sure what the size of the final output data was but I think it was
on the order of 20 GBs for the given 10 GB of input data. Also, I can say
that when we were experimenting with persist(DISK_ONLY), the size of all
RDDs on disk was around 200 GB, which gives a sense of overall transient
memory usage with no persistence.

In terms of our test cluster, we had 15 nodes. Each node had 24 cores and ran 2
workers. Each executor got 14 GB of memory.

-Suren



On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce. The
 MapReduce API is very constrained; Spark's API feels much more natural to
 me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production. In
 my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly -
 but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapReduce. Is it worth migrating from an existing MR
 implementation to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy.

 __

 www.accenture.com




 --
  Daniel Siegmann, Software Developer
 Velos
  Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Comparative study

2014-07-08 Thread Surendranauth Hiraman
To clarify, we are not persisting to disk. That was just one of the
experiments we did because of some issues we had along the way.

At this time, we are NOT using persist but cannot get the flow to complete
in Standalone Cluster mode. We do not have a YARN-capable cluster at this
time.

We agree with what you're saying. Your results are what we were hoping for
and expecting. :-)  Unfortunately we still haven't gotten the flow to run
end to end on this relatively small dataset.

It must be something related to our cluster, standalone mode or our flow
but as far as we can tell, we are not doing anything unusual.

Did you do any custom configuration? Any advice would be appreciated.

-Suren




On Tue, Jul 8, 2014 at 1:54 PM, Kevin Markey kevin.mar...@oracle.com
wrote:

  It seems to me that you're not taking full advantage of the lazy
 evaluation, especially persisting to disk only.  While it might be true
 that the cumulative size of the RDDs looks like it's 300GB, only a small
 portion of that should be resident at any one time.  We've evaluated data
 sets much greater than 10GB in Spark using the Spark master and Spark with
 Yarn (cluster -- formerly standalone -- mode).  Nice thing about using Yarn
 is that it reports the actual memory *demand*, not just the memory
 requested for driver and workers.  Processing a 60GB data set through
 thousands of stages in a rather complex set of analytics and
 transformations consumed a total cluster resource (divided among all
 workers and driver) of only 9GB.  We were somewhat startled at first by
 this result, thinking that it would be much greater, but realized that it
 is a consequence of Spark's lazy evaluation model.  This is even with
 several intermediate computations being cached as input to multiple
 evaluation paths.

 Good luck.

 Kevin



 On 07/08/2014 11:04 AM, Surendranauth Hiraman wrote:

 I'll respond for Dan.

  Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

  I'm not sure what the size of the final output data was but I think it
 was on the order of 20 GBs for the given 10 GB of input data. Also, I can
 say that when we were experimenting with persist(DISK_ONLY), the size of
 all RDDs on disk was around 200 GB, which gives a sense of overall
 transient memory usage with no persistence.

  In terms of our test cluster, we had 15 nodes. Each node had 24 cores
 and 2 workers each. Each executor got 14 GB of memory.

  -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce. The
 MapReduce API is very constrained; Spark's API feels much more natural to
 me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production. In
 my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly -
 but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapREduce . Is it worth migrating from existing MR
 implementation to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy

Re: Comparative study

2014-07-08 Thread Surendranauth Hiraman
How wide are the rows of data, either the raw input data or any generated
intermediate data?

We are at a loss as to why our flow doesn't complete. We banged our heads
against it for a few weeks.

-Suren



On Tue, Jul 8, 2014 at 2:12 PM, Kevin Markey kevin.mar...@oracle.com
wrote:

  Nothing particularly custom.  We've tested with small (4 node)
 development clusters, single-node pseudoclusters, and bigger, using
 plain-vanilla Hadoop 2.2 or 2.3 or CDH5 (beta and beyond), in Spark master,
 Spark local, Spark Yarn (client and cluster) modes, with total memory
 resources ranging from 4GB to 256GB+.

 K



 On 07/08/2014 12:04 PM, Surendranauth Hiraman wrote:

 To clarify, we are not persisting to disk. That was just one of the
 experiments we did because of some issues we had along the way.

  At this time, we are NOT using persist but cannot get the flow to
 complete in Standalone Cluster mode. We do not have a YARN-capable cluster
 at this time.

  We agree with what you're saying. Your results are what we were hoping
 for and expecting. :-)  Unfortunately we still haven't gotten the flow to
 run end to end on this relatively small dataset.

  It must be something related to our cluster, standalone mode or our flow
 but as far as we can tell, we are not doing anything unusual.

  Did you do any custom configuration? Any advice would be appreciated.

  -Suren




 On Tue, Jul 8, 2014 at 1:54 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  It seems to me that you're not taking full advantage of the lazy
 evaluation, especially persisting to disk only.  While it might be true
 that the cumulative size of the RDDs looks like it's 300GB, only a small
 portion of that should be resident at any one time.  We've evaluated data
 sets much greater than 10GB in Spark using the Spark master and Spark with
 Yarn (cluster -- formerly standalone -- mode).  Nice thing about using Yarn
 is that it reports the actual memory *demand*, not just the memory
 requested for driver and workers.  Processing a 60GB data set through
 thousands of stages in a rather complex set of analytics and
 transformations consumed a total cluster resource (divided among all
 workers and driver) of only 9GB.  We were somewhat startled at first by
 this result, thinking that it would be much greater, but realized that it
 is a consequence of Spark's lazy evaluation model.  This is even with
 several intermediate computations being cached as input to multiple
 evaluation paths.

 Good luck.

 Kevin



 On 07/08/2014 11:04 AM, Surendranauth Hiraman wrote:

 I'll respond for Dan.

  Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

  I'm not sure what the size of the final output data was but I think it
 was on the order of 20 GBs for the given 10 GB of input data. Also, I can
 say that when we were experimenting with persist(DISK_ONLY), the size of
 all RDDs on disk was around 200 GB, which gives a sense of overall
 transient memory usage with no persistence.

  In terms of our test cluster, we had 15 nodes. Each node had 24 cores
 and 2 workers each. Each executor got 14 GB of memory.

  -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce.
 The MapReduce API is very constrained; Spark's API feels much more natural
 to me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production.
 In my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly
 - but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapREduce . Is it worth migrating from existing MR
 implementation to Spark?





 Please share your thoughts

Re: Comparative study

2014-07-08 Thread Surendranauth Hiraman
Also, our exact same flow but with 1 GB of input data completed fine.

-Suren


On Tue, Jul 8, 2014 at 2:16 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 How wide are the rows of data, either the raw input data or any generated
 intermediate data?

 We are at a loss as to why our flow doesn't complete. We banged our heads
 against it for a few weeks.

 -Suren



 On Tue, Jul 8, 2014 at 2:12 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  Nothing particularly custom.  We've tested with small (4 node)
 development clusters, single-node pseudoclusters, and bigger, using
 plain-vanilla Hadoop 2.2 or 2.3 or CDH5 (beta and beyond), in Spark master,
 Spark local, Spark Yarn (client and cluster) modes, with total memory
 resources ranging from 4GB to 256GB+.

 K



 On 07/08/2014 12:04 PM, Surendranauth Hiraman wrote:

 To clarify, we are not persisting to disk. That was just one of the
 experiments we did because of some issues we had along the way.

  At this time, we are NOT using persist but cannot get the flow to
 complete in Standalone Cluster mode. We do not have a YARN-capable cluster
 at this time.

  We agree with what you're saying. Your results are what we were hoping
 for and expecting. :-)  Unfortunately we still haven't gotten the flow to
 run end to end on this relatively small dataset.

  It must be something related to our cluster, standalone mode or our
 flow but as far as we can tell, we are not doing anything unusual.

  Did you do any custom configuration? Any advice would be appreciated.

  -Suren




 On Tue, Jul 8, 2014 at 1:54 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  It seems to me that you're not taking full advantage of the lazy
 evaluation, especially persisting to disk only.  While it might be true
 that the cumulative size of the RDDs looks like it's 300GB, only a small
 portion of that should be resident at any one time.  We've evaluated data
 sets much greater than 10GB in Spark using the Spark master and Spark with
 Yarn (cluster -- formerly standalone -- mode).  Nice thing about using Yarn
 is that it reports the actual memory *demand*, not just the memory
 requested for driver and workers.  Processing a 60GB data set through
 thousands of stages in a rather complex set of analytics and
 transformations consumed a total cluster resource (divided among all
 workers and driver) of only 9GB.  We were somewhat startled at first by
 this result, thinking that it would be much greater, but realized that it
 is a consequence of Spark's lazy evaluation model.  This is even with
 several intermediate computations being cached as input to multiple
 evaluation paths.

 Good luck.

 Kevin



 On 07/08/2014 11:04 AM, Surendranauth Hiraman wrote:

 I'll respond for Dan.

  Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

  I'm not sure what the size of the final output data was but I think it
 was on the order of 20 GBs for the given 10 GB of input data. Also, I can
 say that when we were experimenting with persist(DISK_ONLY), the size of
 all RDDs on disk was around 200 GB, which gives a sense of overall
 transient memory usage with no persistence.

  In terms of our test cluster, we had 15 nodes. Each node had 24 cores
 and 2 workers each. Each executor got 14 GB of memory.

  -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce.
 The MapReduce API is very constrained; Spark's API feels much more natural
 to me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production.
 In my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly
 - but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some

Re: Comparative study

2014-07-08 Thread Surendranauth Hiraman
We kind of hijacked Santosh's original thread, so apologies for that, and let
me try to get back to Santosh's original question on Map/Reduce versus Spark.

I would say it's worth migrating from M/R, with the following thoughts.

Just my opinion but I would summarize the latest emails in this thread as
Spark can scale to datasets in 10s and 100s of GBs. I've seen some
companies talk about TBs of data but I'm unclear if that is for a single
flow.

At the same time, some folks (like my team) that I've seen on the user
group have a lot of difficulty with the same sized datasets, which points
to either environmental issues (machines, cluster mode, etc.), nature of
the data or nature of the transforms/flow complexity (though Kevin's
experience runs counter to the latter, which is very positive) or we are
just doing something subtle wrong.

My overall opinion right now is Map/Reduce is easier to get working in
general on very large, heterogeneous datasets but the programming model for
Spark is the right way to go and worth the effort.

Libraries like Scoobi, Scrunch and Scalding (and their associated Java
versions) provide a Spark-like wrapper around Map/Reduce but my guess is
that, since they are limited to Map/Reduce under the covers, they cannot do
some of the optimizations that Spark can, such as collapsing several
transforms into a single stage.

In addition, my company believes that having batch, streaming and SQL (ad
hoc querying) on a single platform has worthwhile benefits.

We're still relatively new with Spark (a few months), so would also love to
hear more from others in the community.

-Suren



On Tue, Jul 8, 2014 at 2:17 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 Also, our exact same flow but with 1 GB of input data completed fine.

 -Suren


 On Tue, Jul 8, 2014 at 2:16 PM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 How wide are the rows of data, either the raw input data or any generated
 intermediate data?

 We are at a loss as to why our flow doesn't complete. We banged our heads
 against it for a few weeks.

 -Suren



 On Tue, Jul 8, 2014 at 2:12 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  Nothing particularly custom.  We've tested with small (4 node)
 development clusters, single-node pseudoclusters, and bigger, using
 plain-vanilla Hadoop 2.2 or 2.3 or CDH5 (beta and beyond), in Spark master,
 Spark local, Spark Yarn (client and cluster) modes, with total memory
 resources ranging from 4GB to 256GB+.

 K



 On 07/08/2014 12:04 PM, Surendranauth Hiraman wrote:

 To clarify, we are not persisting to disk. That was just one of the
 experiments we did because of some issues we had along the way.

  At this time, we are NOT using persist but cannot get the flow to
 complete in Standalone Cluster mode. We do not have a YARN-capable cluster
 at this time.

  We agree with what you're saying. Your results are what we were hoping
 for and expecting. :-)  Unfortunately we still haven't gotten the flow to
 run end to end on this relatively small dataset.

  It must be something related to our cluster, standalone mode or our
 flow but as far as we can tell, we are not doing anything unusual.

  Did you do any custom configuration? Any advice would be appreciated.

  -Suren




 On Tue, Jul 8, 2014 at 1:54 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  It seems to me that you're not taking full advantage of the lazy
 evaluation, especially persisting to disk only.  While it might be true
 that the cumulative size of the RDDs looks like it's 300GB, only a small
 portion of that should be resident at any one time.  We've evaluated data
 sets much greater than 10GB in Spark using the Spark master and Spark with
 Yarn (cluster -- formerly standalone -- mode).  Nice thing about using Yarn
 is that it reports the actual memory *demand*, not just the memory
 requested for driver and workers.  Processing a 60GB data set through
 thousands of stages in a rather complex set of analytics and
 transformations consumed a total cluster resource (divided among all
 workers and driver) of only 9GB.  We were somewhat startled at first by
 this result, thinking that it would be much greater, but realized that it
 is a consequence of Spark's lazy evaluation model.  This is even with
 several intermediate computations being cached as input to multiple
 evaluation paths.

 Good luck.

 Kevin



 On 07/08/2014 11:04 AM, Surendranauth Hiraman wrote:

 I'll respond for Dan.

  Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

  I'm not sure what the size of the final output data was but I think
 it was on the order of 20 GBs for the given 10 GB of input data. Also, I
 can say that when we were experimenting with persist(DISK_ONLY), the size
 of all RDDs on disk was around 200 GB, which gives a sense of overall
 transient memory usage with no persistence.

  In terms of our test cluster, we had 15 nodes

Re: Comparative study

2014-07-08 Thread Surendranauth Hiraman
Aaron,

I don't think anyone was saying Spark can't handle this data size, given
testimony from the Spark team, Bizo, etc., on large datasets. This has kept
us trying different things to get our flow to work over the course of
several weeks.

Agreed that the first instinct should be "what did I do wrong".

I believe that is what every person facing this issue has done, in reaching
out to the user group repeatedly over the course of the few of months that
I've been active here. I also know other companies (all experienced with
large production datasets on other platforms) facing the same types of
issues - flows that run on subsets of data but not the whole production set.

So I think, as you are saying, it points to the need for further
diagnostics. And maybe also some type of guidance on typical issues with
different types of datasets (wide rows, narrow rows, etc.), flow
topologies, etc.? Hard to tell where we are going wrong right now. We've
tried many things over the course of 6 weeks or so.

I tried to look for the professional services link on databricks.com but
didn't find it. ;-) (jk).

-Suren



On Tue, Jul 8, 2014 at 4:16 PM, Aaron Davidson ilike...@gmail.com wrote:

 Not sure exactly what is happening but perhaps there are ways to
 restructure your program for it to work better. Spark is definitely able to
 handle much, much larger workloads.


 +1 @Reynold

 Spark can handle big big data. There are known issues with informing the
 user about what went wrong and how to fix it that we're actively working
  on, but the first impulse when a job fails should be "what did I do wrong"
  rather than "Spark can't handle this workload". Messaging is a huge part in
 making this clear -- getting things like a job hanging or an out of memory
 error can be very difficult to debug, and improving this is one of our
 highest priorties.


 On Tue, Jul 8, 2014 at 12:47 PM, Reynold Xin r...@databricks.com wrote:

 Not sure exactly what is happening but perhaps there are ways to
 restructure your program for it to work better. Spark is definitely able to
 handle much, much larger workloads.

 I've personally run a workload that shuffled 300 TB of data. I've also
  run something that shuffled 5TB/node and stuffed my disks so full that
  the file system was close to breaking.

 We can definitely do a better job in Spark to make it output more
 meaningful diagnosis and more robust with partitions of data that don't fit
 in memory though. A lot of the work in the next few releases will be on
 that.



 On Tue, Jul 8, 2014 at 10:04 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 I'll respond for Dan.

 Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

 I'm not sure what the size of the final output data was but I think it
 was on the order of 20 GBs for the given 10 GB of input data. Also, I can
 say that when we were experimenting with persist(DISK_ONLY), the size of
 all RDDs on disk was around 200 GB, which gives a sense of overall
 transient memory usage with no persistence.

 In terms of our test cluster, we had 15 nodes. Each node had 24 cores
 and 2 workers each. Each executor got 14 GB of memory.

 -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce.
 The MapReduce API is very constrained; Spark's API feels much more natural
 to me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production.
 In my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly
 - but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapREduce . Is it worth migrating from existing MR
 implementation

Re: Spark memory optimization

2014-07-07 Thread Surendranauth Hiraman
Using persist() is a sort of a hack or a hint (depending on your
perspective :-)) to make the RDD use disk, not memory. As I mentioned
though, the disk io has consequences, mainly (I think) making sure you have
enough disks to not let io be a bottleneck.

Increasing partitions I think is the other common approach people take,
from what I've read.
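For concreteness, a minimal sketch (not from the thread) of both knobs; the path,
partition count and storage level are illustrative and an existing SparkContext sc
is assumed.

import org.apache.spark.storage.StorageLevel

val records = sc.textFile("hdfs:///data/input")
  .repartition(400)                          // more, smaller partitions lower the per-task memory footprint
  .persist(StorageLevel.DISK_ONLY)           // keep the materialized RDD on disk instead of executor memory

val counts = records.map(line => (line.split("\t")(0), 1L)).reduceByKey(_ + _)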

For alternatives, if your data is in HDFS or you just want to stick with
Map/Reduce, then the higher level abstractions on top of M/R you might want
to look at include the following, which have both Scala and Java
implementations in some cases.

Scalding (a Scala API on top of Cascading; it seems to be the most active of
such projects, at least on the surface)
Scoobi
Scrunch (Scala wrapper around Crunch)

There are other parallel distributed frameworks outside of the Hadoop
ecosystem, of course.

-Suren




On Mon, Jul 7, 2014 at 7:31 AM, Igor Pernek i...@pernek.net wrote:

 Thanks guys! Actually, I'm not doing any caching (at least I'm not calling
 cache/persist), do I still need to use the DISK_ONLY storage level?
 However, I do use reduceByKey and sortByKey. Mayur, you mentioned that
 sortByKey requires data to fit in memory. Is there any way to work around
 this (maybe by increasing the number of partitions or something similar?).

 What alternative would you suggest, if Spark is not the way to go with
 this kind of scenario. As mentioned, what I like about spark is its high
 level of abstraction of parallelization. I'm ready to sacrifice speed (if
 the slowdown is not too big - I'm doing batch processing, nothing
 real-time) for code simplicity and readability.



 On Fri, Jul 4, 2014 at 3:16 PM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:
 
  When using DISK_ONLY, keep in mind that disk I/O is pretty high. Make
 sure you are writing to multiple disks for best operation. And even with
 DISK_ONLY, we've found that there is a minimum threshold for executor ram
 (spark.executor.memory), which for us seemed to be around 8 GB.
 
  If you find that, with enough disks, you still have errors/exceptions
 getting the flow to finish, first check iostat to see if disk is the
 bottleneck.
 
  Then, you may want to try tuning some or all of the following, which
 affect buffers and timeouts. For us, because we did not have enough disks
 to start out, the io bottleneck caused timeouts and other errors. In the
 end, IMHO, it's probably best to solve the problem by adding disks than by
 tuning the parameters, because it seemed that the i/o bottlenecks
 eventually backed up the processing.
 
   //conf.set("spark.shuffle.consolidateFiles", "true")

   //conf.set("spark.shuffle.file.buffer.kb", "200")   // does doubling this help? should increase in-memory buffer to decrease disk writes
   //conf.set("spark.reducer.maxMbInFlight", "96")     // does doubling this help? should allow for more simultaneous shuffle data to be read from remotes

   // because we use disk-only, we should be able to reverse the default memory usage settings
   //conf.set("spark.shuffle.memoryFraction", "0.6")   // default 0.3
   //conf.set("spark.storage.memoryFraction", "0.3")   // default 0.6

   //conf.set("spark.worker.timeout", "180")

   // akka settings
   //conf.set("spark.akka.threads", "300")     // number of akka actors
   //conf.set("spark.akka.timeout", "180")     // we saw a problem with smaller numbers
   //conf.set("spark.akka.frameSize", "100")   // not sure if we need to up this. Default is 10.
   //conf.set("spark.akka.batchSize", "30")
   //conf.set("spark.akka.askTimeout", "30")   // supposedly this is important for high cpu/io load

   // block manager
   //conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
   //conf.set("spark.blockManagerHeartBeatMs", "8")
 
 
 
 
 
 
  On Fri, Jul 4, 2014 at 8:52 AM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:
 
  I would go with Spark only if you are certain that you are going to
 scale out in the near future.
  You can change the default storage of RDD to DISK_ONLY, that might
  remove issues around any rdd leveraging memory. There are some functions,
  particularly sortByKey, that require data to fit in memory to work, so you
 may be hitting some of those walls too.
  Regards
  Mayur
 
  Mayur Rustagi
  Ph: +1 (760) 203 3257
  http://www.sigmoidanalytics.com
  @mayur_rustagi
 
 
 
  On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek i...@pernek.net wrote:
 
  Hi all!
 
  I have a folder with 150 G of txt files (around 700 files, on average
 each 200 MB).
 
  I'm using scala to process the files and calculate some aggregate
 statistics in the end. I see two possible approaches to do that: - manually
 loop through all the files, do the calculations per file and merge the
 results in the end - read the whole folder to one RDD, do all the
 operations on this single RDD and let spark do all the parallelization
 
  I'm leaning towards the second approach as it seems

Re: Spark memory optimization

2014-07-04 Thread Surendranauth Hiraman
When using DISK_ONLY, keep in mind that disk I/O is pretty high. Make sure
you are writing to multiple disks for best operation. And even with
DISK_ONLY, we've found that there is a minimum threshold for executor ram
(spark.executor.memory), which for us seemed to be around 8 GB.

If you find that, with enough disks, you still have errors/exceptions
getting the flow to finish, first check iostat to see if disk is the
bottleneck.

Then, you may want to try tuning some or all of the following, which affect
buffers and timeouts. For us, because we did not have enough disks to start
out, the io bottleneck caused timeouts and other errors. In the end, IMHO,
it's probably best to solve the problem by adding disks than by tuning the
parameters, because it seemed that the i/o bottlenecks eventually backed up
the processing.

//conf.set("spark.shuffle.consolidateFiles", "true")

//conf.set("spark.shuffle.file.buffer.kb", "200")   // does doubling this help? should increase in-memory buffer to decrease disk writes
//conf.set("spark.reducer.maxMbInFlight", "96")     // does doubling this help? should allow for more simultaneous shuffle data to be read from remotes

// because we use disk-only, we should be able to reverse the default memory usage settings
//conf.set("spark.shuffle.memoryFraction", "0.6")   // default 0.3
//conf.set("spark.storage.memoryFraction", "0.3")   // default 0.6

//conf.set("spark.worker.timeout", "180")

// akka settings
//conf.set("spark.akka.threads", "300")     // number of akka actors
//conf.set("spark.akka.timeout", "180")     // we saw a problem with smaller numbers
//conf.set("spark.akka.frameSize", "100")   // not sure if we need to up this. Default is 10.
//conf.set("spark.akka.batchSize", "30")
//conf.set("spark.akka.askTimeout", "30")   // supposedly this is important for high cpu/io load

// block manager
//conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
//conf.set("spark.blockManagerHeartBeatMs", "8")






On Fri, Jul 4, 2014 at 8:52 AM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 I would go with Spark only if you are certain that you are going to scale
 out in the near future.
 You can change the default storage of RDD to DISK_ONLY, that might remove
 issues around any rdd leveraging memory. There are some functions,
 particularly sortByKey, that require data to fit in memory to work, so you
 may be hitting some of those walls too.
 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Fri, Jul 4, 2014 at 2:36 PM, Igor Pernek i...@pernek.net wrote:

  Hi all!

 I have a folder with 150 G of txt files (around 700 files, on average
 each 200 MB).

 I'm using scala to process the files and calculate some aggregate
 statistics in the end. I see two possible approaches to do that: - manually
 loop through all the files, do the calculations per file and merge the
 results in the end - read the whole folder to one RDD, do all the
 operations on this single RDD and let spark do all the parallelization

 I'm leaning towards the second approach as it seems cleaner (no need for
 parallelization specific code), but I'm wondering if my scenario will fit
 the constraints imposed by my hardware and data. I have one workstation
 with 16 threads and 64 GB of RAM available (so the parallelization will be
 strictly local between different processor cores). I might scale the
 infrastructure with more machines later on, but for now I would just like
 to focus on tunning the settings for this one workstation scenario.

 The code I'm using: - reads TSV files, and extracts meaningful data to
 (String, String, String) triplets - afterwards some filtering, mapping and
 grouping is performed - finally, the data is reduced and some aggregates
 are calculated

 I've been able to run this code with a single file (~200 MB of data),
 however I get a java.lang.OutOfMemoryError: GC overhead limit exceeded
 and/or a Java out of heap exception when adding more data (the application
 breaks with 6GB of data but I would like to use it with 150 GB of data).

 I guess I would have to tune some parameters to make this work. I would
 appreciate any tips on how to approach this problem (how to debug for
 memory demands). I've tried increasing the 'spark.executor.memory' and
 using a smaller number of cores (the rationale being that each core needs
 some heap space), but this didn't solve my problems.

 I don't need the solution to be very fast (it can easily run for a few
 hours even days if needed). I'm also not caching any data, but just saving
 them to the file system in the end. If you think it would be more feasible
 to just go with the manual parallelization approach, I could do that as
 well.

 Thanks,

 Igor





-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: 

Re: Enable Parsing Failed or Incompleted jobs on HistoryServer (YARN mode)

2014-07-03 Thread Surendranauth Hiraman
I've had some odd behavior with jobs showing up in the history server in
1.0.0. Failed jobs do show up but it seems they can show up minutes or
hours later. I see in the history server logs messages about bad task ids.
But then eventually the jobs show up.

This might be your situation.

Anecdotally, if you click on the job in the Spark Master GUI after it is
done, this may help it show up in the history server faster. Haven't
reliably tested this though. May just be a coincidence of timing.

-Suren



On Wed, Jul 2, 2014 at 8:01 PM, Andrew Lee alee...@hotmail.com wrote:

 Hi All,

 I have HistoryServer up and running, and it is great.

 Is it possible to also enable the HistoryServer to parse failed jobs' events by
 default as well?

 I get "No Completed Applications Found" if a job fails.

 =
 Event Log Location: hdfs:///user/test01/spark/logs/

 No Completed Applications Found
 =

 The reason is that it is good to run the HistoryServer to keep track of
 performance and resource usage for each completed job,
 but I find it even more useful when a job fails. I can identify which stage
 failed, etc., instead of sifting through the logs
 from the Resource Manager. The same event log is only available when the
 Application Master is still active, once the job fails,
 the Application Master is killed, and I lose the GUI access, even though I
 have the event log in JSON format, I can't open it with
 the HistoryServer.

 This is very helpful especially for long-running jobs that last for 2-18
 hours and generate gigabytes of logs.

 So I have 2 questions:

 1. Any reason why we only render completed jobs? Why can't we bring in all
 jobs and choose from the GUI? Like a time machine to restore the status
 from the Application Master?

 ./core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

  val logInfos = logDirs
    .sortBy { dir => getModificationTime(dir) }
    .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
    .filter { case (dir, info) => info.applicationComplete }



 2. If I manually touch an APPLICATION_COMPLETE file in the failed job's
 event log folder, will this cause any problems?
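 For what it's worth, a minimal sketch (not from the thread) of creating that empty
 marker through the Hadoop FileSystem API; the log directory name is illustrative, and
 whether the HistoryServer then accepts the log is exactly the open question above.

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

 val eventLogDir = new Path("hdfs:///user/test01/spark/logs/app-20140702-0001")  // illustrative app log directory
 val fs = eventLogDir.getFileSystem(new Configuration())
 fs.create(new Path(eventLogDir, "APPLICATION_COMPLETE")).close()                // empty marker the HistoryServer filters on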








-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Changing log level of spark

2014-07-01 Thread Surendranauth Hiraman
One thing we ran into was that there was another log4j.properties earlier
in the classpath. For us, it was in our MapR/Hadoop conf.

If that is the case, something like the following could help you track it
down. The only thing to watch out for is that you might have to walk up the
classloader hierarchy.

ClassLoader cl = Thread.currentThread().getContextClassLoader();
URL loc = cl.getResource("/log4j.properties");
System.out.println(loc);
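To walk up the classloader hierarchy as mentioned above, a minimal sketch (not from
the thread), here in Scala:

var cl: ClassLoader = Thread.currentThread().getContextClassLoader
while (cl != null) {
  // ClassLoader.getResource takes the path without a leading slash
  println(s"$cl -> ${cl.getResource("log4j.properties")}")
  cl = cl.getParent
}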

-Suren




On Tue, Jul 1, 2014 at 9:20 AM, Philip Limbeck philiplimb...@gmail.com
wrote:

 We changed the loglevel to DEBUG by replacing every INFO with DEBUG in
 /root/ephemeral-hdfs/conf/log4j.properties and propagating it to the
 cluster. There is some DEBUG output visible in both master and worker but
 nothing really interesting regarding stages or scheduling. Since we
 expected a little more than that, there could be 2 possibilities:
   a) There is still some other unknown way to set the loglevel to debug
   b) There is not that much log output to be expected in this direction. However, I
 looked for logDebug (the log wrapper in Spark) on GitHub and found 84 results,
 which makes me doubt that this is the case.

 We actually just want to have a little more insight into the system
 behavior especially when using Shark since we ran into some serious
 concurrency issues with blocking queries. So much for the background why
 this is important to us.


 On Thu, Jun 26, 2014 at 3:30 AM, Aaron Davidson ilike...@gmail.com
 wrote:

 If you're using the spark-ec2 scripts, you may have to change
 /root/ephemeral-hdfs/conf/log4j.properties or something like that, as that
 is added to the classpath before Spark's own conf.


 On Wed, Jun 25, 2014 at 6:10 PM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 I have a log4j.xml in src/main/resources with

 <?xml version="1.0" encoding="UTF-8" ?>
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
 [...]
   <root>
     <priority value="warn" />
     <appender-ref ref="Console" />
   </root>
 </log4j:configuration>

 and that is included in the jar I package with `sbt assembly`. That
 works fine for me, at least on the driver.

 Tobias

 On Wed, Jun 25, 2014 at 2:25 PM, Philip Limbeck philiplimb...@gmail.com
 wrote:
  Hi!
 
  According to
 
 https://spark.apache.org/docs/0.9.0/configuration.html#configuring-logging
 ,
  changing log-level is just a matter of creating a log4j.properties
 (which is
  in the classpath of spark) and changing log level there for the root
 logger.
  I did this steps on every node in the cluster (master and worker
 nodes).
  However, after restart there is still no debug output as desired, but
 only
  the default info log level.






-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Spark executor error

2014-06-26 Thread Surendranauth Hiraman
I unfortunately haven't seen this directly. But some typical things I try
when debugging are as follows.

Do you see a corresponding error on the other side of that connection
(alpinenode7.alpinenow.local)? Or is that the same machine?

Also, do the driver logs show a longer stack trace, and have you enabled
the history server so you can see some more details about execution? That
helps me tremendously.
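
If not, turning it on is roughly this in the driver (a minimal sketch; the
property names are the ones in the 1.0 docs and the log directory is just an
example):

import org.apache.spark.SparkConf

// enable event logging so the history server can show per-stage details
// after the app finishes; the HDFS path below is only an example
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")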

-Suren



On Wed, Jun 25, 2014 at 11:08 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 I'm seeing the following message in the log of an executor. Anyone seen this 
 error? After this, the executor seems to lose the cache, and besides that
 the whole thing slows down drastically, i.e. it gets stuck in a reduce phase
 for 40+ minutes, whereas before it was finishing reduces in 2~3 seconds.



 14/06/25 19:22:31 WARN SendingConnection: Error writing in connection to 
 ConnectionManagerId(alpinenode7.alpinenow.local,46251)
 java.lang.NullPointerException
   at 
 org.apache.spark.network.MessageChunkHeader.buffer$lzycompute(MessageChunkHeader.scala:35)
   at 
 org.apache.spark.network.MessageChunkHeader.buffer(MessageChunkHeader.scala:32)
   at 
 org.apache.spark.network.MessageChunk.buffers$lzycompute(MessageChunk.scala:31)
   at org.apache.spark.network.MessageChunk.buffers(MessageChunk.scala:29)
   at 
 org.apache.spark.network.SendingConnection.write(Connection.scala:349)
   at 
 org.apache.spark.network.ConnectionManager$$anon$5.run(ConnectionManager.scala:142)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:724)




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: MLLib inside Storm : silly or not ?

2014-06-19 Thread Surendranauth Hiraman
I can't speak for MLlib either. But I can say the model of training in Hadoop
M/R or Spark and production scoring in Storm works very well. My team has
done online learning (Sofia ML library, I think) in Storm as well.
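
As a rough sketch of that split (assuming an existing SparkContext sc; the
tiny dataset and the scoring function are only illustrations):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

// train offline in Spark on a toy dataset (placeholder data)
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(1.0, 0.0)),
  LabeledPoint(0.0, Vectors.dense(0.0, 1.0))))
val model = LogisticRegressionWithSGD.train(training, 100)

// ...then ship only the coefficients to the scoring system (e.g. a Storm
// bolt) and score there with plain code, no Spark dependency required
val w = model.weights.toArray
val b = model.intercept
def score(features: Array[Double]): Double =
  1.0 / (1.0 + math.exp(-(w.zip(features).map { case (wi, xi) => wi * xi }.sum + b)))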

I would be interested in this answer as well.

-Suren



On Thu, Jun 19, 2014 at 7:35 AM, Eustache DIEMERT eusta...@diemert.fr
wrote:

 Well, yes VW is an appealing option but I only found experimental
 integrations so far.

 Also, early experiments suggest Decision Trees Ensembles (RF, GBT) perform
 better than generalized linear models on our data. Hence the interest for
 MLLib :)

 Any other comments / suggestions welcome :)

 E/


 2014-06-19 12:37 GMT+02:00 Charles Earl charles.ce...@gmail.com:

 While I can't definitively speak to MLLib online learning,
 I'm sure you're evaluating Vowpal Wabbit, for which there's been some
 storm integrations contributed.
 Also you might look at factorie, http://factorie.cs.understanding.edu,
 which at least provides an online lda.
 C


 On Thursday, June 19, 2014, Eustache DIEMERT eusta...@diemert.fr wrote:

 Hi Sparkers,

 We have a Storm cluster and looking for a decent execution engine for
 machine learned models. What I've seen from MLLib is extremely positive,
 but we can't just throw away our Storm based stack.

 So my question is: is it feasible/recommended to train models in
 Spark/MLLib and execute them in another Java environment (Storm in this
 case) ?

 Thanks for any insights :)

 Eustache



 --
 - Charles





-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: Trailing Tasks Saving to HDFS

2014-06-19 Thread Surendranauth Hiraman
I've created an issue for this but if anyone has any advice, please let me
know.

Basically, on about 10 GBs of data, saveAsTextFile() to HDFS hangs on two
remaining tasks (out of 320). Those tasks seem to be waiting on data from
another task on another node. Eventually (about 2 hours later) they time
out with a connection reset by peer.

All the data actually seems to be on HDFS as the expected part files. It
just seems like the remaining tasks have corrupted metadata, so that they
do not realize that they are done. Just a guess though.

https://issues.apache.org/jira/browse/SPARK-2202

-Suren




On Wed, Jun 18, 2014 at 8:35 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 Looks like eventually there was some type of reset or timeout and the
 tasks have been reassigned. I'm guessing they'll keep failing until max
 failure count.

 The machine it disconnected from was a remote machine, though I've seen
 such failures from connections to itself with other problems. The log lines
 from the remote machine are also below.

 Any thoughts or guesses would be appreciated!

 *HUNG WORKER*

 14/06/18 19:41:18 WARN network.ReceivingConnection: Error reading from
 connection to ConnectionManagerId(172.16.25.103,57626)

 java.io.IOException: Connection reset by peer

 at sun.nio.ch.FileDispatcher.read0(Native Method)

 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)

 at sun.nio.ch.IOUtil.read(IOUtil.java:224)

 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)

 at org.apache.spark.network.ReceivingConnection.read(Connection.scala:496)

 at
 org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)

 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

 at java.lang.Thread.run(Thread.java:679)

 14/06/18 19:41:18 INFO network.ConnectionManager: Handling connection
 error on connection to ConnectionManagerId(172.16.25.103,57626)

 14/06/18 19:41:18 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

 14/06/18 19:41:18 INFO network.ConnectionManager: Removing
 SendingConnection to ConnectionManagerId(172.16.25.103,57626)

 14/06/18 19:41:18 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

 14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found


 *REMOTE WORKER*

 14/06/18 19:41:18 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.124,55610)

 14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found



 On Wed, Jun 18, 2014 at 7:16 PM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 I have a flow that ends with saveAsTextFile() to HDFS.

 It seems all the expected files per partition have been written out,
 based on the number of part files and the file sizes.

 But the driver logs show 2 tasks still not completed and has no activity
 and the worker logs show no activity for those two tasks for a while now.

 Has anyone run into this situation? It's happened to me a couple of times
 now.

 Thanks.

 -- Suren

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io




 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: Java IO Stream Corrupted - Invalid Type AC?

2014-06-18 Thread Surendranauth Hiraman
Patrick,

My team is using shuffle consolidation but not speculation. We are also
using persist(DISK_ONLY) for caching.

Here are some config changes that are in our work-in-progress.

We've been trying for 2 weeks to get our production flow (maybe around
50-70 stages, a few forks and joins with up to 20 branches in the forks) to
run end to end without any success, running into other problems besides
this one as well. For example, we have run into situations where saving to
HDFS just hangs on a couple of tasks, which are printing out nothing in
their logs and not taking any CPU. For testing, our input data is 10 GB
across 320 input splits and generates maybe around 200-300 GB of
intermediate and final data.


        conf.set("spark.executor.memory", "14g") // TODO make this configurable

        // shuffle configs
        conf.set("spark.default.parallelism", "320") // TODO make this configurable
        conf.set("spark.shuffle.consolidateFiles", "true")

        conf.set("spark.shuffle.file.buffer.kb", "200")
        conf.set("spark.reducer.maxMbInFlight", "96")

        conf.set("spark.rdd.compress", "true")

        // we ran into a problem with the default timeout of 60 seconds
        // this is also being set in the master's spark-env.sh. Not sure if
        // it needs to be in both places
        conf.set("spark.worker.timeout", "180")

        // akka settings
        conf.set("spark.akka.threads", "300")
        conf.set("spark.akka.timeout", "180")
        conf.set("spark.akka.frameSize", "100")
        conf.set("spark.akka.batchSize", "30")
        conf.set("spark.akka.askTimeout", "30")

        // block manager
        conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
        conf.set("spark.blockManagerHeartBeatMs", "8")

-Suren



On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell pwend...@gmail.com wrote:

 Out of curiosity - are you guys using speculation, shuffle
 consolidation, or any other non-default option? If so that would help
 narrow down what's causing this corruption.

 On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
 suren.hira...@velos.io wrote:
  Matt/Ryan,
 
  Did you make any headway on this? My team is running into this also.
  Doesn't happen on smaller datasets. Our input set is about 10 GB but we
  generate 100s of GBs in the flow itself.
 
  -Suren
 
 
 
 
  On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton compton.r...@gmail.com
 wrote:
 
  Just ran into this today myself. I'm on branch-1.0 using a CDH3
  cluster (no modifications to Spark or its dependencies). The error
  appeared trying to run GraphX's .connectedComponents() on a ~200GB
  edge list (GraphX worked beautifully on smaller data).
 
  Here's the stacktrace (it's quite similar to yours
  https://imgur.com/7iBA4nJ ).
 
  14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
  4 times; aborting job
  14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
  VertexRDD.scala:100
  Exception in thread main org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 5.599:39 failed 4 times, most
  recent failure: Exception failure in TID 29735 on host node18:
  java.io.StreamCorruptedException: invalid type code: AC
 
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
  java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
 
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 
 
 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
 
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 
 
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 
 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 
 
 org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
 
 
 org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
 
 
 org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
 
 
 org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 
 
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  org.apache.spark.scheduler.Task.run(Task.scala:51

Trailing Tasks Saving to HDFS

2014-06-18 Thread Surendranauth Hiraman
I have a flow that ends with saveAsTextFile() to HDFS.

It seems all the expected files per partition have been written out, based
on the number of part files and the file sizes.

But the driver logs show 2 tasks still not completed and has no activity
and the worker logs show no activity for those two tasks for a while now.

Has anyone run into this situation? It's happened to me a couple of times
now.

Thanks.

-- Suren

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: Trailing Tasks Saving to HDFS

2014-06-18 Thread Surendranauth Hiraman
Looks like eventually there was some type of reset or timeout and the tasks
have been reassigned. I'm guessing they'll keep failing until max failure
count.

The machine it disconnected from was a remote machine, though I've seen
such failures from connections to itself with other problems. The log lines
from the remote machine are also below.

Any thoughts or guesses would be appreciated!

*HUNG WORKER*

14/06/18 19:41:18 WARN network.ReceivingConnection: Error reading from
connection to ConnectionManagerId(172.16.25.103,57626)

java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcher.read0(Native Method)

at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)

at sun.nio.ch.IOUtil.read(IOUtil.java:224)

at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)

at org.apache.spark.network.ReceivingConnection.read(Connection.scala:496)

at
org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

at java.lang.Thread.run(Thread.java:679)

14/06/18 19:41:18 INFO network.ConnectionManager: Handling connection error
on connection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
SendingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found


*REMOTE WORKER*

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.124,55610)

14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found



On Wed, Jun 18, 2014 at 7:16 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I have a flow that ends with saveAsTextFile() to HDFS.

 It seems all the expected files per partition have been written out, based
 on the number of part files and the file sizes.

 But the driver logs show 2 tasks still not completed and has no activity
 and the worker logs show no activity for those two tasks for a while now.

 Has anyone run into this situation? It's happened to me a couple of times
 now.

 Thanks.

 -- Suren

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: GroupByKey results in OOM - Any other alternative

2014-06-15 Thread Surendranauth Hiraman
Vivek,

If the foldByKey solution doesn't work for you, my team uses
RDD.persist(DISK_ONLY) to avoid OOM errors.

It's slower, of course, and requires tuning other config parameters. It can
also be a problem if you do not have enough disk space, meaning that you
have to unpersist at the right points if you are running long flows.
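
For reference, a minimal sketch of that pattern (the path and the transforms
are just placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// spill an intermediate RDD to disk instead of memory, use it, then
// unpersist to free the disk space before the rest of a long flow
val sc = new SparkContext("local[2]", "disk-only-example")
val intermediate = sc.textFile("/tmp/input")   // path is only an example
  .map(_.toUpperCase)
  .persist(StorageLevel.DISK_ONLY)

intermediate.filter(_.nonEmpty).count()
intermediate.unpersist()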

For us, even though the disk writes are a performance hit, we prefer the
Spark programming model to Hadoop M/R. But we are still working on getting
this to work end to end on 100s of GB of data on our 16-node cluster.

Suren



On Sun, Jun 15, 2014 at 12:08 AM, Vivek YS vivek...@gmail.com wrote:

 Thanks for the input. I will give foldByKey a shot.

 The way I am doing is, data is partitioned hourly. So I am computing
 distinct values hourly. Then I use unionRDD to merge them and compute
 distinct on the overall data.

  Is there a way to know which key/value pair is resulting in the OOM?
  Is there a way to set parallelism in the map stage so that each worker
 will process one key at a time?

 I didn't realise countApproxDistinctByKey is using hyperloglogplus. This
 should be interesting.

 --Vivek


 On Sat, Jun 14, 2014 at 11:37 PM, Sean Owen so...@cloudera.com wrote:

 Grouping by key is always problematic since a key might have a huge
 number of values. You can do a little better than grouping *all* values and
 *then* finding distinct values by using foldByKey, putting values into a
 Set. At least you end up with only distinct values in memory. (You don't
 need two maps either, right?)

 If the number of distinct values is still huge for some keys, consider
 the experimental method countApproxDistinctByKey:
 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L285

 This should be much more performant at the cost of some accuracy.
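
  A minimal sketch of both approaches, assuming string keys and values (the
  method names are made up):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

  // exact: fold one-element Sets per key, so only distinct values stay in memory
  def distinctValueCounts(pairs: RDD[(String, String)]): RDD[(String, Int)] =
    pairs
      .mapValues(v => Set(v))
      .foldByKey(Set.empty[String])(_ ++ _)
      .mapValues(_.size)

  // approximate: HyperLogLog-based counting, much cheaper for huge keys
  def approxDistinctValueCounts(pairs: RDD[(String, String)]): RDD[(String, Long)] =
    pairs.countApproxDistinctByKey(0.05)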


 On Sat, Jun 14, 2014 at 1:58 PM, Vivek YS vivek...@gmail.com wrote:

 Hi,
 For the last couple of days I have been trying hard to get around this
 problem. Please share any insights on solving it.

 Problem :
 There is a huge list of (key, value) pairs. I want to transform this to
 (key, distinct values) and then eventually to (key, distinct values count)

 On small dataset

 groupByKey().map( x = (x_1, x._2.distinct)) ...map(x = (x_1,
 x._2.distinct.count))

 On large data set I am getting OOM.

 Is there a way to represent Seq of values from groupByKey as RDD and
 then perform distinct over it ?

 Thanks
 Vivek






-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: long GC pause during file.cache()

2014-06-15 Thread Surendranauth Hiraman
Is SPARK_DAEMON_JAVA_OPTS valid in 1.0.0?



On Sun, Jun 15, 2014 at 4:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  SPARK_JAVA_OPTS is deprecated in 1.0, though it works fine if you
 don’t mind the WARNING in the logs

 you can set spark.executor.extraJavaOpts in your SparkConf obj
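
  For example, a minimal sketch (note the key listed in the 1.0 configuration
  docs is spark.executor.extraJavaOptions):

  import org.apache.spark.SparkConf

  // pass GC flags to executors via the conf instead of SPARK_JAVA_OPTS
  val conf = new SparkConf()
    .setAppName("gc-tuning-example")
    .set("spark.executor.extraJavaOptions",
      "-XX:+UseConcMarkSweepGC -XX:MaxPermSize=256m")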

 Best,

 --
 Nan Zhu

 On Sunday, June 15, 2014 at 12:13 PM, Hao Wang wrote:

 Hi, Wei

 You may try to set JVM opts in spark-env.sh as follows to prevent or
 mitigate GC pauses:

 export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC
 -Xmx2g -XX:MaxPermSize=256m"

 There are more options you could add, please just Google :)


 Regards,
 Wang Hao(王灏)

 CloudTeam | School of Software Engineering
 Shanghai Jiao Tong University
 Address:800 Dongchuan Road, Minhang District, Shanghai, 200240
 Email:wh.s...@gmail.com


 On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan w...@us.ibm.com wrote:

 Hi,

   I have a single node (192G RAM) stand-alone spark, with memory
 configuration like this in spark-env.sh

 SPARK_WORKER_MEMORY=180g
 SPARK_MEM=180g


  In spark-shell I have a program like this:

 val file = sc.textFile("/localpath") // file size is 40G
 file.cache()


 val output = file.map(line => extract something from line)

 output.saveAsTextFile(...)


 When I run this program again and again, or keep trying file.unpersist()
 -- file.cache() -- output.saveAsTextFile(), the run time varies a lot,
 from 1 min to 3 min to 50+ min. Whenever the run-time is more than 1 min,
 from the stage monitoring GUI I observe big GC pause (some can be 10+ min).
 Of course when run-time is normal, say ~1 min, no significant GC is
 observed. The behavior seems somewhat random.

 Is there any JVM tuning I should do to prevent this long GC pause from
 happening?



 I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something
 like this:

 root 10994  1.7  0.6 196378000 1361496 pts/51 Sl+ 22:06   0:12
 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp
 ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar
 -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g
 org.apache.spark.deploy.SparkSubmit spark-shell --class
 org.apache.spark.repl.Main

 Best regards,
 Wei

 -
 Wei Tan, PhD
 Research Staff Member
 IBM T. J. Watson Research Center
 http://researcher.ibm.com/person/us-wtan






-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: Error During ReceivingConnection

2014-06-11 Thread Surendranauth Hiraman
It looks like this was due to another executor on a different node closing
the connection on its side. I found the entries below in the remote side's
logs.

Can anyone comment on why one ConnectionManager would close its connection
to another node and what could be tuned to avoid this? It did not have any
errors on its side.


This is from the ConnectionManager on the side shutting down the
connection, not the ConnectionManager that had the Connection Reset By
Peer.

14/06/10 18:51:14 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.125,45610)

14/06/10 18:51:14 INFO network.ConnectionManager: Removing
SendingConnection to ConnectionManagerId(172.16.25.125,45610)




On Wed, Jun 11, 2014 at 8:38 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I have a somewhat large job (10 GB of input data that generates about 500 GB
 of data after many stages).

 Most tasks completed but a few stragglers on the same node/executor are
 still active (but doing nothing) after about 16 hours.

 At about 3 to 4 hours in, the tasks that are hanging have the following in
 the work logs.

 Any idea what config to tweak for this?


 14/06/10 18:51:10 WARN network.ReceivingConnection: Error reading from
 connection to ConnectionManagerId(172.16.25.108,37693)
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcher.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
  at sun.nio.ch.IOUtil.read(IOUtil.java:224)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
  at
 org.apache.spark.network.ReceivingConnection.read(Connection.scala:534)
 at
 org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
  at java.lang.Thread.run(Thread.java:679)
 14/06/10 18:51:10 INFO network.ConnectionManager: Handling connection
 error on connection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 SendingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/06/10 18:51:10 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.108,37693)
 14/06/10 18:51:10 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/06/10 18:51:14 WARN network.ReceivingConnection: Error reading from
 connection to ConnectionManagerId(172.16.25.97,54918)
 java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcher.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
 at sun.nio.ch.IOUtil.read(IOUtil.java:224)
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
 at org.apache.spark.network.ReceivingConnection.read(Connection.scala:534)
  at
 org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
 at java.lang.Thread.run(Thread.java:679)
 14/06/10 18:51:14 INFO network.ConnectionManager: Handling connection
 error on connection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 SendingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found
 14/06/10 18:51:14 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(172.16.25.97,54918)
 14/06/10 18:51:14 ERROR network.ConnectionManager: Corresponding
 SendingConnectionManagerId not found

 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira

Re: Spark Logging

2014-06-10 Thread Surendranauth Hiraman
Event logs are different from writing using a logger, like log4j. The event
logs are the type of data showing up in the history server.

For my team, we use com.typesafe.scalalogging.slf4j.Logging. Our logs show
up in /etc/spark/work/app-id/executor-id/stderr and stdout.

All of our logging seems to show up in stderr.
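
Roughly like this (a sketch; assumes the scalalogging-slf4j artifact is on
the classpath, and the object name is made up):

import com.typesafe.scalalogging.slf4j.Logging

// mix in the Logging trait; messages logged from executor-side code show up
// in each executor's stderr under the work directory
object WordFilter extends Logging {
  def keep(word: String): Boolean = {
    logger.debug(s"checking $word")
    word.nonEmpty
  }
}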

-Suren




On Tue, Jun 10, 2014 at 2:56 PM, coderxiang shuoxiang...@gmail.com wrote:

 By default, the logs are available at `/tmp/spark-events`. You can specify
 the log directory via spark.eventLog.dir, see  this configuration page
 http://spark.apache.org/docs/latest/configuration.html  .



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Logging-tp7340p7343.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


FileNotFoundException when using persist(DISK_ONLY)

2014-06-09 Thread Surendranauth Hiraman
I have a dataset of about 10GB. I am using persist(DISK_ONLY) to avoid out
of memory issues when running my job.

When I run with a dataset of about 1 GB, the job is able to complete.

But when I run with the larger dataset of 10 GB, I get the following
error/stacktrace, which seems to be happening when the RDD is writing out
to disk.

Anyone have any ideas as to what is going on or if there is a setting I can
tune?


14/06/09 21:33:55 ERROR executor.Executor: Exception in task ID 560
java.io.FileNotFoundException:
/tmp/spark-local-20140609210741-0bb8/14/rdd_331_175 (No such file or
directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.init(FileOutputStream.java:209)
at java.io.FileOutputStream.init(FileOutputStream.java:160)
at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:79)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:698)
at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:95)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:679)

-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: FileNotFoundException when using persist(DISK_ONLY)

2014-06-09 Thread Surendranauth Hiraman
I don't know if this is related but a little earlier in stderr, I also have
the following stacktrace. But this stacktrace seems to be when the code is
grabbing RDD data from a remote node, which is different from the above.


14/06/09 21:33:26 ERROR executor.ExecutorUncaughtExceptionHandler: Uncaught
exception in thread Thread[Executor task launch worker-16,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:94)
at
org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
at
org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
at
org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
at
org.apache.spark.storage.BlockManagerWorker$.syncGetBlock(BlockManagerWorker.scala:128)
at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:489)
at
org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:487)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:487)
at org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:473)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:513)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)



On Mon, Jun 9, 2014 at 10:05 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I have a dataset of about 10GB. I am using persist(DISK_ONLY) to avoid out
 of memory issues when running my job.

 When I run with a dataset of about 1 GB, the job is able to complete.

 But when I run with the larger dataset of 10 GB, I get the following
 error/stacktrace, which seems to be happening when the RDD is writing out
 to disk.

 Anyone have any ideas as to what is going on or if there is a setting I
 can tune?


 14/06/09 21:33:55 ERROR executor.Executor: Exception in task ID 560
 java.io.FileNotFoundException:
 /tmp/spark-local-20140609210741-0bb8/14/rdd_331_175 (No such file or
 directory)
  at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:209)
  at java.io.FileOutputStream.init(FileOutputStream.java:160)
 at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:79)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:698)
 at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:95)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:679)

 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F

Re: FileNotFoundException when using persist(DISK_ONLY)

2014-06-09 Thread Surendranauth Hiraman
Sorry for the stream of consciousness but after thinking about this a bit
more, I'm thinking that the FileNotFoundExceptions are due to tasks being
cancelled/restarted and the root cause is the OutOfMemoryError.

If anyone has any insights on how to debug this more deeply or relevant
config settings, that would be much appreciated.

Otherwise, I figure next steps would be to enable more debugging levels in
the Spark code to see how much memory the code is trying to allocate. At
this point, I'm wondering if the block could be in the GB range.

-Suren




On Mon, Jun 9, 2014 at 10:27 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I don't know if this is related but a little earlier in stderr, I also
 have the following stacktrace. But this stacktrace seems to be when the
 code is grabbing RDD data from a remote node, which is different from the
 above.


 14/06/09 21:33:26 ERROR executor.ExecutorUncaughtExceptionHandler:
 Uncaught exception in thread Thread[Executor task launch worker-16,5,main]
 java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
  at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:94)
 at
 org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
  at
 org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
 at
 org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
  at
 org.apache.spark.storage.BlockManagerWorker$.syncGetBlock(BlockManagerWorker.scala:128)
 at
 org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:489)
  at
 org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:487)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:487)
  at
 org.apache.spark.storage.BlockManager.getRemote(BlockManager.scala:473)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:513)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:39)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)



 On Mon, Jun 9, 2014 at 10:05 PM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 I have a dataset of about 10GB. I am using persist(DISK_ONLY) to avoid
 out of memory issues when running my job.

 When I run with a dataset of about 1 GB, the job is able to complete.

 But when I run with the larger dataset of 10 GB, I get the following
 error/stacktrace, which seems to be happening when the RDD is writing out
 to disk.

 Anyone have any ideas as to what is going on or if there is a setting I
 can tune?


 14/06/09 21:33:55 ERROR executor.Executor: Exception in task ID 560
 java.io.FileNotFoundException:
 /tmp/spark-local-20140609210741-0bb8/14/rdd_331_175 (No such file or
 directory)
  at java.io.FileOutputStream.open(Native Method)
 at java.io.FileOutputStream.init(FileOutputStream.java:209)
  at java.io.FileOutputStream.init(FileOutputStream.java:160)
 at org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:79)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:698)
 at org.apache.spark.storage.BlockManager.put(BlockManager.scala:546)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:95)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99

Re: Spark 1.0.0 - Java 8

2014-05-30 Thread Surendranauth Hiraman
With respect to virtual hosts, my team uses Vagrant/VirtualBox. We have 3
CentOS VMs with 4 GB RAM each - 2 worker nodes and a master node.

Everything works fine, though if you are using MapR, you have to make sure
they are all on the same subnet.

-Suren



On Fri, May 30, 2014 at 12:20 PM, Upender Nimbekar upent...@gmail.com
wrote:

 Great news! I've been awaiting this release to start doing some coding
 with Spark using Java 8. Can I run the Spark 1.0 examples on a virtual host
 with 16 GB RAM and a fairly decent amount of hard disk? Or do I really need
 to use a cluster of machines?
 Second, are there any good examples of using MLlib on Spark? Please shoot
 me in the right direction.

 Thanks
 Upender

 On Fri, May 30, 2014 at 6:12 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 I'm thrilled to announce the availability of Spark 1.0.0! Spark 1.0.0
 is a milestone release as the first in the 1.0 line of releases,
 providing API stability for Spark's core interfaces.

 Spark 1.0.0 is Spark's largest release ever, with contributions from
 117 developers. I'd like to thank everyone involved in this release -
 it was truly a community effort with fixes, features, and
 optimizations contributed from dozens of organizations.

 This release expands Spark's standard libraries, introducing a new SQL
 package (SparkSQL) which lets users integrate SQL queries into
 existing Spark workflows. MLlib, Spark's machine learning library, is
 expanded with sparse vector support and several new algorithms. The
 GraphX and Streaming libraries also introduce new features and
 optimizations. Spark's core engine adds support for secured YARN
 clusters, a unified tool for submitting Spark applications, and
 several performance and stability improvements. Finally, Spark adds
 support for Java 8 lambda syntax and improves coverage of the Java and
 Python API's.

 Those features only scratch the surface - check out the release notes
 here:
 http://spark.apache.org/releases/spark-release-1-0-0.html

 Note that since release artifacts were posted recently, certain
 mirrors may not have working downloads for a few hours.

 - Patrick





-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: maprfs and spark libraries

2014-05-26 Thread Surendranauth Hiraman
My team is successfully running on Spark on MapR.

However, we add the MapR jars to the SPARK_CLASSPATH on the workers, as
well as making sure they are on the classpath of the driver.

I'm not sure if we need every jar that we currently add but below is what
we currently use. The important file in the conf directory
is mapr-clusters.conf.

I do not think we add the jars to the classpath via addJars and prefer to
reference them from the mapr installation. This is partly because we
colocate our workers on our mapr worker nodes.

export
SPARK_CLASSPATH=/opt/mapr/hadoop/hadoop-0.20.2/conf:/opt/mapr/hadoop/hadoop-0.20.2/lib/hadoop-0.20.2-dev-core.jar:/opt/mapr/lib/maprfs-1.0.3-mapr-3.0.2.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/commons-logging-1.0.4.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/maprfs-1.0.3-mapr-3.0.2.jar:/opt/mapr/hadoop/hadoop-0.20.2/lib/zookeeper-3.3.6.jar




On Mon, May 26, 2014 at 10:50 AM, Mayur Rustagi mayur.rust...@gmail.comwrote:

 Did you try standalone mode? You may not see serialization issues in
 local threaded mode.
 Serialization errors are unlikely to be caused by the MapR Hadoop version.
 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Mon, May 26, 2014 at 3:18 PM, nelson nelson.verd...@ysance.com wrote:

 Hi all,

 I am trying to run spark over a MapR cluster. I successfully ran several
 custom applications on a previous non-mapr hadoop cluster but i can't get
 them working on the mapr one. To be more specific, i am not able to read
 or
 write on mfs without running into a serialization error from Java. Note
 that
 everything works fine when i am running the app in local mode which make
 me
 think of a dependancy error.

 The test application is built using sbt with the following dependency:
  - org.apache.spark spark-core 0.9.1

 In my test_app/lib directory i have:
  - hadoop-core-1.0.3-mapr-3.0.2.jar
  - json-20140107.jar
  - maprfs-1.0.3-mapr-3.0.2.jar

 Finally, i add those jars with conf.setJars so that they are distributed
 on
 the cluster.

 Am I compiling with the wrong dependencies? Should i get a mapr version
 of
 spark-core?

 Regards, Nelson



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: maprfs and spark libraries

2014-05-26 Thread Surendranauth Hiraman
We use the mapr rpm and have successfully read and written hdfs data.

Are you using custom readers/writers? Maybe the relevant stacktrace might
help.

Maybe also try a standard text reader and writer to see if there is a basic
issue with accessing mfs?

-Suren



On Mon, May 26, 2014 at 11:31 AM, nelson nelson.verd...@ysance.com wrote:

 thanks for replying guys.

 Mayur:
 Indeed i tried the local mode (sparkmaster: local[5]) before and the
 application runs well, no serialization problem. The problem arises as soon
 as i try to run the app over the cluster.

 Surendranauth:
 I just double checked my spark_classpath from spark_env.sh and we have a
 similar one.
 Is your team running spark with the mapr rpm [1]? Or did you compile it
 yourself?

 [1]:  http://doc.mapr.com/display/MapR/Installing+Spark+and+Shark
 http://doc.mapr.com/display/MapR/Installing+Spark+and+Shark

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392p6400.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: maprfs and spark libraries

2014-05-26 Thread Surendranauth Hiraman
When I have stack traces, I usually see the MapR versions of the various
hadoop classes, though maybe that's at a deeper level of the stack trace.

If my memory is right though, this may point to the classpath having
regular hadoop jars before the standard hadoop jars. My guess is that this
is on the driver side, if so.

I had to make sure to put the MapR jars as the very first entries on my
driver classpath.

-Suren



On Mon, May 26, 2014 at 12:11 PM, nelson nelson.verd...@ysance.com wrote:

 I am using standard readers and writers i believe.
 When i locally run the app, spark is able to write on hdfs. Then i assume
 accessing and reading mfs is doable.

 Here is the piece of code i use for testing:
 val list = List("dad", "mum", "brother", "sister")
 val mlist = sc.parallelize(list)
 mlist.saveAsTextFile("maprfs:///user/nelson/test")

 and the stack trace:
 14/05/26 16:02:54 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.NullPointerException
 java.lang.NullPointerException
 at org.apache.hadoop.fs.FileSystem.fixName(FileSystem.java:187)
 at
 org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:123)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:115)
 at
 org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:617)
 at

 org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:439)
 at

 org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:412)
 at
 org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:391)
 at
 org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:391)
 at

 org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$1.apply(HadoopRDD.scala:111)
 at

 org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$1.apply(HadoopRDD.scala:111)
 at scala.Option.map(Option.scala:145)
 at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:154)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 at
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)

 The uri name seems to be the issue now as i happen to get rid of the
 serialization issue.

 Regards,
 Nelson



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/maprfs-and-spark-libraries-tp6392p6402.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: Anyone using value classes in RDDs?

2014-04-20 Thread Surendranauth Hiraman
If the purpose is only aliasing, rather than adding additional methods and
avoiding runtime allocation, what about type aliases?

type ID = String
type Name = String
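
A quick sketch of what that looks like in a signature (the method is made up;
aliases read better but add no extra type safety and no allocation):

import org.apache.spark.rdd.RDD

object Aliases {
  type ID = String
  type Name = String

  // ID and Name are still just String to the compiler, so there is no
  // runtime wrapping; only the signature reads better
  def flip(pairs: RDD[(ID, (Name, Int))]): RDD[(Name, (ID, Int))] =
    pairs.map { case (id, (name, n)) => (name, (id, n)) }
}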


On Sat, Apr 19, 2014 at 9:26 PM, kamatsuoka ken...@gmail.com wrote:

 No, you can wrap other types in value classes as well.  You can try it in
 the REPL:

 scala case class ID(val id: String) extends AnyVal
 defined class ID
 scala val i = ID(foo)
 i: ID = ID(foo)


 On Fri, Apr 18, 2014 at 4:14 PM, Koert Kuipers [via Apache Spark User
 List] wrote:

 isn't valueclasses for primitives (AnyVal) only? that doesn't apply to
 string, which is an object (AnyRef)


 On Fri, Apr 18, 2014 at 2:51 PM, kamatsuoka wrote:

 I'm wondering if anyone has tried using value classes in RDDs?  My use
 case
 is that I have a number of RDDs containing strings, e.g.

 val r1: RDD[(String, (String, Int)] = ...
 val r2: RDD[(String, (String, Int)] = ...

 and it might be clearer if I wrote

 case class ID(val id: String) extends AnyVal
 case class Name(val id: String) extends AnyVal
 val r1: RDD[(ID, (Name, Int)] = ...
 val r2: RDD[(Name, (ID, Int)] = ...

 This seems like a pretty typical use case for value classes, but I
 haven't
 noticed anyone talking about it.  Although, I think you'd have to read
 through all of the Spark code paths to know whether allocation is
 required
 (http://docs.scala-lang.org/overviews/core/value-classes.html), so some
 comparative performance testing would be called for.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




 --
  If you reply to this email, your message will be added to the
 discussion below:

 http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464p4475.html




 --
 Kenji

 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464p4494.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v suren.hira...@sociocast.comelos.io
W: www.velos.io


Re: Anyone using value classes in RDDs?

2014-04-20 Thread Surendranauth Hiraman
Oh, sorry, I think your point was probably you wouldn't need runtime
allocation.

I guess that is the key question. I would be interested if this works for
you.

-Suren



On Sun, Apr 20, 2014 at 9:18 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 If the purpose is only aliasing, rather than adding additional methods and
 avoiding runtime allocation, what about type aliases?

 type ID = String
 type Name = String


 On Sat, Apr 19, 2014 at 9:26 PM, kamatsuoka ken...@gmail.com wrote:

 No, you can wrap other types in value classes as well.  You can try it in
 the REPL:

 scala case class ID(val id: String) extends AnyVal
 defined class ID
  scala val i = ID(foo)
 i: ID = ID(foo)


 On Fri, Apr 18, 2014 at 4:14 PM, Koert Kuipers [via Apache Spark User
 List] wrote:

 isn't valueclasses for primitives (AnyVal) only? that doesn't apply to
 string, which is an object (AnyRef)


 On Fri, Apr 18, 2014 at 2:51 PM, kamatsuoka wrote:

 I'm wondering if anyone has tried using value classes in RDDs?  My use
 case
 is that I have a number of RDDs containing strings, e.g.

 val r1: RDD[(String, (String, Int)] = ...
 val r2: RDD[(String, (String, Int)] = ...

 and it might be clearer if I wrote

 case class ID(val id: String) extends AnyVal
 case class Name(val id: String) extends AnyVal
 val r1: RDD[(ID, (Name, Int)] = ...
 val r2: RDD[(Name, (ID, Int)] = ...

 This seems like a pretty typical use case for value classes, but I
 haven't
 noticed anyone talking about it.  Although, I think you'd have to read
 through all of the Spark code paths to know whether allocation is
 required
 (http://docs.scala-lang.org/overviews/core/value-classes.html), so some
 comparative performance testing would be called for.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Anyone-using-value-classes-in-RDDs-tp4464.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.








 --
 Kenji





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: standalone vs YARN

2014-04-15 Thread Surendranauth Hiraman
Prashant,

In another email thread several weeks ago, it was mentioned that YARN
support is considered beta until Spark 1.0. Is that not the case?

-Suren



On Tue, Apr 15, 2014 at 8:38 AM, Prashant Sharma scrapco...@gmail.com wrote:

 Hi Ishaaq,

 answers inline from what I know; I'd like to be corrected, though.

 On Tue, Apr 15, 2014 at 5:58 PM, ishaaq ish...@gmail.com wrote:

 Hi all,
 I am evaluating Spark to use here at my work.

 We have an existing Hadoop 1.x install which I planning to upgrade to
 Hadoop
 2.3.

 This is not really a requirement for Spark; if you are doing it for some
 other reason, great!


 I am trying to work out whether I should install YARN or simply set up a
 Spark standalone cluster. We already use ZooKeeper, so it isn't a problem
 to set up HA. I am puzzled, however, as to how the Spark nodes can coordinate on
 data locality - i.e., assuming I install the nodes on the same machines as
 the DFS data nodes, I don't understand how Spark can work out which nodes
 should get which splits of the jobs?

 This happens exactly the same way Hadoop's MapReduce figures out data
 locality, since we support Hadoop's InputFormats (which also carry the
 information on how the data is partitioned). So having Spark workers share
 the same nodes as your DFS is a good idea.
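
 A minimal sketch of what that looks like in practice (the paths and app name
 are made up); the locality information comes from the InputFormat's splits,
 so nothing extra is needed in user code:

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._  // pair RDD functions on older Spark versions

 val sc = new SparkContext(new SparkConf().setAppName("locality-sketch"))
 // Spark asks the Hadoop InputFormat for splits and their host locations and
 // prefers to schedule each task on a node that holds that split's block
 val lines = sc.textFile("hdfs://namenode:8020/data/events.log")
 val counts = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
 counts.saveAsTextFile("hdfs://namenode:8020/out/word-counts")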


 Anyway, my bigger question remains: YARN or standalone? Which is the more
 stable option currently? Which is the more future-proof option?


 Well, I think standalone is stable enough for all purposes, and Spark's YARN
 support has been keeping up with the latest Hadoop versions too. If you are
 already using YARN and don't want the hassle of setting up another cluster
 manager, you can probably prefer YARN.


 Thanks,
 Ishaaq








-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Spark - ready for prime time?

2014-04-11 Thread Surendranauth Hiraman
Excellent, thanks you.



On Fri, Apr 11, 2014 at 12:09 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 It's not a new API, it just happens underneath the current one if you have
 spark.shuffle.spill set to true (which it is by default). Take a look at
 the config settings that mention spill in
 http://spark.incubator.apache.org/docs/latest/configuration.html.
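
 For reference, a minimal sketch of setting the relevant properties
 programmatically (the values shown are only examples, not recommendations):

 import org.apache.spark.{SparkConf, SparkContext}

 val conf = new SparkConf()
   .setAppName("spill-sketch")
   .set("spark.shuffle.spill", "true")          // already the default in 0.9+
   .set("spark.shuffle.memoryFraction", "0.3")  // memory used for shuffles before spilling to disk
 val sc = new SparkContext(conf)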

 Matei

 On Apr 11, 2014, at 7:02 AM, Surendranauth Hiraman suren.hira...@velos.io
 wrote:

 Matei,

 Where is the functionality in 0.9 to spill data within a task (separately
 from persist)? My apologies if this is something obvious but I don't see it
 in the api docs.

 -Suren



 On Thu, Apr 10, 2014 at 3:59 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 To add onto the discussion about memory working space, 0.9 introduced the
 ability to spill data within a task to disk, and in 1.0 we're also changing
 the interface to allow spilling data within the same *group* to disk (e.g.
 when you do groupBy and get a key with lots of values). The main reason
 these weren't there was that for a lot of workloads (everything except the
 same key having lots of values), simply launching more reduce tasks was
 also a good solution, because it results in an external sort across the
 cluster similar to what would happen within a task.
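
 As a minimal sketch of the "more reduce tasks" workaround (the RDD shape and
 the partition count of 2000 are just examples):

 val pairs: org.apache.spark.rdd.RDD[(String, Int)] = ???  // assumed (key, value) shape
 // an explicit partition count spreads the shuffle output across more, smaller
 // reduce tasks, so each task's working set is easier to fit in memory
 val counts = pairs.reduceByKey((a: Int, b: Int) => a + b, 2000)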

 Overall, expect to see more work to both explain how things execute (
 http://spark.incubator.apache.org/docs/latest/tuning.html is one
 example, the monitoring UI is another) and try to make things require no
 configuration out of the box. We're doing a lot of this based on user
 feedback, so that's definitely appreciated.

 Matei

 On Apr 10, 2014, at 10:33 AM, Dmitriy Lyubimov dlie...@gmail.com wrote:

 On Thu, Apr 10, 2014 at 9:24 AM, Andrew Ash and...@andrewash.com wrote:

 The biggest issue I've come across is that the cluster is somewhat
 unstable when under memory pressure.  Meaning that if you attempt to
 persist an RDD that's too big for memory, even with MEMORY_AND_DISK, you'll
 often still get OOMs.  I had to carefully modify some of the space tuning
 parameters and GC settings to get some jobs to even finish.

 The other issue I've observed is if you group on a key that is highly
 skewed, with a few massively-common keys and a long tail of rare keys, the
 one massive key can be too big for a single machine and again cause OOMs.


 My take on it -- Spark doesn't believe in sort-and-spill things to enable
 super long groups, and IMO for a good reason. Here are my thoughts:

 (1) In my work I don't need sort in 99% of the cases; I only need
 group, which absolutely doesn't need the spill that makes things slow
 down to a crawl.
 (2) If that's an aggregate (such as a group count), use combine(), not
 groupByKey -- this will do tons of good on memory use.
 (3) If you really need groups that don't fit into memory, that is always
 because you want to do something other than aggregation with them,
 e.g. build an index of that grouped data. We actually had a case just like
 that. In this case your friend is really not groupBy, but rather
 partitionBy. I.e., what happens there is you build a quick count sketch,
 perhaps on downsampled data, to figure out which keys have a sufficiently big
 count -- and then you build a partitioner that redirects large groups to a
 dedicated map(). Assuming this map doesn't try to load things into memory but
 rather does something like a streaming BTree build, that should be fine. In
 certain situations such processing may require splitting a super large group
 further into smaller subgroups (e.g. a partitioned BTree structure), at which
 point you should be fine even from a uniform load point of view. It takes a
 little jiu-jitsu to do it all, but it is not Spark's fault here; it did
 not promise to do all this for you in the groupBy contract.
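
 A minimal sketch of point (2): a per-key count done with combineByKey, so no
 key's values are ever collected into a single in-memory group (the RDD shape
 is assumed):

 val pairs: org.apache.spark.rdd.RDD[(String, Int)] = ???  // assumed (key, value) shape

 val countsPerKey = pairs.combineByKey(
   (v: Int) => 1L,                  // createCombiner: first value seen for a key
   (c: Long, v: Int) => c + 1L,     // mergeValue: fold another value into the running count
   (c1: Long, c2: Long) => c1 + c2  // mergeCombiners: merge partial counts across partitions
 )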




 I'm hopeful that off-heap caching (Tachyon) could fix some of these
 issues.

 Just my personal experience, but I've observed significant improvements
 in stability since even the 0.7.x days, so I'm confident that things will
 continue to get better as long as people report what they're seeing so it
 can get fixed.

 Andrew


 On Thu, Apr 10, 2014 at 4:08 PM, Alex Boisvert 
 alex.boisv...@gmail.com wrote:

 I'll provide answers from our own experience at Bizo.  We've been using
 Spark for 1+ year now and have found it generally better than previous
 approaches (Hadoop + Hive mostly).



 On Thu, Apr 10, 2014 at 7:11 AM, Andras Nemeth 
 andras.nem...@lynxanalytics.com wrote:

 I. Is it too much magic? Lots of things just work right in Spark and
 it's extremely convenient and efficient when it indeed works. But should we
 be worried that customization is hard if the built-in behavior is not quite
 right for us? Are we to expect hard-to-track-down issues originating from
 the black box behind the magic?


 I think it goes back to understanding Spark's architecture, its design
 constraints, and the problems it explicitly set out to address. If the
 solution

Re: Spark Disk Usage

2014-04-07 Thread Surendranauth Hiraman
Hi,

Any thoughts on this? Thanks.

-Suren



On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 Hi,

 I know if we call persist with the right options, we can have Spark
 persist an RDD's data on disk.

 I am wondering what happens in intermediate operations that could
 conceivably create large collections/Sequences, like GroupBy and shuffling.

 Basically, one part of the question is when is disk used internally?

 And is calling persist() on the RDD returned by such transformations what
 lets it know to use disk in those situations? Trying to understand if
 persist() is applied during the transformation or after it.

 Thank you.


 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


PySpark SocketConnect Issue in Cluster

2014-04-07 Thread Surendranauth Hiraman
Hi,

We have a situation where a Pyspark script works fine as a local process
(local url) on the Master and the Worker nodes, which would indicate that
all python dependencies are set up properly on each machine.

But when we try to run the script at the cluster level (using the master's
url), it fails partway through the flow on a GroupBy with a SocketConnect
error and Python crashes.

This is on ec2 using the AMI. This doesn't seem to be an issue of the
master not seeing the workers, since they show up in the web ui.

Also, we can see the job running on the cluster until it reaches the
GroupBy transform step, which is when we get the SocketConnect error.

Any ideas?

-Suren


SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Spark Disk Usage

2014-04-07 Thread Surendranauth Hiraman
It might help if I clarify my questions. :-)

1. Is persist() applied during the transformation right before the
persist() call in the graph? Or is it applied after the transform's
processing is complete? In the case of things like GroupBy, is the Seq
backed by disk as it is being created? We're trying to get a sense of how
the processing is handled behind the scenes with respect to disk.

2. When else is disk used internally?
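
To make question 1 concrete, here is a minimal sketch of how we are using
persist() (the rdd name is assumed); my current understanding is that persist()
only marks the storage level and nothing is written until an action
materializes the RDD:

import org.apache.spark.storage.StorageLevel

val rdd: org.apache.spark.rdd.RDD[(String, String)] = ???  // assumed pair RDD
// persist() records the storage level; it governs how the *output* partitions
// are stored, not how the Seq is built inside groupByKey
val grouped = rdd.groupByKey().persist(StorageLevel.MEMORY_AND_DISK)
grouped.count()  // first action: partitions are computed and cached, spilling to disk if they don't fit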

Any pointers are appreciated.

-Suren




On Mon, Apr 7, 2014 at 8:46 AM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 Hi,

 Any thoughts on this? Thanks.

 -Suren



 On Thu, Apr 3, 2014 at 8:27 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Hi,

 I know if we call persist with the right options, we can have Spark
 persist an RDD's data on disk.

 I am wondering what happens in intermediate operations that could
 conceivably create large collections/Sequences, like GroupBy and shuffling.

 Basically, one part of the question is when is disk used internally?

 And is calling persist() on the RDD returned by such transformations what
 lets it know to use disk in those situations? Trying to understand if
 persist() is applied during the transformation or after it.

 Thank you.


 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Spark Disk Usage

2014-04-03 Thread Surendranauth Hiraman
Hi,

I know if we call persist with the right options, we can have Spark persist
an RDD's data on disk.

I am wondering what happens in intermediate operations that could
conceivably create large collections/Sequences, like GroupBy and shuffling.

Basically, one part of the question is when is disk used internally?

And is calling persist() on the RDD returned by such transformations what
lets it know to use disk in those situations? Trying to understand if
persist() is applied during the transformation or after it.

Thank you.


SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Accessing the reduce key

2014-03-20 Thread Surendranauth Hiraman
Mayur,

To be a little clearer, for creating the Bloom Filters, I don't think
broadcast variables are the way to go, though definitely that would work
for using the Bloom Filters to filter data.

The reason why is that the creation needs to happen in a single thread.
Otherwise, some type of locking/distributed locking is needed on the
individual Bloom Filter itself, with performance impact.

Agreed?

-Suren




On Thu, Mar 20, 2014 at 3:40 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 Mayur,

 Thanks. This step is for creating the Bloom Filter, not using it to filter
 data, actually. But your answer still stands.

 Partitioning by key, having the bloom filters as a broadcast variable and
 then doing mappartition makes sense.

 Are there performance implications for this approach, such as with using
 the broadcast variable, versus the approach we used, in which the Bloom
 Filter (again, for creating it) is only referenced by the single map
 application?

 -Suren





 On Thu, Mar 20, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 Why are you trying to reduceByKey? Are you looking to work on the data
 sequentially?
 If I understand correctly, you are looking to filter your data using the
 bloom filter, and each bloom filter is tied to the key that instantiated it.
 Following are some of the options:
 *Partition* your data by key and use the mapPartitions operator to run a
 function on each partition independently. The same function will be applied
 to each partition.
 If your bloom filter is large, then you can bundle all of them in as a
 broadcast variable and use it to apply the transformation on your data using
 a simple map operation; basically you are looking up the right bloom filter
 for each key and applying the filter to it. Again, if deserializing the bloom
 filter is time consuming, then you can partition the data on key and then use
 the broadcast variable to look up the bloom filter for each key and apply the
 filter to all the data in serial.
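
 A minimal sketch of the broadcast approach described above (a plain Set stands
 in for the Bloom filter, the RDD shape is assumed, and sc is an existing
 SparkContext):

 val filters: Map[String, Set[String]] = Map("groupA" -> Set("m1", "m2"))  // one filter per group_id
 val filtersBc = sc.broadcast(filters)

 val records: org.apache.spark.rdd.RDD[(String, String)] = ???  // (group_id, member value)
 val kept = records.filter { case (groupId, value) =>
   // look up the filter for this record's key and test membership
   filtersBc.value.get(groupId).exists(_.contains(value))
 }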

 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 We ended up going with:

 map() - set the group_id as the key in a Tuple
 reduceByKey() - end up with (K,Seq[V])
 map() - create the bloom filter and loop through the Seq and persist the
 Bloom filter

 This seems to be fine.

 I guess Spark cannot optimize the reduceByKey and map steps to occur
 together since the fact that we are looping through the Seq is out of
 Spark's control.
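
 A minimal sketch of that flow (the record shape and the filter class are
 placeholders, not the real ones):

 // stand-in so the sketch is self-contained; a real job would use an actual
 // Bloom filter implementation (e.g. Algebird's) rather than a Set
 class SetBackedFilter extends Serializable {
   private val seen = scala.collection.mutable.Set[String]()
   def add(s: String): Unit = seen += s
   def mightContain(s: String): Boolean = seen.contains(s)
 }

 val records: org.apache.spark.rdd.RDD[(String, String)] = ???  // (group_id, member value), assumed

 val filtersByGroup = records
   .groupByKey()                     // all values for a group_id end up in one task
   .mapValues { values =>
     val bf = new SetBackedFilter()  // built single-threaded, per key, as discussed above
     values.foreach(bf.add)
     bf
   }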

 -Suren




 On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Hi,

 My team is trying to replicate an existing Map/Reduce process in Spark.

 Basically, we are creating Bloom Filters for quick set membership tests
 within our processing pipeline.

 We have a single column (call it group_id) that we use to partition
 into sets.

 As you would expect, in the map phase, we emit the group_id as the key
 and in the reduce phase, we instantiate the Bloom Filter for a given key in
 the setup() method and persist that Bloom Filter in the cleanup() method.

 In Spark, we can do something similar with map() and reduceByKey() but
 we have the following questions.


 1. Accessing the reduce key
 In reduceByKey(), how do we get access to the specific key within the
 reduce function?


 2. Equivalent of setup/cleanup
 Where should we instantiate and persist each Bloom Filter by key? In
 the driver and then pass in the references to the reduce function? But if
 so, how does the reduce function know which set's Bloom Filter it should be
 writing to (question 1 above)?

 It seems if we use groupByKey and then reduceByKey, that gives us
 access to all of the values at one go. I assume there, Spark will manage if
 those values all don't fit in memory in one go.
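
 For what it's worth, a minimal sketch of the closest Spark analogue to
 setup()/cleanup(), which is per partition rather than per reduce key (the
 record shape is assumed):

 val records: org.apache.spark.rdd.RDD[(String, String)] = ???  // (group_id, value)

 val perPartitionCounts = records.mapPartitions { iter =>
   val state = scala.collection.mutable.HashMap[String, Long]()  // "setup", once per partition
   iter.foreach { case (groupId, _) =>
     state(groupId) = state.getOrElse(groupId, 0L) + 1L
   }
   state.iterator                                                // "cleanup": emit the accumulated state
 }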



 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Accessing the reduce key

2014-03-20 Thread Surendranauth Hiraman
Grouped by the group_id but not sorted.

-Suren



On Thu, Mar 20, 2014 at 5:52 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 You are using the data grouped (sorted?) to create the bloom filter?
 On Mar 20, 2014 4:35 PM, Surendranauth Hiraman suren.hira...@velos.io
 wrote:

 Mayur,

 To be a little clearer, for creating the Bloom Filters, I don't think
 broadcast variables are the way to go, though definitely that would work
 for using the Bloom Filters to filter data.

 The reason why is that the creation needs to happen in a single thread.
 Otherwise, some type of locking/distributed locking is needed on the
 individual Bloom Filter itself, with performance impact.

 Agreed?

 -Suren




 On Thu, Mar 20, 2014 at 3:40 PM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Mayur,

 Thanks. This step is for creating the Bloom Filter, not using it to
 filter data, actually. But your answer still stands.

 Partitioning by key, having the bloom filters as a broadcast variable
 and then doing mappartition makes sense.

 Are there performance implications for this approach, such as with using
 the broadcast variable, versus the approach we used, in which the Bloom
 Filter (again, for creating it) is only referenced by the single map
 application?

 -Suren





 On Thu, Mar 20, 2014 at 3:20 PM, Mayur Rustagi 
 mayur.rust...@gmail.com wrote:

 Why are you trying to reduceByKey? Are you looking to work on the data
 sequentially?
 If I understand correctly, you are looking to filter your data using the
 bloom filter, and each bloom filter is tied to the key that instantiated it.
 Following are some of the options:
 *Partition* your data by key and use the mapPartitions operator to run a
 function on each partition independently. The same function will be applied to
 each partition.
 If your bloom filter is large, then you can bundle all of them in as a
 broadcast variable and use it to apply the transformation on your data using
 a simple map operation; basically you are looking up the right bloom filter
 for each key and applying the filter to it. Again, if deserializing the bloom
 filter is time consuming, then you can partition the data on key and then use
 the broadcast variable to look up the bloom filter for each key and apply the
 filter to all the data in serial.

 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 We ended up going with:

 map() - set the group_id as the key in a Tuple
 reduceByKey() - end up with (K,Seq[V])
 map() - create the bloom filter and loop through the Seq and persist
 the Bloom filter

 This seems to be fine.

 I guess Spark cannot optimize the reduceByKey and map steps to occur
 together since the fact that we are looping through the Seq is out of
 Spark's control.

 -Suren




 On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Hi,

 My team is trying to replicate an existing Map/Reduce process in
 Spark.

 Basically, we are creating Bloom Filters for quick set membership
 tests within our processing pipeline.

 We have a single column (call it group_id) that we use to partition
 into sets.

 As you would expect, in the map phase, we emit the group_id as the
 key and in the reduce phase, we instantiate the Bloom Filter for a given
 key in the setup() method and persist that Bloom Filter in the cleanup()
 method.

 In Spark, we can do something similar with map() and reduceByKey()
 but we have the following questions.


 1. Accessing the reduce key
 In reduceByKey(), how do we get access to the specific key within the
 reduce function?


 2. Equivalent of setup/cleanup
 Where should we instantiate and persist each Bloom Filter by key? In
 the driver and then pass in the references to the reduce function? But if
 so, how does the reduce function know which set's Bloom Filter it should be
 writing to (question 1 above)?

 It seems if we use groupByKey and then reduceByKey, that gives us
 access to all of the values at one go. I assume there, Spark will manage if
 those values all don't fit in memory in one go.



 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine