Re: [ML] MLeap: Deploy Spark ML Pipelines w/o SparkContext
to date, i haven't seen very good performance coming from mleap. i believe ram from databricks keeps getting you guys on stage at the spark summits, but i've been unimpressed with the performance numbers - as well as your choice to reimplement your own non-standard "pmml-like" mechanism, which incurs heavy technical debt on the development side. creating technical debt is a very databricks-like thing as seen in their own product - so it's no surprise that databricks supports and encourages this type of engineering effort.

@hollin: please correct me if i'm wrong, but the numbers you guys have quoted in the past are at very low scale. at one point you were quoting 40-50ms, which is pretty bad. 11ms is better, but these are all at low scale, which is not good. i'm not sure where the 2-3ms numbers are coming from, but even that is not realistic in most real-world scenarios at scale.

check out our 100% open source solution to this exact problem starting at http://pipeline.io. you'll find links to the github repo, youtube demos, slideshare conference talks, online training, and lots more.

our entire focus at PipelineIO is optimizing, deploying, a/b + bandit testing, and scaling Scikit-Learn + Spark ML + Tensorflow AI models for high-performance predictions. this focus on performance and scale is an extension of our team's long history of building highly scalable, highly available, and highly performant distributed ML and AI systems at netflix, twitter, mesosphere - and even databricks. :)

reminder that everything here is 100% open source. no product pitches here. we work for you guys/gals - aka the community! please contact me directly if you're looking to solve this problem the best way possible. we can get you up and running in your own cloud-based or on-premise environment in minutes. we support aws, google cloud, and azure - basically anywhere that runs docker. any time zone works. we're completely global with free 24x7 support for everyone in the community.

thanks!
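for anyone curious what "a/b + bandit testing" of live models looks like mechanically, here's a tiny epsilon-greedy sketch in plain python. purely illustrative - this is not PipelineIO code; the variant names, reward rates, and traffic split are all invented for the example:

```python
import random

class EpsilonGreedyRouter:
    """route live prediction traffic across model variants, shifting
    load toward the variant with the best observed reward."""

    def __init__(self, variants, epsilon=0.1, seed=42):
        self.rng = random.Random(seed)
        self.epsilon = epsilon
        self.counts = {v: 0 for v in variants}
        self.rewards = {v: 0.0 for v in variants}

    def _mean_reward(self, v):
        return self.rewards[v] / self.counts[v] if self.counts[v] else 0.0

    def choose(self):
        if self.rng.random() < self.epsilon:          # explore occasionally
            return self.rng.choice(list(self.counts))
        return max(self.counts, key=self._mean_reward)  # otherwise exploit

    def record(self, variant, reward):
        self.counts[variant] += 1
        self.rewards[variant] += reward

# simulate two hypothetical variants; "model_b" converts 10x more often
router = EpsilonGreedyRouter(["model_a", "model_b"])
true_rate = {"model_a": 0.02, "model_b": 0.20}
for _ in range(5000):
    v = router.choose()
    router.record(v, 1.0 if router.rng.random() < true_rate[v] else 0.0)
```

in practice the reward would come from downstream signals (clicks, conversions) and the router would sit in front of the model servers; the bandit then shifts most traffic to the stronger model without a fixed-split a/b test.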
hope this is useful. Chris Fregly Research Scientist @ PipelineIO Founder @ Advanced Spark and TensorFlow Meetup San Francisco - Chicago - Washington DC - London On Feb 4, 2017, 12:06 PM -0600, Debasish Das <debasish.da...@gmail.com>, wrote: > > Except of course lda als and neural net model. for them the model needs to > be either prescored and cached on a kv store or the matrices / graph should > be kept on kv store to access them using a REST API to serve the output..for > neural net its more fun since its a distributed or local graph over which > tensorflow compute needs to run... > > > In trapezium we support writing these models to store like cassandra and > lucene for example and then provide config driven akka-http based API to add > the business logic to access these model from a store and expose the model > serving as REST endpoint > > > Matrix, graph and kernel models we use a lot and for them turned out that > mllib style model predict were useful if we change the underlying store... > > On Feb 4, 2017 9:37 AM, "Debasish Das" <debasish.da...@gmail.com > (mailto:debasish.da...@gmail.com)> wrote: > > > > If we expose an API to access the raw models out of PipelineModel can't we > > call predict directly on it from an API ? Is there a task open to expose > > the model out of PipelineModel so that predict can be called on it? there > > is no dependency of spark context in ml model... > > > > On Feb 4, 2017 9:11 AM, "Aseem Bansal" <asmbans...@gmail.com > > (mailto:asmbans...@gmail.com)> wrote: > > > In Spark 2.0 there is a class called PipelineModel. I know that the title > > > says pipeline but it is actually talking about PipelineModel trained via > > > using a Pipeline. > > > Why PipelineModel instead of pipeline? Because usually there is a series > > > of stuff that needs to be done when doing ML which warrants an ordered > > > sequence of operations. Read the new spark ml docs or one of the > > > databricks blogs related to spark pipelines. 
If you have used python's > > > sklearn library the concept is inspired from there. > > > "once model is deserialized as ml model from the store of choice within > > > ms" - The timing of loading the model was not what I was referring to > > > when I was talking about timing. > > > "it can be used on incoming features to score through spark.ml.Model > > > predict API". The predict API is in the old mllib package not the new ml > > > package. > > > "why r we using dataframe and not the ML model directly from API" - > > > Because as of now the new ml package does not have the direct API.
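debasish's pre-score-and-cache pattern above can be sketched with no spark dependency at all. the toy factors below stand in for an ALS model's learned output, and a plain dict stands in for cassandra/redis - everything here is invented for illustration:

```python
import heapq

# toy user/item latent factors, standing in for an ALS model's output
user_factors = {"u1": [1.0, 0.0], "u2": [0.0, 1.0]}
item_factors = {"i1": [0.9, 0.1], "i2": [0.1, 0.9], "i3": [0.5, 0.5]}

def prescore(top_k=2):
    """batch step: score every (user, item) pair offline and keep
    only the top-k item ids per user."""
    store = {}
    for u, uf in user_factors.items():
        scored = ((sum(a * b for a, b in zip(uf, itf)), i)
                  for i, itf in item_factors.items())
        store[u] = [i for _, i in heapq.nlargest(top_k, scored)]
    return store

kv_store = prescore()   # in production this batch writes to cassandra/redis

def recommend(user_id):
    """serving step: a pure key-value lookup - no SparkContext needed."""
    return kv_store.get(user_id, [])
```

the REST layer (akka-http in trapezium's case) then only wraps `recommend`-style lookups, which is why the serving path stays fast regardless of model size.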
Re: Is Spark 2.0 master node compatible with Spark 1.5 work node?
you'll see errors like this... "java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.netty.RequestMessage; local class incompatible: stream classdesc serialVersionUID = -2221986757032131007, local class serialVersionUID = -5447855329526097695" ...when mixing versions of spark. i'm actually seeing this right now while testing across Spark 1.6.1 and Spark 2.0.1 for my all-in-one, hybrid cloud/on-premise Spark + Zeppelin + Kafka + Kubernetes + Docker + One-Click Spark ML Model Production Deployments initiative documented here: https://github.com/fluxcapacitor/pipeline/wiki/Kubernetes-Docker-Spark-ML and check out my upcoming meetup on this effort either in-person or online: http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/events/233978839/ we're throwing in some GPU/CUDA just to sweeten the offering! :) On Sat, Sep 10, 2016 at 2:57 PM, Holden Karau <hol...@pigscanfly.ca> wrote: > I don't think a 2.0 uber jar will play nicely on a 1.5 standalone cluster. > > > On Saturday, September 10, 2016, Felix Cheung <felixcheun...@hotmail.com> > wrote: > >> You should be able to get it to work with 2.0 as uber jar. >> >> What type cluster you are running on? YARN? And what distribution? >> >> >> >> >> >> On Sun, Sep 4, 2016 at 8:48 PM -0700, "Holden Karau" < >> hol...@pigscanfly.ca> wrote: >> >> You really shouldn't mix different versions of Spark between the master >> and worker nodes, if your going to upgrade - upgrade all of them. Otherwise >> you may get very confusing failures. >> >> On Monday, September 5, 2016, Rex X <dnsr...@gmail.com> wrote: >> >>> Wish to use the Pivot Table feature of data frame which is available >>> since Spark 1.6. But the spark of current cluster is version 1.5. Can we >>> install Spark 2.0 on the master node to work around this? >>> >>> Thanks! 
>>> >> >> >> -- >> Cell : 425-233-8271 >> Twitter: https://twitter.com/holdenkarau >> >> > > -- > Cell : 425-233-8271 > Twitter: https://twitter.com/holdenkarau > > -- *Chris Fregly* Research Scientist @ *PipelineIO* <http://pipeline.io> *Advanced Spark and TensorFlow Meetup* <http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/> *San Francisco* | *Chicago* | *Washington DC*
Re: Spark 2.0.1 / 2.1.0 on Maven
alrighty then! bcc'ing user list. cc'ing dev list. @user list people: do not read any further or you will be in violation of ASF policies! On Tue, Aug 9, 2016 at 11:50 AM, Mark Hamstra <m...@clearstorydata.com> wrote: > That's not going to happen on the user list, since that is against ASF > policy (http://www.apache.org/dev/release.html): > > During the process of developing software and preparing a release, various >> packages are made available to the developer community for testing >> purposes. Do not include any links on the project website that might >> encourage non-developers to download and use nightly builds, snapshots, >> release candidates, or any other similar package. The only people who >> are supposed to know about such packages are the people following the dev >> list (or searching its archives) and thus aware of the conditions placed on >> the package. If you find that the general public are downloading such test >> packages, then remove them. >> > > On Tue, Aug 9, 2016 at 11:32 AM, Chris Fregly <ch...@fregly.com> wrote: > >> this is a valid question. there are many people building products and >> tooling on top of spark and would like access to the latest snapshots and >> such. today's ink is yesterday's news to these people - including myself. >> >> what is the best way to get snapshot releases including nightly and >> specially-blessed "preview" releases so that we, too, can say "try the >> latest release in our product"? >> >> there was a lot of chatter during the 2.0.0/2.0.1 release that i largely >> ignored because of conflicting/confusing/changing responses. and i'd >> rather not dig through jenkins builds to figure this out as i'll likely get >> it wrong. >> >> please provide the relevant snapshot/preview/nightly/whatever repos (or >> equivalent) that we need to include in our builds to have access to the >> absolute latest build assets for every major and minor release. >> >> thanks! 
>> >> -chris >> >> >> On Tue, Aug 9, 2016 at 10:00 AM, Mich Talebzadeh < >> mich.talebza...@gmail.com> wrote: >> >>> LOL >>> >>> Ink has not dried on Spark 2 yet so to speak :) >>> >>> Dr Mich Talebzadeh >>> >>> >>> >>> LinkedIn * >>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>> >>> >>> >>> http://talebzadehmich.wordpress.com >>> >>> >>> *Disclaimer:* Use it at your own risk. Any and all responsibility for >>> any loss, damage or destruction of data or any other property which may >>> arise from relying on this email's technical content is explicitly >>> disclaimed. The author will in no case be liable for any monetary damages >>> arising from such loss, damage or destruction. >>> >>> >>> >>> On 9 August 2016 at 17:56, Mark Hamstra <m...@clearstorydata.com> wrote: >>> >>>> What are you expecting to find? There currently are no releases beyond >>>> Spark 2.0.0. >>>> >>>> On Tue, Aug 9, 2016 at 9:55 AM, Jestin Ma <jestinwith.a...@gmail.com> >>>> wrote: >>>> >>>>> If we want to use versions of Spark beyond the official 2.0.0 release, >>>>> specifically on Maven + Java, what steps should we take to upgrade? I >>>>> can't >>>>> find the newer versions on Maven central. >>>>> >>>>> Thank you! >>>>> Jestin >>>>> >>>> >>>> >>> >> >> >> -- >> *Chris Fregly* >> Research Scientist @ PipelineIO >> San Francisco, CA >> pipeline.io >> advancedspark.com >> >> > -- *Chris Fregly* Research Scientist @ PipelineIO San Francisco, CA pipeline.io advancedspark.com
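for dev-list followers who do want nightly artifacts, the usual mechanism is a snapshot repository entry in the build. a hedged sketch of a pom.xml fragment, assuming the standard Apache snapshots repository - per the ASF policy quoted above this is for developers only, and the exact repository URL and snapshot version string for a given build should be confirmed on the dev list:

```xml
<repositories>
  <repository>
    <id>apache-snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

<!-- then depend on a snapshot version, e.g. (version string is an assumption): -->
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0-SNAPSHOT</version>
  </dependency>
</dependencies>
```

sbt users would express the same thing with a `resolvers +=` line pointing at the same URL.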
Re: Spark 2.0.1 / 2.1.0 on Maven
this is a valid question. there are many people building products and tooling on top of spark and would like access to the latest snapshots and such. today's ink is yesterday's news to these people - including myself. what is the best way to get snapshot releases including nightly and specially-blessed "preview" releases so that we, too, can say "try the latest release in our product"? there was a lot of chatter during the 2.0.0/2.0.1 release that i largely ignored because of conflicting/confusing/changing responses. and i'd rather not dig through jenkins builds to figure this out as i'll likely get it wrong. please provide the relevant snapshot/preview/nightly/whatever repos (or equivalent) that we need to include in our builds to have access to the absolute latest build assets for every major and minor release. thanks! -chris On Tue, Aug 9, 2016 at 10:00 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: > LOL > > Ink has not dried on Spark 2 yet so to speak :) > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 9 August 2016 at 17:56, Mark Hamstra <m...@clearstorydata.com> wrote: > >> What are you expecting to find? There currently are no releases beyond >> Spark 2.0.0. >> >> On Tue, Aug 9, 2016 at 9:55 AM, Jestin Ma <jestinwith.a...@gmail.com> >> wrote: >> >>> If we want to use versions of Spark beyond the official 2.0.0 release, >>> specifically on Maven + Java, what steps should we take to upgrade? 
I can't >>> find the newer versions on Maven central. >>> >>> Thank you! >>> Jestin >>> >> >> > -- *Chris Fregly* Research Scientist @ PipelineIO San Francisco, CA pipeline.io advancedspark.com
Re: ml models distribution
hey everyone- this concept of deploying your Spark ML Pipelines and Algos into Production (user-facing production) has been coming up a lot recently. so much so, that i've dedicated the last few months of my research and engineering efforts to building out the infrastructure to support this in a highly-scalable, highly-available way.

i've combined my Netflix + NetflixOSS work experience with my Databricks/IBM + Spark work experience into an open source project, PipelineIO, here: http://pipeline.io we're even serving up TensorFlow AI models using the same infrastructure - incorporating key patterns from TensorFlow Distributed + TensorFlow Serving!

everything is open source, based on Docker + Kubernetes + NetflixOSS + Spark + TensorFlow + Redis + Hybrid Cloud + On-Premise + Kafka + Zeppelin + Jupyter/iPython with a heavy emphasis on metrics and monitoring of models and production server statistics.

we're doing code generation directly from the saved Spark ML models (thanks Spark 2.0 for giving us save/load parity across all models!) for optimized model serving using both CPUs and GPUs, incremental training of models, autoscaling, the whole works. our friend from Netflix, Chaos Monkey, even makes a grim appearance from time to time to prove that we're resilient to failure. take a peek. it's cool.

we've come a long way in the last couple months, and we've got a lot of work left to do, but the core infrastructure is in place, key features have been built, and we're moving quickly. shoot me an email if you'd like to get involved. lots of TODO's.

we're dedicating my upcoming Advanced Spark and TensorFlow Meetup on August 4th in SF to demo'ing this infrastructure to you all. here's the link: http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/events/231457813/ video recording + screen capture will be posted afterward, as always. 
we've got a workshop dedicated to building an end-to-end Spark ML and Kafka-based Recommendation Pipeline - including the PipelineIO serving platform. link is here: http://pipeline.io and i'm finishing a blog post soon to detail everything we've done so far - and everything we're actively building. this post will be available on http://pipeline.io - as well as cross-posted to a number of my favorite engineering blogs. global demo roadshow starts 8/8. shoot me an email if you want to see all this in action, otherwise i'll see you at a workshop or meetup near you! :) On Fri, Jul 22, 2016 at 10:34 AM, Inam Ur Rehman <inam.rehma...@gmail.com> wrote: > Hello guys..i know its irrelevant to this topic but i've been looking > desperately for the solution. I am facing en exception > http://apache-spark-user-list.1001560.n3.nabble.com/how-to-resolve-you-must-build-spark-with-hive-exception-td27390.html > > plz help me.. I couldn't find any solution.. plz > > On Fri, Jul 22, 2016 at 6:12 PM, Sean Owen <so...@cloudera.com> wrote: > >> No there isn't anything in particular, beyond the various bits of >> serialization support that write out something to put in your storage >> to begin with. What you do with it after reading and before writing is >> up to your app, on purpose. >> >> If you mean you're producing data outside the model that your model >> uses, your model data might be produced by an RDD operation, and saved >> that way. There it's no different than anything else you do with RDDs. >> >> What part are you looking to automate beyond those things? that's most of >> it. >> >> On Fri, Jul 22, 2016 at 2:04 PM, Sergio Fernández <wik...@apache.org> >> wrote: >> > Hi Sean, >> > >> > On Fri, Jul 22, 2016 at 12:52 PM, Sean Owen <so...@cloudera.com> wrote: >> >> >> >> If you mean, how do you distribute a new model in your application, >> >> then there's no magic to it. Just reference the new model in the >> >> functions you're executing in your driver. 
>> >> >> >> If you implemented some other manual way of deploying model info, just >> >> do that again. There's no special thing to know. >> > >> > >> > Well, because some huge model, we typically bundle both logic >> > (pipeline/application) and models separately. Normally we use a shared >> > stores (e.g., HDFS) or coordinated distribution of the models. But I >> wanted >> > to know if there is any infrastructure in Spark that specifically >> addresses >> > such need. >> > >> > Thanks. >> > >> > Cheers, >> > >> > P.S.: sorry Jacek, with "ml" I meant "Machine Learning". I thought is a >> > quite spread acronym. Sorry for the possible confusion. >> > >> > >> > -- >> > Sergio Fernández >> > Partner Technology Manager >> > Redlink GmbH >> > m: +43 6602747925 >> > e: sergio.fernan...@redlink.co >> > w: http://redlink.co >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> > -- *Chris Fregly* Research Scientist @ PipelineIO San Francisco, CA pipeline.io advancedspark.com
Re: [Spark 2.0.0] Structured Stream on Kafka
+1 vote, +1 watch this would be huge. On Tue, Jun 14, 2016 at 10:47 AM, andy petrella <andy.petre...@gmail.com> wrote: > kool, voted and watched! > tx > > On Tue, Jun 14, 2016 at 4:44 PM Cody Koeninger <c...@koeninger.org> wrote: > >> I haven't done any significant work on using structured streaming with >> kafka, there's a jira ticket for tracking purposes >> >> https://issues.apache.org/jira/browse/SPARK-15406 >> >> >> >> On Tue, Jun 14, 2016 at 9:21 AM, andy petrella <andy.petre...@gmail.com> >> wrote: >> > Heya folks, >> > >> > Just wondering if there are some doc regarding using kafka directly >> from the >> > reader.stream? >> > Has it been integrated already (I mean the source)? >> > >> > Sorry if the answer is RTFM (but then I'd appreciate a pointer anyway^^) >> > >> > thanks, >> > cheers >> > andy >> > -- >> > andy >> > -- > andy > -- *Chris Fregly* Research Scientist @ PipelineIO San Francisco, CA http://pipeline.io
Re: Book for Machine Learning (MLIB and other libraries on Spark)
two of my faves: https://www.amazon.com/Advanced-Analytics-Spark-Patterns-Learning/dp/1491912766/ (Cloudera authors) https://www.amazon.com/Machine-Learning-Spark-Powerful-Algorithms/dp/1783288515/ (IBM author) (most) authors are Spark Committers. while not totally up to date w/ ML pipelines and such, these 2 books give you relevant use cases and make you think about ML in terms of distributed systems. classics. On Sun, Jun 12, 2016 at 7:57 AM, Deepak Goel <deic...@gmail.com> wrote: > Thank You...Please see inline.. > > > On Sun, Jun 12, 2016 at 3:39 PM, <mylistt...@gmail.com> wrote: > >> Machine learning - I would suggest that you pick up a fine book that >> explains machine learning. That's the way I went about - pick up each type >> of machine learning concept - say Linear regression then understand the >> why/when/how etc and infer results etc. >> >> Then apply the learning to a small data set using python or R or scala >> without Spark. This is to familiarize the learning. >> >> > Then run the same with MLlib and see it with a big data set on Spark. I >> would call this consolidation. >> > *Deepak > > Sorry for the confusion in my question. However, I was more interested in > getting hold of a book which explains how I can use MLlib and Spark for > machine learning problems. > > *Deepak > >> >> Few things to remember - all Machine learning algorithms are not >> available On spark. There is a list of machine learning supported in spark. >> Kindly look at that. Also look at how to integrate mahout / h20 with spark >> and see how you can run the machine learning stuff supported by mahout with >> spark. >> >> And then your journey begins :-). >> >> Regards, >> Harmeet >> >> >> >> >> On Jun 12, 2016, at 0:31, Mich Talebzadeh <mich.talebza...@gmail.com> >> wrote: >> >> yes absolutely Ted. 
>> >> Thanks for highlighting it >> >> >> >> Dr Mich Talebzadeh >> >> >> >> LinkedIn * >> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >> >> >> >> http://talebzadehmich.wordpress.com >> >> >> >> On 11 June 2016 at 19:00, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> Another source is the presentation on various conferences. >>> e.g. >>> >>> http://www.slideshare.net/databricks/apache-spark-mllib-20-preview-data-science-and-production >>> >>> FYI >>> >>> On Sat, Jun 11, 2016 at 8:47 AM, Mich Talebzadeh < >>> mich.talebza...@gmail.com> wrote: >>> >>>> Interesting. >>>> >>>> The pace of development in this field is such that practically every >>>> single book in Big Data landscape gets out of date before the ink dries on >>>> it :) >>>> >>>> I concur that they serve as good reference for starters but in my >>>> opinion the best way to learn is to start from on-line docs (and these are >>>> pretty respectful when it comes to Spark) and progress from there. >>>> >>>> If you have a certain problem then put to this group and I am sure >>>> someone somewhere in this forum has come across it. Also most of these >>>> books' authors actively contribute to this mailing list. 
>>>> >>>> >>>> HTH >>>> >>>> >>>> Dr Mich Talebzadeh >>>> >>>> >>>> >>>> LinkedIn * >>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>> >>>> >>>> >>>> http://talebzadehmich.wordpress.com >>>> >>>> >>>> >>>> On 11 June 2016 at 16:10, Ted Yu <yuzhih...@gmail.com> wrote: >>>> >>>>> >>>>> https://www.amazon.com/Machine-Learning-Spark-Powerful-Algorithms/dp/1783288515/ref=sr_1_1?ie=UTF8=1465657706=8-1=spark+mllib >>>>> >>>>> >>>>> https://www.amazon.com/Spark-Practical-Machine-Learning-Chinese/dp/7302420424/ref=sr_1_3?ie=UTF8=1465657706=8-3=spark+mllib >>>>> >>>>> >>>>> https://www.amazon.com/Advanced-Analytics-Spark-Patterns-Learning/dp/1491912766/ref=sr_1_2?ie=UTF8=1465657706=8-2=spark+mllib >>>>> >>>>> >>>>> On Sat, Jun 11, 2016 at 8:04 AM, Deepak Goel <deic...@gmail.com> >>>>> wrote: >>>>> >>>>>> >>>>>> Hey >>>>>> >>>>>> Namaskara~Nalama~Guten Tag~Bonjour >>>>>> >>>>>> I am a newbie to Machine Learning (MLIB and other libraries on Spark) >>>>>> >>>>>> Which would be the best book to learn up? >>>>>> >>>>>> Thanks >>>>>> Deepak >>>>>>-- >>>>>> Keigu >>>>>> >>>>>> Deepak >>>>>> 73500 12833 >>>>>> www.simtree.net, dee...@simtree.net >>>>>> deic...@gmail.com >>>>>> >>>>>> LinkedIn: www.linkedin.com/in/deicool >>>>>> Skype: thumsupdeicool >>>>>> Google talk: deicool >>>>>> Blog: http://loveandfearless.wordpress.com >>>>>> Facebook: http://www.facebook.com/deicool >>>>>> >>>>>> "Contribute to the world, environment and more : >>>>>> http://www.gridrepublic.org >>>>>> " >>>>>> >>>>> >>>>> >>>> >>> >> > -- *Chris Fregly* Research Scientist @ PipelineIO San Francisco, CA http://pipeline.io
Re: Classpath hell and Elasticsearch 2.3.2...
>>>>>> at >>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) >>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>>>>> at >>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>>>>> at >>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>>> at >>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>> >>>>>> ... but I think its caused by this: >>>>>> >>>>>> 16/06/03 00:26:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 >>>>>> (TID 0, localhost): java.lang.Error: Multiple ES-Hadoop versions detected >>>>>> in the classpath; please use only one >>>>>> >>>>>> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-2.3.2.jar >>>>>> >>>>>> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-spark_2.11-2.3.2.jar >>>>>> >>>>>> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-mr-2.3.2.jar >>>>>> >>>>>> at org.elasticsearch.hadoop.util.Version.<clinit>(Version.java:73) >>>>>> at >>>>>> org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:376) >>>>>> at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) >>>>>> at >>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) >>>>>> at >>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67) >>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>>>>> at >>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>>>>> at >>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>>> at >>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>> >>>>>> .. 
still tracking this down but was wondering if there is someting >>>>>> obvious I'm dong wrong. I'm going to take out >>>>>> elasticsearch-hadoop-2.3.2.jar and try again. >>>>>> >>>>>> Lots of trial and error here :-/ >>>>>> >>>>>> Kevin >>>>>> >>>>>> -- >>>>>> >>>>>> We’re hiring if you know of any awesome Java Devops or Linux >>>>>> Operations Engineers! >>>>>> >>>>>> Founder/CEO Spinn3r.com >>>>>> Location: *San Francisco, CA* >>>>>> blog: http://burtonator.wordpress.com >>>>>> … or check out my Google+ profile >>>>>> <https://plus.google.com/102718274791889610666/posts> >>>>>> >>>>>> >>>> >>>> >>>> -- >>>> >>>> We’re hiring if you know of any awesome Java Devops or Linux Operations >>>> Engineers! >>>> >>>> Founder/CEO Spinn3r.com >>>> Location: *San Francisco, CA* >>>> blog: http://burtonator.wordpress.com >>>> … or check out my Google+ profile >>>> <https://plus.google.com/102718274791889610666/posts> >>>> >>>> >> >> >> -- >> >> We’re hiring if you know of any awesome Java Devops or Linux Operations >> Engineers! >> >> Founder/CEO Spinn3r.com >> Location: *San Francisco, CA* >> blog: http://burtonator.wordpress.com >> … or check out my Google+ profile >> <https://plus.google.com/102718274791889610666/posts> >> >> -- *Chris Fregly* Research Scientist @ PipelineIO San Francisco, CA http://pipeline.io
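the error in the trace above is self-describing: only one jar from the es-hadoop family may be on the classpath at a time, and the `elasticsearch-hadoop` uber jar already bundles the spark and mr modules. a quick sketch of a classpath sanity check - the filename-prefix convention is an assumption, and the jar list is taken straight from the error message:

```python
import re

# the jars reported in the error message above, plus an unrelated one
classpath_jars = [
    "elasticsearch-hadoop-2.3.2.jar",
    "elasticsearch-spark_2.11-2.3.2.jar",
    "elasticsearch-hadoop-mr-2.3.2.jar",
    "spark-core_2.11-1.6.1.jar",
]

# any jar whose name starts with elasticsearch-hadoop / elasticsearch-spark
ES_HADOOP_FAMILY = re.compile(r"^elasticsearch-(hadoop|spark)")

def es_hadoop_conflicts(jar_names):
    """return the conflicting jars if more than one member of the
    es-hadoop family is present; exactly one (the uber jar, or just
    the single module you need) should remain."""
    family = [j for j in jar_names if ES_HADOOP_FAMILY.match(j)]
    return family if len(family) > 1 else []

conflicts = es_hadoop_conflicts(classpath_jars)
```

so the fix kevin landed on - removing all but one of the three - is exactly what a check like this would flag before the job ever launches.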
Re: GraphX Java API
btw, GraphX in Action is one of the better books out on Spark. Michael did a great job with this one. He even breaks down snippets of Scala for newbies to understand the seemingly-arbitrary syntax. I learned quite a bit about not only Spark, but also Scala. And of course, we shouldn't forget about Sean's Advanced Analytics with Spark which, of course, is a classic that I still reference regularly. :) On Mon, May 30, 2016 at 7:42 AM, Michael Malak < michaelma...@yahoo.com.invalid> wrote: > Yes, it is possible to use GraphX from Java but it requires 10x the amount > of code and involves using obscure typing and pre-defined lambda prototype > facilities. I give an example of it in my book, the source code for which > can be downloaded for free from > https://www.manning.com/books/spark-graphx-in-action The relevant example > is EdgeCount.java in chapter 10. > > As I suggest in my book, likely the only reason you'd want to put yourself > through that torture is corporate mandate or compatibility with Java > bytecode tools. > > > -- > *From:* Sean Owen <so...@cloudera.com> > *To:* Takeshi Yamamuro <linguin@gmail.com>; "Kumar, Abhishek (US - > Bengaluru)" <abhishekkuma...@deloitte.com> > *Cc:* "user@spark.apache.org" <user@spark.apache.org> > *Sent:* Monday, May 30, 2016 7:07 AM > *Subject:* Re: GraphX Java API > > No, you can call any Scala API in Java. It is somewhat less convenient if > the method was not written with Java in mind but does work. > > On Mon, May 30, 2016, 00:32 Takeshi Yamamuro <linguin@gmail.com> > wrote: > > These package are used only for Scala. 
> > On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) < > abhishekkuma...@deloitte.com> wrote: > > Hey, > · I see some graphx packages listed here: > http://spark.apache.org/docs/latest/api/java/index.html > · org.apache.spark.graphx > <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/package-frame.html> > · org.apache.spark.graphx.impl > <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/impl/package-frame.html> > · org.apache.spark.graphx.lib > <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/lib/package-frame.html> > · org.apache.spark.graphx.util > <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/util/package-frame.html> > Aren’t they meant to be used with JAVA? > Thanks > > *From:* Santoshakhilesh [mailto:santosh.akhil...@huawei.com] > *Sent:* Friday, May 27, 2016 4:52 PM > *To:* Kumar, Abhishek (US - Bengaluru) <abhishekkuma...@deloitte.com>; > user@spark.apache.org > *Subject:* RE: GraphX Java API > > GraphX APis are available only in Scala. If you need to use GraphX you > need to switch to Scala. > > *From:* Kumar, Abhishek (US - Bengaluru) [ > mailto:abhishekkuma...@deloitte.com <abhishekkuma...@deloitte.com>] > *Sent:* 27 May 2016 19:59 > *To:* user@spark.apache.org > *Subject:* GraphX Java API > > Hi, > > We are trying to consume the Java API for GraphX, but there is no > documentation available online on the usage or examples. It would be great > if we could get some examples in Java. > > Thanks and regards, > > *Abhishek Kumar* > > > > This message (including any attachments) contains confidential information > intended for a specific individual and purpose, and is protected by law. If > you are not the intended recipient, you should delete this message and any > disclosure, copying, or distribution of this message, or the taking of any > action based on it, by you is strictly prohibited. 
> v.E.1 > > > > > > > > > -- > --- > Takeshi Yamamuro > > > > -- *Chris Fregly* Research Scientist @ Flux Capacitor AI "Bringing AI Back to the Future!" San Francisco, CA http://fluxcapacitor.ai
Re: local Vs Standalonecluster production deployment
t;>>>>>> spark-submit in the host where the cluster manager is running. >>>>>>>>> >>>>>>>>> The Driver node runs on the same host that the cluster manager is >>>>>>>>> running. The Driver requests the Cluster Manager for resources to run >>>>>>>>> tasks. The worker is tasked to create the executor (in this case >>>>>>>>> there is >>>>>>>>> only one executor) for the Driver. The Executor runs tasks for the >>>>>>>>> Driver. >>>>>>>>> Only one executor can be allocated on each worker per application. In >>>>>>>>> your >>>>>>>>> case you only have >>>>>>>>> >>>>>>>>> >>>>>>>>> The minimum you will need will be 2-4G of RAM and two cores. Well >>>>>>>>> that is my experience. Yes you can submit more than one spark-submit >>>>>>>>> (the >>>>>>>>> driver) but they may queue up behind the running one if there is not >>>>>>>>> enough >>>>>>>>> resources. >>>>>>>>> >>>>>>>>> >>>>>>>>> You pointed out that you will be running few applications in >>>>>>>>> parallel on the same host. The likelihood is that you are using a VM >>>>>>>>> machine for this purpose and the best option is to try running the >>>>>>>>> first >>>>>>>>> one, Check Web GUI on 4040 to see the progress of this Job. If you >>>>>>>>> start >>>>>>>>> the next JVM then assuming it is working, it will be using port 4041 >>>>>>>>> and so >>>>>>>>> forth. >>>>>>>>> >>>>>>>>> >>>>>>>>> In actual fact try the command "free" to see how much free memory >>>>>>>>> you have. 
>>>>>>>>> >>>>>>>>> >>>>>>>>> HTH >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Dr Mich Talebzadeh >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> LinkedIn * >>>>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw >>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> http://talebzadehmich.wordpress.com >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On 28 May 2016 at 16:42, sujeet jog <sujeet@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> I have a question w.r.t production deployment mode of spark, >>>>>>>>>> >>>>>>>>>> I have 3 applications which i would like to run independently on >>>>>>>>>> a single machine, i need to run the drivers in the same machine. >>>>>>>>>> >>>>>>>>>> The amount of resources i have is also limited, like 4- 5GB RAM , >>>>>>>>>> 3 - 4 cores. >>>>>>>>>> >>>>>>>>>> For deployment in standalone mode : i believe i need >>>>>>>>>> >>>>>>>>>> 1 Driver JVM, 1 worker node ( 1 executor ) >>>>>>>>>> 1 Driver JVM, 1 worker node ( 1 executor ) >>>>>>>>>> 1 Driver JVM, 1 worker node ( 1 executor ) >>>>>>>>>> >>>>>>>>>> The issue here is i will require 6 JVM running in parallel, for >>>>>>>>>> which i do not have sufficient CPU/MEM resources, >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Hence i was looking more towards a local mode deployment mode, >>>>>>>>>> would like to know if anybody is using local mode where Driver + >>>>>>>>>> Executor >>>>>>>>>> run in a single JVM in production mode. >>>>>>>>>> >>>>>>>>>> Are there any inherent issues upfront using local mode for >>>>>>>>>> production base systems.?.. >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> > -- *Chris Fregly* Research Scientist @ Flux Capacitor AI "Bringing AI Back to the Future!" San Francisco, CA http://fluxcapacitor.ai
Re: Does Structured Streaming support count(distinct) over all the streaming data?
you can use HyperLogLog with Spark Streaming to accomplish this. here is an example from my fluxcapacitor GitHub repo: https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx here's an accompanying SlideShare presentation from one of my recent meetups (slides 70-83): http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037 and a YouTube video for those who prefer video (starting at 32 mins into the video for your convenience): https://youtu.be/wM9Z0PLx3cw?t=1922 On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <mich.talebza...@gmail.com > wrote: > Ok but how about something similar to > > val countByValueAndWindow = price.filter(_ > > 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval)) > > > Using a new count => countDistinctByValueAndWindow? > > val countDistinctByValueAndWindow = price.filter(_ > > 95.0).countDistinctByValueAndWindow(Seconds(windowLength), > Seconds(slidingInterval)) > > > HTH > > Dr Mich Talebzadeh > > > > LinkedIn: > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > > > > http://talebzadehmich.wordpress.com > > > > On 17 May 2016 at 20:02, Michael Armbrust <mich...@databricks.com> wrote: > >> In 2.0 you won't be able to do this. The long term vision would be to >> make this possible, but a window will be required (like the 24 hours you >> suggest). >> >> On Tue, May 17, 2016 at 1:36 AM, Todd <bit1...@163.com> wrote: >> >>> Hi, >>> We have a requirement to do count(distinct) in a processing batch >>> against all the streaming data (e.g., last 24 hours' data); that is, when we do >>> count(distinct), we actually want to compute distinct against the last 24 hours' >>> data. 
>>> Does structured streaming support this scenario? Thanks! >>> >> >> > -- *Chris Fregly* Research Scientist @ Flux Capacitor AI "Bringing AI Back to the Future!" San Francisco, CA http://fluxcapacitor.com
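For readers who want to see what the HyperLogLog approach above actually does, here is a minimal, Spark-free sketch in Python of the algorithm's core idea. This is illustrative only -- it is not the code from the linked repo (which is Scala/Spark), and the class name and parameters are my own:

```python
import hashlib
import math

class HyperLogLog:
    """Minimal HyperLogLog sketch: approximate distinct counts in O(2^b) memory."""
    def __init__(self, b=12):
        self.b = b                      # 2^b registers
        self.m = 1 << b
        self.registers = [0] * self.m

    def _hash(self, item):
        # 64-bit hash derived from md5 (any well-mixed hash works)
        return int(hashlib.md5(str(item).encode()).hexdigest()[:16], 16)

    def add(self, item):
        x = self._hash(item)
        idx = x & (self.m - 1)          # low b bits pick a register
        w = x >> self.b                 # remaining 64 - b bits
        # rank = position of the leftmost 1-bit in w
        rank = (64 - self.b) - w.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self):
        alpha = 0.7213 / (1 + 1.079 / self.m)
        z = 1.0 / sum(2.0 ** -r for r in self.registers)
        e = alpha * self.m * self.m * z
        zeros = self.registers.count(0)
        if e <= 2.5 * self.m and zeros:  # small-range (linear counting) correction
            return int(self.m * math.log(self.m / zeros))
        return int(e)

hll = HyperLogLog(b=12)
for i in range(100_000):
    hll.add(f"user-{i}")
print(hll.count())   # ~100000, typical error around 1.6% at b=12
```

because the state is a fixed-size register array and `max` is commutative/associative, sketches from different partitions or micro-batches can be merged -- which is exactly why this works over an unbounded stream where an exact `count(distinct)` cannot.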
Re: Any NLP lib could be used on spark?
this took me a bit to get working, but I finally got it up and running with the package that Burak pointed out. here are some relevant links to my project that should give you some clues: https://github.com/fluxcapacitor/pipeline/blob/master/myapps/spark/ml/src/main/scala/com/advancedspark/ml/nlp/ItemDescriptionsDF.scala https://github.com/fluxcapacitor/pipeline/blob/master/myapps/spark/ml/build.sbt https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/pipeline.bashrc (look for the SPARK_SUBMIT_PACKAGES export var) there are also a few Zeppelin notebooks in that repo that will show you how to use it. I'm doing sentiment analysis in one - as well as entity recognition and other fun stuff. it's a pain to set up, unfortunately. not sure why. lots of missing pieces had to be manually cobbled together. > On Apr 19, 2016, at 5:00 PM, Burak Yavuz wrote: > > A quick search on spark-packages returns: > http://spark-packages.org/package/databricks/spark-corenlp. > > You may need to build it locally and add it to your session by --jars. > > On Tue, Apr 19, 2016 at 10:47 AM, Gavin Yue wrote: >> Hey, >> >> Want to try NLP on Spark. Could anyone recommend any easy-to-run NLP >> open source lib on Spark? >> >> Also is there any recommended semantic network? >> >> Thanks a lot. >
Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0
perhaps renaming to Spark ML would actually clear up code and documentation confusion? +1 for rename > On Apr 5, 2016, at 7:00 PM, Reynold Xin wrote: > > +1 > > This is a no brainer IMO. > > >> On Tue, Apr 5, 2016 at 7:32 PM, Joseph Bradley wrote: >> +1 By the way, the JIRA for tracking (Scala) API parity is: >> https://issues.apache.org/jira/browse/SPARK-4591 >> >>> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia >>> wrote: >>> This sounds good to me as well. The one thing we should pay attention to is >>> how we update the docs so that people know to start with the spark.ml >>> classes. Right now the docs list spark.mllib first and also seem more >>> comprehensive in that area than in spark.ml, so maybe people naturally move >>> towards that. >>> >>> Matei >>> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng wrote: Yes, DB (cc'ed) is working on porting the local linear algebra library over (SPARK-13944). There are also frequent pattern mining algorithms we need to port over in order to reach feature parity. -Xiangrui > On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman > wrote: > Overall this sounds good to me. One question I have is that in > addition to the ML algorithms we have a number of linear algebra > (various distributed matrices) and statistical methods in the > spark.mllib package. Is the plan to port or move these to the spark.ml > namespace in the 2.x series? > > Thanks > Shivaram > > On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen wrote: > > FWIW, all of that sounds like a good plan to me. Developing one API is > > certainly better than two. > > > > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng wrote: > >> Hi all, > >> > >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API > >> built > >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based > >> API has > >> been developed under the spark.ml package, while the old RDD-based API > >> has > >> been developed in parallel under the spark.mllib package. 
While it was > >> easier to implement and experiment with new APIs under a new package, > >> it > >> became harder and harder to maintain as both packages grew bigger and > >> bigger. And new users are often confused by having two sets of APIs > >> with > >> overlapped functions. > >> > >> We started to recommend the DataFrame-based API over the RDD-based API > >> in > >> Spark 1.5 for its versatility and flexibility, and we saw the > >> development > >> and the usage gradually shifting to the DataFrame-based API. Just > >> counting > >> the lines of Scala code, from 1.5 to the current master we added ~1 > >> lines to the DataFrame-based API while ~700 to the RDD-based API. So, > >> to > >> gather more resources on the development of the DataFrame-based API > >> and to > >> help users migrate over sooner, I want to propose switching RDD-based > >> MLlib > >> APIs to maintenance mode in Spark 2.0. What does it mean exactly? > >> > >> * We do not accept new features in the RDD-based spark.mllib package, > >> unless > >> they block implementing new features in the DataFrame-based spark.ml > >> package. > >> * We still accept bug fixes in the RDD-based API. > >> * We will add more features to the DataFrame-based API in the 2.x > >> series to > >> reach feature parity with the RDD-based API. > >> * Once we reach feature parity (possibly in Spark 2.2), we will > >> deprecate > >> the RDD-based API. > >> * We will remove the RDD-based API from the main Spark repo in Spark > >> 3.0. > >> > >> Though the RDD-based API is already in de facto maintenance mode, this > >> announcement will make it clear and hence important to both MLlib > >> developers > >> and users. So we’d greatly appreciate your feedback! > >> > >> (As a side note, people sometimes use “Spark ML” to refer to the > >> DataFrame-based API or even the entire MLlib component. This also > >> causes > >> confusion. 
To be clear, “Spark ML” is not an official name and there > >> are no > >> plans to rename MLlib to “Spark ML” at this time.) > >> > >> Best, > >> Xiangrui > > > > - > > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > > For additional commands, e-mail: user-h...@spark.apache.org > > >
Re: Spark for Log Analytics
oh, and I forgot to mention Kafka Streams, which has been heavily talked about over the last few days at Strata here in San Jose. Streams can simplify a lot of this architecture by performing some light-to-medium-complexity transformations in Kafka itself. i'm waiting anxiously for Kafka 0.10 with production-ready Kafka Streams, so I can try this out myself - and hopefully remove a lot of extra plumbing. On Thu, Mar 31, 2016 at 4:42 AM, Chris Fregly <ch...@fregly.com> wrote: > this is a very common pattern, yes. > > note that in Netflix's case, they're currently pushing all of their logs > to a Fronting Kafka + Samza Router which can route to S3 (or HDFS), > ElasticSearch, and/or another Kafka Topic for further consumption by > internal apps using other technologies like Spark Streaming (instead of > Samza). > > this Fronting Kafka + Samza Router also helps to differentiate between > high-priority events (Errors or High Latencies) and normal-priority events > (normal User Play or Stop events). > > here's a recent presentation i did which details this configuration > starting at slide 104: > http://www.slideshare.net/cfregly/dc-spark-users-group-march-15-2016-spark-and-netflix-recommendations > . > > btw, Confluent's distribution of Kafka does have a direct Http/REST API > which is not recommended for production use, but has worked well for me in > the past. > > these are some additional options to think about, anyway. > > > On Thu, Mar 31, 2016 at 4:26 AM, Steve Loughran <ste...@hortonworks.com> > wrote: > >> >> On 31 Mar 2016, at 09:37, ashish rawat <dceash...@gmail.com> wrote: >> >> Hi, >> >> I have been evaluating Spark for analysing Application and Server Logs. I >> believe there are some downsides of doing this: >> >> 1. No direct mechanism of collecting log, so need to introduce other >> tools like Flume into the pipeline. >> >> >> you need something to collect logs no matter what you run. 
Flume isn't so >> bad; if you bring it up on the same host as the app then you can even >> collect logs while the network is playing up. >> >> Or you can just copy log4j files to HDFS and process them later >> >> 2. Need to write lots of code for parsing different patterns from logs, >> while some of the log analysis tools like logstash or loggly provide it out >> of the box >> >> >> >> Log parsing is essentially an ETL problem, especially if you don't try to >> lock down the log event format. >> >> You can also configure Log4J to save stuff in an easy-to-parse format >> and/or forward directly to your application. >> >> There's a log4j to flume connector to do that for you, >> >> >> http://www.thecloudavenue.com/2013/11/using-log4jflume-to-log-application.html >> >> or you can output in, say, JSON ( >> https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/Log4Json.java >> ) >> >> I'd go with flume unless you had a need to save the logs locally and copy >> them to HDFS laster. >> >> >> >> On the benefits side, I believe Spark might be more performant (although >> I am yet to benchmark it) and being a generic processing engine, might work >> with complex use cases where the out of the box functionality of log >> analysis tools is not sufficient (although I don't have any such use case >> right now). >> >> One option I was considering was to use logstash for collection and basic >> processing and then sink the processed logs to both elastic search and >> kafka. So that Spark Streaming can pick data from Kafka for the complex use >> cases, while logstash filters can be used for the simpler use cases. >> >> I was wondering if someone has already done this evaluation and could >> provide me some pointers on how/if to create this pipeline with Spark. 
>> >> Regards, >> Ashish >> >> >> >> > > > -- > > *Chris Fregly* > Principal Data Solutions Engineer > IBM Spark Technology Center, San Francisco, CA > http://spark.tc | http://advancedspark.com > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
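Steve's point that "log parsing is essentially an ETL problem" is worth making concrete. Below is a minimal sketch of that parsing step in Python -- the kind of hand-rolled pattern-matching you would write for Spark where logstash provides it out of the box. The log format (Apache combined-log style), pattern, and sample lines are illustrative, not taken from the thread:

```python
import re
from datetime import datetime

# Regex for an Apache common-log-style line: host, timestamp, request,
# status, and byte count. Unparseable lines return None so they can be
# routed to a dead-letter bucket instead of killing the job.
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ \S+ \[(?P<ts>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" (?P<status>\d{3}) (?P<bytes>\d+|-)'
)

def parse_line(line):
    m = LOG_PATTERN.match(line)
    if not m:
        return None
    rec = m.groupdict()
    rec["status"] = int(rec["status"])
    rec["bytes"] = 0 if rec["bytes"] == "-" else int(rec["bytes"])
    rec["ts"] = datetime.strptime(rec["ts"], "%d/%b/%Y:%H:%M:%S %z")
    return rec

sample = '10.0.0.1 - - [31/Mar/2016:09:37:12 +0000] "GET /index.html HTTP/1.1" 200 5120'
rec = parse_line(sample)
print(rec["host"], rec["status"], rec["bytes"])   # 10.0.0.1 200 5120
```

in a Spark job this function would simply be mapped over the raw lines (with the `None`s filtered out); the hard part, as the thread notes, is agreeing on the log format, not the code.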
Re: Spark for Log Analytics
this is a very common pattern, yes. note that in Netflix's case, they're currently pushing all of their logs to a Fronting Kafka + Samza Router which can route to S3 (or HDFS), ElasticSearch, and/or another Kafka Topic for further consumption by internal apps using other technologies like Spark Streaming (instead of Samza). this Fronting Kafka + Samza Router also helps to differentiate between high-priority events (Errors or High Latencies) and normal-priority events (normal User Play or Stop events). here's a recent presentation i did which details this configuration starting at slide 104: http://www.slideshare.net/cfregly/dc-spark-users-group-march-15-2016-spark-and-netflix-recommendations . btw, Confluent's distribution of Kafka does have a direct Http/REST API which is not recommended for production use, but has worked well for me in the past. these are some additional options to think about, anyway. On Thu, Mar 31, 2016 at 4:26 AM, Steve Loughran <ste...@hortonworks.com> wrote: > > On 31 Mar 2016, at 09:37, ashish rawat <dceash...@gmail.com> wrote: > > Hi, > > I have been evaluating Spark for analysing Application and Server Logs. I > believe there are some downsides of doing this: > > 1. No direct mechanism of collecting log, so need to introduce other tools > like Flume into the pipeline. > > > you need something to collect logs no matter what you run. Flume isn't so > bad; if you bring it up on the same host as the app then you can even > collect logs while the network is playing up. > > Or you can just copy log4j files to HDFS and process them later > > 2. Need to write lots of code for parsing different patterns from logs, > while some of the log analysis tools like logstash or loggly provide it out > of the box > > > > Log parsing is essentially an ETL problem, especially if you don't try to > lock down the log event format. > > You can also configure Log4J to save stuff in an easy-to-parse format > and/or forward directly to your application. 
> > There's a log4j to flume connector to do that for you, > > > http://www.thecloudavenue.com/2013/11/using-log4jflume-to-log-application.html > > or you can output in, say, JSON ( > https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/Log4Json.java > ) > > I'd go with flume unless you had a need to save the logs locally and copy > them to HDFS laster. > > > > On the benefits side, I believe Spark might be more performant (although I > am yet to benchmark it) and being a generic processing engine, might work > with complex use cases where the out of the box functionality of log > analysis tools is not sufficient (although I don't have any such use case > right now). > > One option I was considering was to use logstash for collection and basic > processing and then sink the processed logs to both elastic search and > kafka. So that Spark Streaming can pick data from Kafka for the complex use > cases, while logstash filters can be used for the simpler use cases. > > I was wondering if someone has already done this evaluation and could > provide me some pointers on how/if to create this pipeline with Spark. > > Regards, > Ashish > > > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Can we use spark inside a web service?
great discussion, indeed. Mark Hamstra and i spoke offline just now. Below is a quick recap of our discussion on how his team has achieved acceptable performance from Spark on the user request/response path (@mark - feel free to correct/comment). 1) there is a big difference in request/response latency between submitting a full Spark Application (heavyweight) versus having a long-running Spark Application (like Spark Job Server) that submits lighter-weight Jobs using a shared SparkContext. mark is obviously using the latter - a long-running Spark App. 2) there are some enhancements to Spark that are required to achieve acceptable user request/response times. some links that Mark provided are as follows: - https://issues.apache.org/jira/browse/SPARK-11838 - https://github.com/apache/spark/pull/11036 - https://github.com/apache/spark/pull/11403 - https://issues.apache.org/jira/browse/SPARK-13523 - https://issues.apache.org/jira/browse/SPARK-13756 Essentially, a deeper level of caching at the shuffle file layer to reduce compute and memory between queries. Note that Mark is running a slightly-modified version of stock Spark. (He's mentioned this in prior posts, as well.) And I have to say that I'm, personally, seeing more and more slightly-modified versions of Spark being deployed to production to work around outstanding PRs and JIRAs. this may not be what people want to hear, but it's a trend that i'm seeing lately as more and more teams customize Spark to their specific use cases. Anyway, thanks for the good discussion, everyone! This is why we have these lists, right? :) On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan <velvia.git...@gmail.com> wrote: > One of the premises here is that if you can restrict your workload to > fewer cores - which is easier with FiloDB and careful data modeling - > you can make this work for much higher concurrency and lower latency > than most typical Spark use cases. 
> > The reason why it typically does not work in production is that most > people are using HDFS and files. These data sources are designed for > running queries and workloads on all your cores across many workers, > and not for filtering your workload down to only one or two cores. > > There is actually nothing inherent in Spark that prevents people from > using it as an app server. However, the insistence on using it with > HDFS is what kills concurrency. This is why FiloDB is important. > > I agree there are more optimized stacks for running app servers, but > the choices that you mentioned: ES is targeted at text search; Cass > and HBase by themselves are not fast enough for analytical queries > that the OP wants; and MySQL is great but not scalable. Probably > something like VectorWise, HANA, Vertica would work well, but those > are mostly not free solutions. Druid could work too if the use case > is right. > > Anyways, great discussion! > > On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly <ch...@fregly.com> wrote: > > you are correct, mark. i misspoke. apologies for the confusion. > > > > so the problem is even worse given that a typical job requires multiple > > tasks/cores. > > > > i have yet to see this particular architecture work in production. i > would > > love for someone to prove otherwise. > > > > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra <m...@clearstorydata.com> > > wrote: > >>> > >>> For example, if you're looking to scale out to 1000 concurrent > requests, > >>> this is 1000 concurrent Spark jobs. This would require a cluster with > 1000 > >>> cores. > >> > >> > >> This doesn't make sense. A Spark Job is a driver/DAGScheduler concept > >> without any 1:1 correspondence between Worker cores and Jobs. Cores are > >> used to run Tasks, not Jobs. 
So, yes, a 1000 core cluster can run at > most > >> 1000 simultaneous Tasks, but that doesn't really tell you anything > about how > >> many Jobs are or can be concurrently tracked by the DAGScheduler, which > will > >> be apportioning the Tasks from those concurrent Jobs across the > available > >> Executor cores. > >> > >> On Thu, Mar 10, 2016 at 2:00 PM, Chris Fregly <ch...@fregly.com> wrote: > >>> > >>> Good stuff, Evan. Looks like this is utilizing the in-memory > >>> capabilities of FiloDB which is pretty cool. looking forward to the > webcast > >>> as I don't know much about FiloDB. > >>> > >>> My personal thoughts here are to removed Spark from the user > >>> request/response hot path. > >>> > >>> I can't tell you how many times i've had to unroll that architecture at > >
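Point (1) in the recap -- a long-running app with a shared SparkContext versus a full application submission per request -- is really about amortizing startup cost. The plain-Python analogy below (no Spark involved; the startup cost is a simulated stand-in for JVM + SparkContext initialization, and all names are my own) shows why the shared, long-running pattern is the only one that can serve request/response traffic:

```python
import time
from concurrent.futures import ThreadPoolExecutor

STARTUP_COST_S = 0.02   # stand-in for heavy JVM + context startup
N_REQUESTS = 50

class SharedContext:
    """Expensive to construct; cheap to query -- like a SparkContext + cached data."""
    def __init__(self):
        time.sleep(STARTUP_COST_S)                  # simulated heavy startup
        self.data = {f"user-{i}": i * i for i in range(5_000)}

    def query(self, key):                           # light per-request work
        return self.data.get(key)

def per_request_app(key):
    # the "full spark-submit per request" anti-pattern: pay startup every time
    return SharedContext().query(key)

ctx = SharedContext()                               # long-running app: pay once

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    shared = list(pool.map(ctx.query, (f"user-{i}" for i in range(N_REQUESTS))))
shared_s = time.perf_counter() - t0

t0 = time.perf_counter()
cold = [per_request_app(f"user-{i}") for i in range(N_REQUESTS)]
cold_s = time.perf_counter() - t0

print(f"shared context: {shared_s:.3f}s vs per-request startup: {cold_s:.3f}s")
```

the per-request variant pays at least N_REQUESTS x startup cost; the shared variant pays it once, which is the essence of the Spark Job Server approach Mark uses.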
Re: [MLlib - ALS] Merging two Models?
@Colin - you're asking the million-dollar question that a lot of people are trying to answer. This was literally the #1 most-asked question in every city on my recent world-wide meetup tour. I've been pointing people to my old Databricks co-worker's streaming-matrix-factorization project: https://github.com/brkyvz/streaming-matrix-factorization He got tired of everyone asking about this - and cranked it out over a weekend. Love that guy, Burak! :) I've attempted (unsuccessfully, so far) to deploy exactly what you're trying to do here: https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/ml/TrainMFIncremental.scala We're a couple pull requests away from making this happen. You can see my comments and open github issues for the remaining bits. And this will be my focus in the next week or so as I prepare for an upcoming conference. Keep an eye on this repo if you'd like. @Sean: thanks for the link. I knew Oryx was doing this somehow - and I kept meaning to see how you were doing it. I'll likely incorporate some of your stuff into my final solution. On Thu, Mar 10, 2016 at 3:35 PM, Sean Owen <so...@cloudera.com> wrote: > While it isn't crazy, I am not sure how valid it is to build a model > off of only a chunk of recent data and then merge it into another > model in any direct way. They're not really sharing a basis, so you > can't just average them. > > My experience with this aspect suggests you should try to update the > existing model in place on the fly. In short, you figure out how much > the new input ought to change your estimate of the (user,item) > association. Positive interactions should increase it a bit, etc. Then > you work out how the item vector would change if the user vector were > fixed in order to accomplish that change, with a bit of linear > algebra. Vice versa for user vector. Of course, those changes affect > the rest of the matrix too but that's the 'approximate' bit. 
> > I so happen to have an implementation of this in the context of a > Spark ALS model, though raw source code may be hard to read. If it's > of interest we can discuss offline (or online here to the extent it's > relevant to Spark users) > > > https://github.com/OryxProject/oryx/blob/91004a03413eef0fdfd6e75a61b68248d11db0e5/app/oryx-app/src/main/java/com/cloudera/oryx/app/speed/als/ALSSpeedModelManager.java#L192 > > On Thu, Mar 10, 2016 at 8:01 PM, Colin Woodbury <coli...@gmail.com> wrote: > > Hi there, I'm wondering if it's possible (or feasible) to combine the > > feature matrices of two MatrixFactorizationModels that share a user and > > product set. > > > > Specifically, one model would be the "on-going" model, and the other is > one > > trained only on the most recent aggregation of some event data. My > overall > > goal is to try to approximate "online" training, as ALS doesn't support > > streaming, and it also isn't possible to "seed" the ALS training process > > with an already trained model. > > > > Since the two Models would share a user/product ID space, can their > feature > > matrices be merged? For instance via: > > > > 1. Adding feature vectors together for user/product vectors that appear > in > > both models > > 2. Averaging said vectors instead > > 3. Some other linear algebra operation > > > > Unfortunately, I'm fairly ignorant as to the internal mechanics of ALS > > itself. Is what I'm asking possible? > > > > Thank you, > > Colin > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
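Sean's "bit of linear algebra" -- holding one factor matrix fixed and solving for the vector that best explains the new ratings -- is the standard fold-in, i.e. a single half-step of ALS applied to just the new data. A small NumPy sketch of that update (the toy data, function name, and regularization value are mine, not the Oryx implementation he links):

```python
import numpy as np

# With item factors Y held fixed, the least-squares-optimal vector for a
# user with ratings r on item subset S solves the ridge normal equation
#   u = (Y_S^T Y_S + lam*I)^{-1} Y_S^T r
# -- the same system one ALS half-step solves per user.

rng = np.random.default_rng(0)
k, n_items, lam = 4, 50, 0.1
Y = rng.normal(size=(n_items, k))          # fixed item-factor matrix

def fold_in_user(Y, rated_items, ratings, lam):
    Ys = Y[rated_items]                    # factors of the items this user touched
    A = Ys.T @ Ys + lam * np.eye(Y.shape[1])
    return np.linalg.solve(A, Ys.T @ ratings)

# Demo: a user with latent taste u_true rates 10 items without noise;
# the fold-in should recover u_true (up to slight ridge shrinkage).
u_true = rng.normal(size=k)
rated = rng.choice(n_items, size=10, replace=False)
r = Y[rated] @ u_true
u_hat = fold_in_user(Y, rated, r, lam)
print(np.round(u_hat, 2))                  # close to u_true
```

this also shows why Sean says you can't just average two independently trained models: each training run picks its own arbitrary basis for the factors, whereas the fold-in works *within* the existing model's basis.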
Re: Can we use spark inside a web service?
you are correct, mark. i misspoke. apologies for the confusion. so the problem is even worse given that a typical job requires multiple tasks/cores. i have yet to see this particular architecture work in production. i would love for someone to prove otherwise. On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra <m...@clearstorydata.com> wrote: > For example, if you're looking to scale out to 1000 concurrent requests, >> this is 1000 concurrent Spark jobs. This would require a cluster with 1000 >> cores. > > > This doesn't make sense. A Spark Job is a driver/DAGScheduler concept > without any 1:1 correspondence between Worker cores and Jobs. Cores are > used to run Tasks, not Jobs. So, yes, a 1000 core cluster can run at most > 1000 simultaneous Tasks, but that doesn't really tell you anything about > how many Jobs are or can be concurrently tracked by the DAGScheduler, which > will be apportioning the Tasks from those concurrent Jobs across the > available Executor cores. > > On Thu, Mar 10, 2016 at 2:00 PM, Chris Fregly <ch...@fregly.com> wrote: > >> Good stuff, Evan. Looks like this is utilizing the in-memory >> capabilities of FiloDB which is pretty cool. looking forward to the >> webcast as I don't know much about FiloDB. >> >> My personal thoughts here are to removed Spark from the user >> request/response hot path. >> >> I can't tell you how many times i've had to unroll that architecture at >> clients - and replace with a real database like Cassandra, ElasticSearch, >> HBase, MySql. >> >> Unfortunately, Spark - and Spark Streaming, especially - lead you to >> believe that Spark could be used as an application server. This is not a >> good use case for Spark. >> >> Remember that every job that is launched by Spark requires 1 CPU core, >> some memory, and an available Executor JVM to provide the CPU and memory. >> >> Yes, you can horizontally scale this because of the distributed nature of >> Spark, however it is not an efficient scaling strategy. 
>> >> For example, if you're looking to scale out to 1000 concurrent requests, >> this is 1000 concurrent Spark jobs. This would require a cluster with 1000 >> cores. this is just not cost effective. >> >> Use Spark for what it's good for - ad-hoc, interactive, and iterative >> (machine learning, graph) analytics. Use an application server for what >> it's good - managing a large amount of concurrent requests. And use a >> database for what it's good for - storing/retrieving data. >> >> And any serious production deployment will need failover, throttling, >> back pressure, auto-scaling, and service discovery. >> >> While Spark supports these to varying levels of production-readiness, >> Spark is a batch-oriented system and not meant to be put on the user >> request/response hot path. >> >> For the failover, throttling, back pressure, autoscaling that i mentioned >> above, it's worth checking out the suite of Netflix OSS - particularly >> Hystrix, Eureka, Zuul, Karyon, etc: http://netflix.github.io/ >> >> Here's my github project that incorporates a lot of these: >> https://github.com/cfregly/fluxcapacitor >> >> Here's a netflix Skunkworks github project that packages these up in >> Docker images: https://github.com/Netflix-Skunkworks/zerotodocker >> >> >> On Thu, Mar 10, 2016 at 1:40 PM, velvia.github <velvia.git...@gmail.com> >> wrote: >> >>> Hi, >>> >>> I just wrote a blog post which might be really useful to you -- I have >>> just >>> benchmarked being able to achieve 700 queries per second in Spark. So, >>> yes, >>> web speed SQL queries are definitely possible. Read my new blog post: >>> >>> http://velvia.github.io/Spark-Concurrent-Fast-Queries/ >>> >>> and feel free to email me (at vel...@gmail.com) if you would like to >>> follow >>> up. 
>>> >>> -Evan >>> >>> >>> >>> >>> -- >>> View this message in context: >>> http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-use-spark-inside-a-web-service-tp26426p26451.html >>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>> >>> - >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>> For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >> >> >> -- >> >> *Chris Fregly* >> Principal Data Solutions Engineer >> IBM Spark Technology Center, San Francisco, CA >> http://spark.tc | http://advancedspark.com >> > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Can we use spark inside a web service?
Good stuff, Evan. Looks like this is utilizing the in-memory capabilities of FiloDB, which is pretty cool. looking forward to the webcast as I don't know much about FiloDB. My personal thoughts here are to remove Spark from the user request/response hot path. I can't tell you how many times i've had to unroll that architecture at clients - and replace it with a real database like Cassandra, ElasticSearch, HBase, MySql. Unfortunately, Spark - and Spark Streaming, especially - lead you to believe that Spark could be used as an application server. This is not a good use case for Spark. Remember that every job that is launched by Spark requires 1 CPU core, some memory, and an available Executor JVM to provide the CPU and memory. Yes, you can horizontally scale this because of the distributed nature of Spark, however it is not an efficient scaling strategy. For example, if you're looking to scale out to 1000 concurrent requests, this is 1000 concurrent Spark jobs. This would require a cluster with 1000 cores. this is just not cost effective. Use Spark for what it's good for - ad-hoc, interactive, and iterative (machine learning, graph) analytics. Use an application server for what it's good for - managing a large number of concurrent requests. And use a database for what it's good for - storing/retrieving data. And any serious production deployment will need failover, throttling, back pressure, auto-scaling, and service discovery. While Spark supports these to varying levels of production-readiness, Spark is a batch-oriented system and not meant to be put on the user request/response hot path. 
For the failover, throttling, back pressure, autoscaling that i mentioned above, it's worth checking out the suite of Netflix OSS - particularly Hystrix, Eureka, Zuul, Karyon, etc: http://netflix.github.io/ Here's my github project that incorporates a lot of these: https://github.com/cfregly/fluxcapacitor Here's a netflix Skunkworks github project that packages these up in Docker images: https://github.com/Netflix-Skunkworks/zerotodocker On Thu, Mar 10, 2016 at 1:40 PM, velvia.github <velvia.git...@gmail.com> wrote: > Hi, > > I just wrote a blog post which might be really useful to you -- I have just > benchmarked being able to achieve 700 queries per second in Spark. So, > yes, > web speed SQL queries are definitely possible. Read my new blog post: > > http://velvia.github.io/Spark-Concurrent-Fast-Queries/ > > and feel free to email me (at vel...@gmail.com) if you would like to > follow > up. > > -Evan > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-use-spark-inside-a-web-service-tp26426p26451.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Using netlib-java in Spark 1.6 on linux
I have all of this pre-wired up and Docker-ized for your instant enjoyment here: https://github.com/fluxcapacitor/pipeline/wiki You can review the Dockerfile <https://github.com/fluxcapacitor/pipeline/blob/master/Dockerfile> for the details (Ubuntu 14.04-based). This is easy BREEZEy. Also, here's a link to a related post from 1.5 yrs ago: http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-td19662.html Hit me up off-line if you want to dive into the details. We can circle back here if we find anything out of the ordinary. On Thu, Mar 3, 2016 at 12:16 PM, cindymc <cindy.mcmul...@oracle.com> wrote: > I have these on my system: > > /usr/lib64:$ /sbin/ldconfig -p | grep liblapack > liblapack.so.3 (libc6,x86-64) => /usr/lib64/atlas/liblapack.so.3 > liblapack.so.3 (libc6,x86-64) => > /usr/lib64/atlas-sse3/liblapack.so.3 > liblapack.so.3 (libc6,x86-64) => /usr/lib64/liblapack.so.3 > liblapack.so.3 (libc6) => /usr/lib/atlas/liblapack.so.3 > liblapack.so.3 (libc6) => /usr/lib/liblapack.so.3 > liblapack.so (libc6,x86-64) => /usr/lib64/liblapack.so > liblapack.so (libc6) => /usr/lib/liblapack.so > /usr/lib64:$ /sbin/ldconfig -p | grep libblas > libblas.so.3 (libc6,x86-64) => /usr/lib64/libblas.so.3 > libblas.so.3 (libc6) => /usr/lib/libblas.so.3 > libblas.so (libc6,x86-64) => /usr/lib64/libblas.so > libblas.so (libc6) => /usr/lib/libblas.so > > And this in my /etc/ld.so.conf: > include ld.so.conf.d/*.conf > /usr/lib64 > > Then ran 'ldconfig'. > > I also have this: > /usr/lib64:$ yum list libgfortran > Loaded plugins: aliases, changelog, kabi, presto, refresh-packagekit, > security, tmprepo, ulninfo, verify, versionlock > Loading support for kernel ABI > Installed Packages > libgfortran.i686 > 4.4.7-16.el6 @base > libgfortran.x86_64 > 4.4.7-16.el6 @anacond > > I've set: > LD_LIBRARY_PATH=/usr/lib64 > > Still no luck. Suggestions? 
Do I have to > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Using-netlib-java-in-Spark-1-6-on-linux-tp26386p26392.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Recommendation for a good book on Spark, beginner to moderate knowledge
for hands-on, check out the end-to-end reference data pipeline available either from the github or docker repos described here: http://advancedspark.com/ i use these assets to train folks of all levels of Spark knowledge. there are also some relevant videos and slideshare presentations, but they might be a bit advanced for Spark n00bs. On Sun, Feb 28, 2016 at 4:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote: > In my opinion the best way to learn something is trying it on the spot. > > As suggested if you have Hadoop, Hive and Spark installed and you are OK > with SQL then you will have to focus on Scala and Spark pretty much. > > Your best bet is interactive work through Spark shell with Scala, > understanding RDD, DataFrame, Transformation and actions. You also have > online docs and a great number of users in this forum that can potentially > help you with your questions. Buying books can help but nothing takes the > place of getting your hands dirty so to speak. > > HTH > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > > On 28 February 2016 at 22:32, Jules Damji <dmat...@comcast.net> wrote: > >> Hello Ashoka, >> >> "Learning Spark," from O'Reilly, is certainly a good start, and all basic >> video tutorials from Spark Summit Training, "Spark Essentials", are >> excellent supplementary materials. >> >> And the best (and most effective) way to teach yourself is really firing >> up the spark-shell or pyspark and doing it yourself—immersing yourself by >> trying all basic transformations and actions on RDDs, with contrived small >> data sets. >> >> I've discovered that learning Scala & Python through their interactive >> shell, where feedback is immediate and response is quick, as the best >> learning experience. 
>> >> Same is true for Scala or Python Notebooks interacting with a Spark, >> running in local or cluster mode. >> >> Cheers, >> >> Jules >> >> Sent from my iPhone >> Pardon the dumb thumb typos :) >> >> On Feb 28, 2016, at 1:48 PM, Ashok Kumar <ashok34...@yahoo.com.INVALID >> <ashok34...@yahoo.com.invalid>> wrote: >> >> Hi Gurus, >> >> Appreciate if you recommend me a good book on Spark or documentation for >> beginner to moderate knowledge >> >> I very much like to skill myself on transformation and action methods. >> >> FYI, I have already looked at examples on net. However, some of them not >> clear at least to me. >> >> Warmest regards >> >> > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Evaluating spark streaming use case
good catch on the cleaner.ttl @jatin- when you say "memory-persisted RDD", what do you mean exactly? and how much data are you talking about? remember that spark can evict these memory-persisted RDDs at any time. they can be recovered from Kafka, but this is not a good situation to be in. also, is this spark 1.6 with the new mapWithState() or the old updateStateByKey()? you definitely want the newer 1.6 mapWithState(). and is there any other way to store and aggregate this data outside of spark? I get a bit nervous when I see people treat spark/streaming like an in-memory database. perhaps redis or another type of in-memory store is more appropriate. or just write to long-term storage using parquet. if this is a lot of data, you may want to use approximate probabilistic data structures like CountMin Sketch or HyperLogLog. here are some relevant links with more info - including how to use these with redis: https://www.slideshare.net/cfregly/advanced-apache-spark-meetup-approximations-and-probabilistic-data-structures-jan-28-2016-galvanize https://github.com/fluxcapacitor/pipeline/tree/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx you can then set up a cron (or airflow) spark job to do the compute and aggregate against either redis or long-term storage. this reference pipeline contains the latest airflow workflow scheduler: https://github.com/fluxcapacitor/pipeline/wiki my advice with spark streaming is to get the data out of spark streaming as quickly as possible - and into a more durable format more suitable for aggregation and compute. this greatly simplifies your operational concerns, in my opinion. good question. very common use case. > On Feb 21, 2016, at 12:22 PM, Ted Yu wrote: > > w.r.t. 
cleaner TTL, please see: > > [SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0 > > FYI > >> On Sun, Feb 21, 2016 at 4:16 AM, Gerard Maas wrote: >> >> It sounds like another window operation on top of the 30-min window will >> achieve the desired objective. >> Just keep in mind that you'll need to set the clean TTL (spark.cleaner.ttl) >> to a long enough value and you will require enough resources (mem & disk) to >> keep the required data. >> >> -kr, Gerard. >> >>> On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar >>> wrote: >>> Hello Spark users, >>> >>> I have to aggregate messages from kafka and at some fixed interval (say >>> every half hour) update a memory persisted RDD and run some computation. >>> This computation uses last one day data. Steps are: >>> >>> - Read from realtime Kafka topic X in spark streaming batches of 5 seconds >>> - Filter the above DStream messages and keep some of them >>> - Create windows of 30 minutes on above DStream and aggregate by Key >>> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd >>> - Maintain last N such RDDs in a deque persisting them on disk. While >>> adding new RDD, subtract oldest RDD from the combinedRdd. >>> - Final step consider last N such windows (of 30 minutes each) and do final >>> aggregation >>> >>> Does the above way of using spark streaming looks reasonable? Is there a >>> better way of doing the above? >>> >>> -- >>> Thanks >>> Jatin >
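jatin's merge/subtract bookkeeping (steps 4-6 above) can be sketched with plain scala collections standing in for the RDDs - a hypothetical simplification, just to make the windowing logic concrete:

```scala
import scala.collection.mutable

// Keep the last `maxWindows` aggregated windows plus a running combined
// aggregate. Plain key->count maps stand in for the 30-minute RDDs here.
class RollingAggregate(maxWindows: Int) {
  private val windows  = mutable.Queue.empty[Map[String, Long]]
  private val combined = mutable.Map.empty[String, Long].withDefaultValue(0L)

  def add(window: Map[String, Long]): Unit = {
    windows.enqueue(window)
    window.foreach { case (k, v) => combined(k) += v }
    if (windows.size > maxWindows) {
      // evict the oldest window: subtract it from the combined aggregate
      val oldest = windows.dequeue()
      oldest.foreach { case (k, v) =>
        combined(k) -= v
        if (combined(k) == 0L) combined.remove(k)
      }
    }
  }

  def snapshot: Map[String, Long] = combined.toMap
}
```

the real version would also have to persist each added/evicted window somewhere durable (parquet, redis, etc., per the advice above) so a driver restart doesn't lose a day of state.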
Re: Communication between two spark streaming Job
if you need update notifications, you could introduce ZooKeeper (eek!) or a Kafka queue between the jobs. I've seen internal Kafka queues (relative to external spark streaming queues) used for this type of incremental update use case. think of the updates as transaction logs. > On Feb 19, 2016, at 10:35 PM, Ted Yu wrote: > > Have you considered using a Key Value store which is accessible to both jobs ? > > The communication would take place through this store. > > Cheers > >> On Fri, Feb 19, 2016 at 11:48 AM, Ashish Soni wrote: >> Hi , >> >> Is there any way we can communicate across two different spark streaming job >> , as below is the scenario >> >> we have two spark streaming job one to process metadata and one to process >> actual data ( this needs metadata ) >> >> So if someone did the metadata update we need to update the cache maintained >> in the second job so that it can take use of new metadata >> >> Please help >> >> Ashish >
Re: Allowing parallelism in spark local mode
sounds like the first job is occupying all resources. you should limit the resources that a single job can acquire. fair scheduler is one way to do that. a possibly simpler way is to configure spark.deploy.defaultCores or spark.cores.max. the defaults for these values - for the Spark default cluster resource manager (aka Spark Standalone) - are infinite: every job will try to acquire every resource. https://spark.apache.org/docs/latest/spark-standalone.html here's an example config that i use for my reference data pipeline project: https://github.com/fluxcapacitor/pipeline/blob/master/config/spark/spark-defaults.conf i'm always playing with these values to simulate different conditions, but that's the current snapshot that might be helpful. also, don't forget about executor memory... On Fri, Feb 12, 2016 at 1:40 PM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > You’ll want to setup the FAIR scheduler as described here: > https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application > > From: yael aharon <yael.aharo...@gmail.com> > Date: Friday, February 12, 2016 at 2:00 PM > To: "user@spark.apache.org" <user@spark.apache.org> > Subject: Allowing parallelism in spark local mode > > Hello, > I have an application that receives requests over HTTP and uses spark in > local mode to process the requests. Each request is running in its own > thread. > It seems that spark is queueing the jobs, processing them one at a time. > When 2 requests arrive simultaneously, the processing time for each of them > is almost doubled. > I tried setting spark.default.parallelism, spark.executor.cores, > spark.driver.cores but that did not change the time in a meaningful way. > > Am I missing something obvious? > thanks, Yael > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
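for spark standalone, the caps mentioned above go in conf/spark-defaults.conf (spark.deploy.defaultCores is read by the standalone Master). the numbers below are hypothetical - tune them to your cluster:

```properties
# default core cap for any app that doesn't set spark.cores.max itself
# (cluster-wide setting, read by the standalone Master)
spark.deploy.defaultCores   4

# hard core cap for this application across the whole cluster
spark.cores.max             8

# and don't forget executor memory
spark.executor.memory       2g
```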
Re: Do we need to enabled Tungsten sort in Spark 1.6?
Yeah, this confused me, as well. Good question, Umesh. As Ted pointed out: between Spark 1.5 and 1.6, o.a.s.shuffle.unsafe.UnsafeShuffleManager no longer exists as a separate shuffle manager. Here's the old code (notice the o.a.s.shuffle.unsafe package): https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala The functionality has essentially been rolled into o.a.s.shuffle.sort.SortShuffleManager with the help of a Scala match/case statement. Here's the newer code (notice the o.a.s.shuffle.unsafe package is gone): https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala On Fri, Jan 8, 2016 at 1:14 PM, Ted Yu <yuzhih...@gmail.com> wrote: > For "spark.shuffle.manager", the default is "sort" > From core/src/main/scala/org/apache/spark/SparkEnv.scala : > > val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") > > "tungsten-sort" is the same as "sort" : > > val shortShuffleMgrNames = Map( > "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", > "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager", > "tungsten-sort" -> > "org.apache.spark.shuffle.sort.SortShuffleManager") > > FYI > > On Fri, Jan 8, 2016 at 12:59 PM, Umesh Kacha <umesh.ka...@gmail.com> > wrote: > >> ok thanks so it will be enabled by default always if yes then in >> documentation why default shuffle manager is mentioned as sort? >> >> On Sat, Jan 9, 2016 at 1:55 AM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> From >>> sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala : >>> >>> case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) => >>> val runFunc = (sqlContext: SQLContext) => { >>> logWarning( >>> s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is >>> deprecated and " + >>> s"will be ignored. 
Tungsten will continue to be used.") >>> Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true")) >>> } >>> >>> FYI >>> >>> On Fri, Jan 8, 2016 at 12:21 PM, unk1102 <umesh.ka...@gmail.com> wrote: >>> >>>> Hi I was using Spark 1.5 with Tungsten sort and now I have using Spark >>>> 1.6 I >>>> dont see any difference I was expecting Spark 1.6 to be faster. Anyways >>>> do >>>> we need to enable Tunsten and unsafe options or they are enabled by >>>> default >>>> I see in documentation that default sort manager is sort I though it is >>>> Tungsten no? Please guide. >>>> >>>> >>>> >>>> -- >>>> View this message in context: >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com. >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> >>>> >>> >> > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Date Time Regression as Feature
Here's a good blog post by Sandy Ryza @ Cloudera on Spark + Time Series Data: http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/ Might give you some things to try. On Thu, Jan 7, 2016 at 11:40 PM, dEEPU <mdeep...@hotmail.com> wrote: > Maybe u want to convert the date to a duration in form of number of > hours/days and then do calculation on it > On Jan 8, 2016 12:39 AM, Jorge Machado <jorge.w.mach...@hotmail.com> > wrote: > Hello all, > > I'm new to machine learning. I'm trying to predict some electric usage > with a decision Free > The data is : > 2015-12-10-10:00, 1200 > 2015-12-11-10:00, 1150 > > My question is : What is the best way to turn date and time into feature > on my Vector ? > > Something like this : Vector (1200, [2015,12,10,10,10] )? > I could not fine any example with value prediction where features had > dates in it. > > Thanks > > Jorge Machado > > Jorge Machado > jo...@jmachado.me > > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
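To make Jorge's example concrete: one common approach is to explode the timestamp into a handful of numeric features before assembling the vector. A sketch with plain java.time (Java 8+; the pattern string matches his "2015-12-10-10:00" format):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Parse "2015-12-10-10:00"-style strings and expand them into numeric
// features a tree model can split on (year, month, day, hour, day-of-week).
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm")

def dateFeatures(raw: String): Array[Double] = {
  val dt = LocalDateTime.parse(raw, fmt)
  Array(
    dt.getYear.toDouble,
    dt.getMonthValue.toDouble,
    dt.getDayOfMonth.toDouble,
    dt.getHour.toDouble,
    dt.getDayOfWeek.getValue.toDouble  // 1 = Monday ... 7 = Sunday
  )
}

dateFeatures("2015-12-10-10:00")  // Array(2015.0, 12.0, 10.0, 10.0, 4.0)
```

You'd then wrap the array with Vectors.dense(...) alongside the usage label for the pipeline. Hour and day-of-week are usually the interesting ones for electric usage, and treating them as categorical (or one-hot encoding them) is often better than leaving them as raw numbers.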
Re: how to extend java transformer from Scala UnaryTransformer ?
Looks like you're not registering the input param correctly. Below are examples from the Spark Java source that show how to build a custom transformer. Note that a Model is a Transformer. Also, that chimpler/wordpress/naive bayes example is a bit dated. I tried to implement it a while ago, but didn't get very far given that the ML API has charged ahead in favor of pipelines and the new spark.ml package. https://github.com/apache/spark/blob/branch-1.5/examples/src/main/java/org/apache/spark/examples/ml/JavaDeveloperApiExample.java#L126 https://github.com/apache/spark/blob/branch-1.5/examples/src/main/java/org/apache/spark/examples/ml/JavaDeveloperApiExample.java#L191 On Fri, Jan 1, 2016 at 11:38 AM, Andy Davidson < a...@santacruzintegration.com> wrote: > I am trying to write a trivial transformer I use use in my pipeline. I am > using java and spark 1.5.2. It was suggested that I use the Tokenize.scala > class as an example. This should be very easy how ever I do not understand > Scala, I am having trouble debugging the following exception. > > Any help would be greatly appreciated. > > Happy New Year > > Andy > > java.lang.IllegalArgumentException: requirement failed: Param > null__inputCol does not belong to > Stemmer_2f3aa96d-7919-4eaa-ad54-f7c620b92d1c. 
> at scala.Predef$.require(Predef.scala:233) > at org.apache.spark.ml.param.Params$class.shouldOwn(params.scala:557) > at org.apache.spark.ml.param.Params$class.set(params.scala:436) > at org.apache.spark.ml.PipelineStage.set(Pipeline.scala:37) > at org.apache.spark.ml.param.Params$class.set(params.scala:422) > at org.apache.spark.ml.PipelineStage.set(Pipeline.scala:37) > at org.apache.spark.ml.UnaryTransformer.setInputCol(Transformer.scala:83) > at com.pws.xxx.ml.StemmerTest.test(StemmerTest.java:30) > > > > public class StemmerTest extends AbstractSparkTest { > > @Test > > public void test() { > > Stemmer stemmer = new Stemmer() > > .setInputCol("raw”) //*line 30* > > .setOutputCol("filtered"); > > } > > } > > > /** > > * @ see spark-1.5.1/mllib/src/main/scala/org/apache > /spark/ml/feature/Tokenizer.scala > > * @ see https://chimpler.wordpress.com/2014/06/11/classifiying-documents- > using-naive-bayes-on-apache-spark-mllib/ > > * @ see http://www.tonytruong.net/movie-rating-prediction-with-apache- > spark-and-hortonworks/ > > * > > * @author andrewdavidson > > * > > */ > > public class Stemmer extends UnaryTransformer<List, List, > Stemmer> implements Serializable{ > > static Logger logger = LoggerFactory.getLogger(Stemmer.class); > > private static final long serialVersionUID = 1L; > > private static final ArrayType inputType = > DataTypes.createArrayType(DataTypes.StringType, true); > > private final String uid = Stemmer.class.getSimpleName() + "_" + > UUID.randomUUID().toString(); > > > @Override > > public String uid() { > > return uid; > > } > > > /* > >override protected def validateInputType(inputType: DataType): > Unit = { > > require(inputType == StringType, s"Input type must be string type but > got $inputType.") > > } > > */ > > @Override > > public void validateInputType(DataType inputTypeArg) { > > String msg = "inputType must be " + inputType.simpleString() + " > but got " + inputTypeArg.simpleString(); > > assert (inputType.equals(inputTypeArg)) : 
msg; > > } > > > > @Override > > public Function1<List, List> createTransformFunc() { > > // > http://stackoverflow.com/questions/6545066/using-scala-from-java-passing-functions-as-parameters > > Function1<List, List> f = new > AbstractFunction1<List, List>() { > > public List apply(List words) { > > for(String word : words) { > > logger.error("AEDWIP input word: {}", word); > > } > > return words; > > } > > }; > > > > return f; > > } > > > @Override > > public DataType outputDataType() { > > return DataTypes.createArrayType(DataTypes.StringType, true); > > } > > } > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
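The "null__inputCol" in Andy's stack trace is the tell: the parent Scala class registers its params (calling uid()) during its own constructor, which runs before the Java subclass's uid field has been assigned, so the param gets registered under a null uid. A Spark-free sketch of the same JVM initialization-order trap (the class names are hypothetical stand-ins, not the real Spark API):

```scala
// Hypothetical stand-ins for UnaryTransformer / Param, just to show the
// initialization order that produces "null__inputCol".
abstract class ParentTransformer {
  def uid: String
  // runs during the *parent* constructor, before any subclass fields exist
  val inputColParamName: String = uid + "__inputCol"
}

// Broken: a field-backed uid is assigned *after* the parent constructor
// finishes, so the param name is built from null -- same shape as the
// exception above.
class BrokenStemmer extends ParentTransformer {
  val uid: String = "Stemmer_2f3aa96d"
}

// Fixed: computing uid in the accessor itself (no field initialization to
// wait for) means the parent constructor already sees the real value.
class FixedStemmer extends ParentTransformer {
  def uid: String = "Stemmer_2f3aa96d"
}

println(new BrokenStemmer().inputColParamName)  // null__inputCol
println(new FixedStemmer().inputColParamName)   // Stemmer_2f3aa96d__inputCol
```

In the Java version, the equivalent fix is to make uid() independent of instance field initialization - e.g. return a static constant, or initialize the value lazily inside uid() itself.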
Re: How to specify the numFeatures in HashingTF
You can use CrossValidator/TrainingValidationSplit with ParamGridBuilder and Evaluator to empirically choose the model hyper parameters (ie. numFeatures) per the following: http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-cross-validation http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-train-validation-split On Fri, Jan 1, 2016 at 7:48 AM, Yanbo Liang <yblia...@gmail.com> wrote: > You can refer the following code snippet to set numFeatures for HashingTF: > > val hashingTF = new HashingTF() > .setInputCol("words") > .setOutputCol("features") > .setNumFeatures(n) > > > 2015-10-16 0:17 GMT+08:00 Nick Pentreath <nick.pentre...@gmail.com>: > >> Setting the numfeatures higher than vocab size will tend to reduce the >> chance of hash collisions, but it's not strictly necessary - it becomes a >> memory / accuracy trade off. >> >> Surprisingly, the impact on model performance of moderate hash collisions >> is often not significant. >> >> So it may be worth trying a few settings out (lower than vocab, higher >> etc) and see what the impact is on evaluation metrics. >> >> — >> Sent from Mailbox <https://www.dropbox.com/mailbox> >> >> >> On Thu, Oct 15, 2015 at 5:46 PM, Jianguo Li <flyingfromch...@gmail.com> >> wrote: >> >>> Hi, >>> >>> There is a parameter in the HashingTF called "numFeatures". I was >>> wondering what is the best way to set the value to this parameter. In the >>> use case of text categorization, do you need to know in advance the number >>> of words in your vocabulary? or do you set it to be a large value, greater >>> than the number of words in your vocabulary? >>> >>> Thanks, >>> >>> Jianguo >>> >> >> > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
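Nick's memory/accuracy trade-off is easy to see empirically. Spark 1.x's HashingTF buckets a term as (term's hash mod numFeatures), so a plain-Scala simulation with a toy vocabulary shows how collision counts fall as numFeatures grows (illustrative sketch, not the actual HashingTF code):

```scala
// Count hash-bucket collisions for a toy vocabulary at a given numFeatures.
def collisions(vocab: Seq[String], numFeatures: Int): Int = {
  val buckets = vocab.map(term => Math.floorMod(term.##, numFeatures))
  vocab.size - buckets.distinct.size
}

val vocab = (1 to 1000).map(i => s"word$i")

// numFeatures below the vocab size guarantees collisions by pigeonhole:
// 1000 terms into 2^8 buckets means at least 744 collide.
val low  = collisions(vocab, 1 << 8)
// a power of two well above the vocab size makes collisions rare,
// at the cost of a much wider (though sparse) feature vector:
val high = collisions(vocab, 1 << 20)

println(s"2^8 buckets: $low collisions, 2^20 buckets: $high collisions")
```

Running CrossValidator over a numFeatures grid, as above, then lets the evaluation metric pick the sweet spot for your actual vocabulary.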
Re: Run ad-hoc queries at runtime against cached RDDs
There are a few diff ways to apply approximation algorithms and probabilistic data structures to your Spark data - including Spark's countApproxDistinct() methods as you pointed out. There's also Twitter Algebird, and Redis HyperLogLog (PFCOUNT, PFADD). Here are some examples from my *pipeline Github project* <https://github.com/fluxcapacitor/pipeline/wiki> that demonstrate how to use these in a streaming context - if that's interesting to you, at all: *1) Algebird CountMin Sketch* https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/AlgebirdCountMinSketchTopK.scala *2) Algebird HyperLogLog* https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/AlgebirdHyperLogLog.scala *3) Redis HyperLogLog* https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/RedisHyperLogLog.scala In addition, my *Global Advanced Apache Spark Meetup* (based in SF) has an entire evening dedicated to this exact topic next month: http://www.meetup.com/Advanced-Apache-Spark-Meetup/events/22616/. Video, slides, and live streaming urls will be available the day of the meetup. On Mon, Dec 14, 2015 at 12:20 PM, Krishna Rao <krishnanj...@gmail.com> wrote: > Thanks for the response Jörn. So to elaborate, I have a large dataset with > userIds, each tagged with a property, e.g.: > > user_1prop1=X > user_2prop1=Yprop2=A > user_3prop2=B > > > I would like to be able to get the number of distinct users that have a > particular property (or combination of properties). The cardinality of each > property is in the 1000s and will only grow, as will the number of > properties. I'm happy with approximate values to trade accuracy for > performance. > > Spark's performance when doing this via spark-shell is more that excellent > using the "countApproxDistinct" method on a "javaRDD". 
However, I've no > idea what's the best way to be able to run a query programatically like I > can do manually via spark-shell. > > Hope this clarifies things. > > > On 14 December 2015 at 17:04, Jörn Franke <jornfra...@gmail.com> wrote: > >> Can you elaborate a little bit more on the use case? It looks a little >> bit like an abuse of Spark in general . Interactive queries that are not >> suitable for in-memory batch processing might be better supported by ignite >> that has in-memory indexes, concept of hot, warm, cold data etc. or hive on >> tez+llap . >> >> > On 14 Dec 2015, at 17:19, Krishna Rao <krishnanj...@gmail.com> wrote: >> > >> > Hi all, >> > >> > What's the best way to run ad-hoc queries against a cached RDDs? >> > >> > For example, say I have an RDD that has been processed, and persisted >> to memory-only. I want to be able to run a count (actually >> "countApproxDistinct") after filtering by an, at compile time, unknown >> (specified by query) value. >> > >> > I've experimented with using (abusing) Spark Streaming, by streaming >> queries and running these against the cached RDD. However, as I say I don't >> think that this is an intended use-case of Streaming. >> > >> > Cheers, >> > >> > Krishna >> > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"
@Jim- I'm wondering if those docs are outdated as it's my understanding (please correct me if I'm wrong) that we should never be seeing OOMs as 1.5/Tungsten not only improved (reduced) the memory footprint of our data, but also introduced better task level - and even key level - external spilling before an OOM occurs. Michael's comment seems to indicate there was a missed case that is fixed in 1.6. I personally see far too many OOMs for what I expected out of 1.5, so I'm anxious to try 1.6 and hopefully squash more of these edge cases. while increasing parallelism is definitely a best practice and applies either way, the docs could use some updating, I feel, by the contributors of this code. > On Dec 30, 2015, at 12:18 PM, SparkUser wrote: > > Sounds like you guys are on the right track, this is purely FYI because I > haven't seen it posted, just responding to the line in the original post that > your data structure should fit in memory. > > OK two more disclaimers "FWIW" and "maybe this is not relevant or already > covered" OK here goes... > > from > http://spark.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks > > Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit > in memory, but because the working set of one of your tasks, such as one of > the reduce tasks in groupByKey, was too large. Spark’s shuffle operations > (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within > each task to perform the grouping, which can often be large. The simplest fix > here is to increase the level of parallelism, so that each task’s input set > is smaller. Spark can efficiently support tasks as short as 200 ms, because > it reuses one executor JVM across many tasks and it has a low task launching > cost, so you can safely increase the level of parallelism to more than the > number of cores in your clusters. > > I would be curious if that helps at all. Sounds like an interesting problem > you are working on. 
> > Jim > >> On 12/29/2015 05:51 PM, Davies Liu wrote: >> Hi Andy, >> >> Could you change logging level to INFO and post some here? There will be >> some logging about the memory usage of a task when OOM. >> >> In 1.6, the memory for a task is : (HeapSize - 300M) * 0.75 / number of >> tasks. Is it possible that the heap is too small? >> >> Davies >> >> -- >> Davies Liu >> Sent with Sparrow (http://www.sparrowmailapp.com/?sig) >> >> On Tuesday, December 29, 2015 at 4:28 PM, Andy Davidson wrote: >>> Hi Michael >>> >>> https://github.com/apache/spark/archive/v1.6.0.tar.gz >>> >>> Both 1.6.0 and 1.5.2 my unit test work when I call repartition(1) before >>> saving output. Coalesce still fails. >>> >>> Coalesce(1) spark-1.5.2 >>> Caused by: >>> java.io.IOException: Unable to acquire 33554432 bytes of memory >>> >>> >>> Coalesce(1) spark-1.6.0 >>> >>> Caused by: >>> java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0 >>> >>> Hope this helps >>> >>> Andy >>> >>> From: Michael Armbrust >> (mailto:mich...@databricks.com)> >>> Date: Monday, December 28, 2015 at 2:41 PM >>> To: Andrew Davidson >> (mailto:a...@santacruzintegration.com)> >>> Cc: "user @spark" >>> Subject: Re: trouble understanding data frame memory usage >>> "java.io.IOException: Unable to acquire memory" >>> Unfortunately in 1.5 we didn't force operators to spill when ran out of memory so there is not a lot you can do. It would be awesome if you could test with 1.6 and see if things are any better? On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson wrote: > I am using spark 1.5.1. I am running into some memory problems with a > java unit test. Yes I could fix it by setting -Xmx (its set to 1024M) how > ever I want to better understand what is going on so I can write better > code in the future. The test runs on a Mac, master="Local[2]" > > I have a java unit test that starts by reading a 672K ascii file. I my > output data file is 152K. 
Its seems strange that such a small amount of > data would cause an out of memory exception. I am running a pretty > standard machine learning process > > Load data > create a ML pipeline > transform the data > Train a model > Make predictions > Join the predictions back to my original data set > Coalesce(1), I only have a small amount of data and want to save it in a > single file > Save final results back to disk > > > Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to > acquire memory” > > To try and figure out what is going I put log messages in to count
Re: SparkSQL integration issue with AWS S3a
are the credentials visible from each Worker node to all the Executor JVMs on each Worker? > On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev >wrote: > > Dear Spark community, > > I faced the following issue with trying accessing data on S3a, my code is the > following: > > val sparkConf = new SparkConf() > > val sc = new SparkContext(sparkConf) > sc.hadoopConfiguration.set("fs.s3a.impl", > "org.apache.hadoop.fs.s3a.S3AFileSystem") > sc.hadoopConfiguration.set("fs.s3a.access.key", "---") > sc.hadoopConfiguration.set("fs.s3a.secret.key", "---") > val sqlContext = SQLContext.getOrCreate(sc) > val df = sqlContext.read.parquet(...) > df.count > > It results in the following exception and log messages: > 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load > credentials from BasicAWSCredentialsProvider: Access key or secret key is null > 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance > metadata service at URL: > http://x.x.x.x/latest/meta-data/iam/security-credentials/ > 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load > credentials from InstanceProfileCredentialsProvider: The requested metadata > is not found at http://x.x.x.x/latest/meta-data/iam/security-credentials/ > 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3) > com.amazonaws.AmazonClientException: Unable to load AWS credentials from any > provider in the chain > at > com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) > at > com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521) > at > com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031) > at > com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297) > > I run standalone spark 1.5.2 and using hadoop 2.7.1 > > any ideas/workarounds? 
> > AWS credentials are correct for this bucket > > Thank you, > Konstantin Kudryavtsev
Re: SparkSQL integration issue with AWS S3a
couple things: 1) switch to IAM roles if at all possible - explicitly passing AWS credentials is a long and lonely road in the end 2) one really bad workaround/hack is to run a job that hits every worker and writes the credentials to the proper location (~/.awscredentials or whatever) ^^ i wouldn't recommend this. ^^ it's horrible and doesn't handle autoscaling, but i'm mentioning it anyway as it is a temporary fix. if you switch to IAM roles, things become a lot easier as you can authorize all of the EC2 instances in the cluster - and handles autoscaling very well - and at some point, you will want to autoscale. On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev < kudryavtsev.konstan...@gmail.com> wrote: > Chris, > > good question, as you can see from the code I set up them on driver, so I > expect they will be propagated to all nodes, won't them? > > Thank you, > Konstantin Kudryavtsev > > On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly <ch...@fregly.com> wrote: > >> are the credentials visible from each Worker node to all the Executor >> JVMs on each Worker? >> >> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev < >> kudryavtsev.konstan...@gmail.com> wrote: >> >> Dear Spark community, >> >> I faced the following issue with trying accessing data on S3a, my code is >> the following: >> >> val sparkConf = new SparkConf() >> >> val sc = new SparkContext(sparkConf) >> sc.hadoopConfiguration.set("fs.s3a.impl", >> "org.apache.hadoop.fs.s3a.S3AFileSystem") >> sc.hadoopConfiguration.set("fs.s3a.access.key", "---") >> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---") >> >> val sqlContext = SQLContext.getOrCreate(sc) >> >> val df = sqlContext.read.parquet(...) 
>> >> df.count >> >> >> It results in the following exception and log messages: >> >> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load >> credentials from BasicAWSCredentialsProvider: *Access key or secret key is >> null* >> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance >> metadata service at URL: >> http://x.x.x.x/latest/meta-data/iam/security-credentials/ >> 15/12/30 <http://x.x.x.x/latest/meta-data/iam/security-credentials/15/12/30> >> 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials from >> InstanceProfileCredentialsProvider: The requested metadata is not found at >> http://x.x.x.x/latest/meta-data/iam/security-credentials/ >> 15/12/30 <http://x.x.x.x/latest/meta-data/iam/security-credentials/15/12/30> >> 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3) >> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any >> provider in the chain >> at >> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) >> at >> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521) >> at >> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031) >> at >> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994) >> at >> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297) >> >> >> I run standalone spark 1.5.2 and using hadoop 2.7.1 >> >> any ideas/workarounds? >> >> AWS credentials are correct for this bucket >> >> Thank you, >> Konstantin Kudryavtsev >> >> > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
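for anyone hitting this later, here is a minimal sketch of one common workaround: setting the credentials through `spark.hadoop.`-prefixed SparkConf keys, which get folded into the Hadoop Configuration that ships with the job, rather than mutating `sc.hadoopConfiguration` on the driver only. the bucket path and the use of environment variables are my own placeholders - and IAM roles remain the better answer on EC2:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// "spark.hadoop."-prefixed keys are copied into the Hadoop Configuration
// used on both the driver and the executors
val sparkConf = new SparkConf()
  .setAppName("s3a-read")
  .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .set("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

val sc = new SparkContext(sparkConf)
val sqlContext = SQLContext.getOrCreate(sc)

// hypothetical bucket/prefix - substitute your own
val df = sqlContext.read.parquet("s3a://some-bucket/some-prefix/")
println(df.count)
```

the same `spark.hadoop.fs.s3a.*` keys can also live in conf/spark-defaults.conf, which keeps secrets out of application code.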
Re: Using Experminal Spark Features
A lot of folks are using the new Kafka Direct Stream API in production. And a lot of folks who used the old Kafka Receiver-based API are migrating over. The usual downside to "Experimental" features in Spark is that the API might change, so you'll need to rewrite some code. Stability-wise, the Spark codebase has a TON of tests around every new feature - including experimental features. From experience, the Kafka Direct Stream API is very stable and a lot of momentum is behind this implementation. Check out my *pipeline* Github project to see this impl fully configured and working within a Docker instance: https://github.com/fluxcapacitor/pipeline/wiki Here's a link to the kafka, kafka-rest-api, and kafka schema registry configuration: https://github.com/fluxcapacitor/pipeline/tree/master/config And here's a link to a sample app that uses the Kafka Direct API and stores data in Cassandra: https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/store/Cassandra.scala https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/start-streaming-ratings-cassandra.sh Here's a link to the Docker image which contains the installation scripts for Confluent's Kafka Distribution: https://github.com/fluxcapacitor/pipeline/blob/master/Dockerfile All code is in Scala. On Wed, Dec 30, 2015 at 11:26 AM, David Newberger < david.newber...@wandcorp.com> wrote: > Hi All, > > > > I’ve been looking at the Direct Approach for streaming Kafka integration ( > http://spark.apache.org/docs/latest/streaming-kafka-integration.html) > because it looks like a good fit for our use cases. My concern is the > feature is experimental according to the documentation. Has anyone used > this approach yet and if so what has your experience been with using it? If > it helps we’d be looking to implement it using Scala. Secondly, in general > what has people's experience been with using experimental features in Spark?
> > > > Cheers, > > > > David Newberger > > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
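for reference, a minimal sketch of the direct approach against the Spark 1.5-era spark-streaming-kafka API - broker addresses, topic name, and batch interval below are placeholders, not values from this thread:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("direct-kafka")
val ssc = new StreamingContext(conf, Seconds(10))

// direct approach: no receivers - each batch reads its own offset range
// straight from the brokers, so offsets are tracked by the stream itself
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("ratings") // hypothetical topic

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// values only (key is _._1); process each micro-batch on the executors
stream.map(_._2).foreachRDD { rdd =>
  println(s"batch size: ${rdd.count}")
}

ssc.start()
ssc.awaitTermination()
```

the type parameters (key type, value type, and their kafka Decoders) are what distinguish this call from the old receiver-based `createStream`.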
Re: SparkSQL Hive orc snappy table
; at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1466) >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458) >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) >> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) >> Caused by: java.util.concurrent.ExecutionException: >> java.lang.IndexOutOfBoundsException: Index: 0 >> at java.util.concurrent.FutureTask.report(FutureTask.java:122) >> at java.util.concurrent.FutureTask.get(FutureTask.java:192) >> at >> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1016) >> ... 48 more >> Caused by: java.lang.IndexOutOfBoundsException: Index: 0 >> at java.util.Collections$EmptyList.get(Collections.java:4454) >> at >> org.apache.hadoop.hive.ql.io.orc.OrcProto$Type.getSubtypes(OrcProto.java:12240) >> at >> org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getColumnIndicesFromNames(ReaderImpl.java:651) >> at >> org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getRawDataSizeOfColumns(ReaderImpl.java:634) >> at >> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:927) >> at >> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:836) >> at >> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:702) >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> at java.lang.Thread.run(Thread.java:745) >> >> I will be glad for any help on that matter. >> >> Regards >> Dawid Wysakowicz >> > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Problem with WINDOW functions?
on quick glance, it appears that you're calling collect() in there, which brings a huge amount of data down to the single Driver. this is why, when you allocated more memory to the Driver, a different error emerged - most definitely related to stop-the-world GC causing the node to become unresponsive. in general, collect() is bad and should only be used on small datasets for debugging or sanity-check purposes. anything serious should be done within the executors running on worker nodes - not on the Driver node. think of the Driver as simply a coordinator - coordinating and allocating tasks to workers with the help of the cluster resource manager (i.e. YARN, Spark Standalone, Mesos, etc). very little memory should be allocated to the Driver - just enough to coordinate and a little extra for Driver-side debugging, but that's it. leave the rest up to the cluster nodes. > On Dec 29, 2015, at 8:28 PM, vadimtk wrote: > > table > > > > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
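a short sketch of driver-friendly alternatives to collect() - column and output names below are made up for illustration:

```scala
// debugging / sanity checks: pull only a bounded sample to the driver
val sample = df.take(20)
df.show(20)

// anything serious: keep the work on the executors and write the
// result out directly, so it never transits the driver
df.filter(df("rating") > 3)       // hypothetical column
  .write.parquet("hdfs:///out/")  // hypothetical path

// if you truly need a driver-side value, aggregate first so only a
// small result crosses the wire
val n = df.count
```

the rule of thumb: collect() is O(dataset) driver memory; take(n), count, and write are bounded or executor-side.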
Re: DataFrame Vs RDDs ... Which one to use When ?
here's a good article that sums it up, in my opinion: https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/ basically, building apps with RDDs is like building apps with primitive JVM bytecode. haha. @richard: remember that even if you're currently writing RDDs in Java/Scala, you're not gaining the code gen/rewrite performance benefits of the Catalyst optimizer. i agree with @daniel who suggested that you start with DataFrames and revert to RDDs only when DataFrames don't give you what you need. the only time i use RDDs directly these days is when i'm dealing with a Spark library that has not yet moved to DataFrames - i.e. GraphX - and it's kind of annoying switching back and forth. almost everything you need should be in the DataFrame API. Datasets are similar to RDDs, but give you strong compile-time typing, tabular structure, and Catalyst optimizations. hopefully Datasets is the last API we'll see from Spark SQL... i'm getting tired of re-writing slides and book chapters! :) On Mon, Dec 28, 2015 at 4:55 PM, Richard Eggert <richard.egg...@gmail.com> wrote: > One advantage of RDD's over DataFrames is that RDD's allow you to use your > own data types, whereas DataFrames are backed by RDD's of Record objects, > which are pretty flexible but don't give you much in the way of > compile-time type checking. If you have an RDD of case class elements or > JSON, then Spark SQL can automatically figure out how to convert it into an > RDD of Record objects (and therefore a DataFrame), but there's no way to > automatically go the other way (from DataFrame/Record back to custom types). > > In general, you can ultimately do more with RDDs than DataFrames, but > DataFrames give you a lot of niceties (automatic query optimization, table > joins, SQL-like syntax, etc.)
for free, and can avoid some of the runtime > overhead associated with writing RDD code in a non-JVM language (such as > Python or R), since the query optimizer is effectively creating the > required JVM code under the hood. There's little to no performance benefit > if you're already writing Java or Scala code, however (and RDD-based code > may actually perform better in some cases, if you're willing to carefully > tune your code). > > On Mon, Dec 28, 2015 at 3:05 PM, Daniel Siegmann < > daniel.siegm...@teamaol.com> wrote: > >> DataFrames are a higher level API for working with tabular data - RDDs >> are used underneath. You can use either and easily convert between them in >> your code as necessary. >> >> DataFrames provide a nice abstraction for many cases, so it may be easier >> to code against them. Though if you're used to thinking in terms of >> collections rather than tables, you may find RDDs more natural. Data frames >> can also be faster, since Spark will do some optimizations under the hood - >> if you are using PySpark, this will avoid the overhead. Data frames may >> also perform better if you're reading structured data, such as a Hive table >> or Parquet files. >> >> I recommend you prefer data frames, switching over to RDDs as necessary >> (when you need to perform an operation not supported by data frames / Spark >> SQL). >> >> HOWEVER (and this is a big one), Spark 1.6 will have yet another API - >> datasets. The release of Spark 1.6 is currently being finalized and I would >> expect it in the next few days. You will probably want to use the new API >> once it's available. >> >> >> On Sun, Dec 27, 2015 at 9:18 PM, Divya Gehlot <divya.htco...@gmail.com> >> wrote: >> >>> Hi, >>> I am new bee to spark and a bit confused about RDDs and DataFames in >>> Spark. >>> Can somebody explain me with the use cases which one to use when ? >>> >>> Would really appreciate the clarification . 
>>> >>> Thanks, >>> Divya >>> >> >> > > > -- > Rich > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
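a minimal sketch of moving between the two APIs in the Spark 1.5 era - the Rating case class is a made-up example type:

```scala
import org.apache.spark.sql.SQLContext

// schema source for reflection-based inference (hypothetical type)
case class Rating(userId: Int, itemId: Int, rating: Double)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // brings .toDF() into scope

val rdd = sc.parallelize(Seq(Rating(1, 10, 4.5), Rating(2, 10, 3.0)))

// RDD -> DataFrame: schema inferred from the case class fields
val df = rdd.toDF()

// DataFrame -> RDD: note you get RDD[Row] back, not RDD[Rating] -
// this is the one-way trip Richard describes above
val rows = df.rdd
val ratings = rows.map(r => r.getAs[Double]("rating"))
```

this round-trip asymmetry (typed in, Row out) is exactly the gap the Datasets API in 1.6 is meant to close.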
Re: Help: Driver OOM when shuffle large amount of data
which version of spark is this? is there any chance that a single key - or set of keys - has a large number of values relative to the other keys (aka skew)? if so, spark 1.5 *should* fix this issue with the new tungsten stuff, although I had some issues still with 1.5.1 in a similar situation. I'm waiting to test with 1.6.0 before I start asking/creating jiras. > On Dec 28, 2015, at 5:23 AM, Eugene Morozov > wrote: > > Kendal, > > have you tried to reduce number of partitions? > > -- > Be well! > Jean Morozov > >> On Mon, Dec 28, 2015 at 9:02 AM, kendal wrote: >> My driver is running OOM with my 4T data set... I don't collect any data to >> driver. All what the program done is map - reduce - saveAsTextFile. But the >> partitions to be shuffled is quite large - 20K+. >> >> The symptom what I'm seeing the timeout when GetMapOutputStatuses from >> Driver. >> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Don't have map outputs >> for shuffle 0, fetching them >> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Doing the fetch; >> tracker endpoint = >> AkkaRpcEndpointRef(Actor[akka.tcp://sparkDriver@10.115.58.55:52077/user/MapOutputTracker#-1937024516]) >> 15/12/24 02:06:21 WARN akka.AkkaRpcEndpointRef: Error sending message >> [message = GetMapOutputStatuses(0)] in 1 attempts >> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 >> seconds].
This timeout is controlled by spark.rpc.askTimeout >> at >> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214) >> at >> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229) >> at >> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225) >> at >> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) >> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242) >> >> But the root cause is OOM: >> 15/12/24 02:05:36 ERROR actor.ActorSystemImpl: Uncaught fatal error from >> thread [sparkDriver-akka.remote.default-remote-dispatcher-24] shutting down >> ActorSystem [sparkDriver] >> java.lang.OutOfMemoryError: Java heap space >> at java.util.Arrays.copyOf(Arrays.java:2271) >> at >> java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178) >> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:131) >> at >> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) >> at >> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) >> at >> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843) >> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) >> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842) >> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743) >> at >> akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:718) >> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411) >> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >> >> I've already allocated 16G memory for my driver - which is the hard limit >> MAX of my Yarn cluster. 
And I also applied Kryo serialization... Any idea to >> reduce memory foot point? >> And what confuses me is that, even I have 20K+ partition to shuffle, why I >> need so much memory?! >> >> Thank you so much for any help! >> >> >> >> -- >> View this message in context: >> http://apache-spark-user-list.1001560.n3.nabble.com/Help-Driver-OOM-when-shuffle-large-amount-of-data-tp25818.html >> Sent from the Apache Spark User List mailing list archive at Nabble.com. >> >> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >
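a hedged sketch of Eugene's suggestion: the map-output statuses the driver serializes grow with (#map partitions x #reduce partitions), so explicitly lowering the shuffle partition count directly shrinks the message that was blowing up the Akka serializer. input path and parsing below are placeholders:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-tuning")
  // the timeout named in the log above; this only buys headroom,
  // it does not fix the underlying message size
  .set("spark.rpc.askTimeout", "300s")

// hypothetical map - reduce - saveAsTextFile pipeline
val pairs = sc.textFile("hdfs:///in/")
  .map(line => (line.take(8), 1L)) // hypothetical key extraction

// pass an explicit, smaller numPartitions instead of inheriting 20K+
val reduced = pairs.reduceByKey(_ + _, 2000)

reduced.saveAsTextFile("hdfs:///out/")
```

2000 here is illustrative - the right number balances per-partition size (small enough to fit in executor memory) against driver-side tracking overhead.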
Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"
the 200 number looks strangely similar to the following default number of post-shuffle partitions which is often left untuned: spark.sql.shuffle.partitions here's the property defined in the Spark source: https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L232 note that Spark 1.6+ will make this config param obsolete in favor of adaptive execution which uses the following as a low watermark for # of post-shuffle partitions: spark.sql.adaptive.minNumPostShufflePartitions here's the property defined in the Spark source: https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L245 On Mon, Dec 28, 2015 at 5:41 PM, Michael Armbrust <mich...@databricks.com> wrote: > Unfortunately in 1.5 we didn't force operators to spill when ran out of > memory so there is not a lot you can do. It would be awesome if you could > test with 1.6 and see if things are any better? > > On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson < > a...@santacruzintegration.com> wrote: > >> I am using spark 1.5.1. I am running into some memory problems with a >> java unit test. Yes I could fix it by setting –Xmx (its set to 1024M) how >> ever I want to better understand what is going on so I can write better >> code in the future. The test runs on a Mac, master="Local[2]" >> >> I have a java unit test that starts by reading a 672K ascii file. I my >> output data file is 152K. Its seems strange that such a small amount of >> data would cause an out of memory exception. I am running a pretty standard >> machine learning process >> >> >>1. Load data >>2. create a ML pipeline >>3. transform the data >>4. Train a model >>5. Make predictions >>6. Join the predictions back to my original data set >>7. Coalesce(1), I only have a small amount of data and want to save >>it in a single file >>8. 
Save final results back to disk >> >> >> Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to >> acquire memory” >> >> To try and figure out what is going I put log messages in to count the >> number of partitions >> >> Turns out I have 20 input files, each one winds up in a separate >> partition. Okay so after loading I call coalesce(1) and check to make sure >> I only have a single partition. >> >> The total number of observations is 1998. >> >> After calling step 7 I count the number of partitions and discovered I >> have 224 partitions!. Surprising given I called Coalesce(1) before I >> did anything with the data. My data set should easily fit in memory. When I >> save them to disk I get 202 files created with 162 of them being empty! >> >> In general I am not explicitly using cache. >> >> Some of the data frames get registered as tables. I find it easier to use >> sql. >> >> Some of the data frames get converted back to RDDs. I find it easier to >> create RDD this way >> >> I put calls to unpersist(true). In several places >> >>private void memoryCheck(String name) { >> >> Runtime rt = Runtime.getRuntime(); >> >> logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {} >> df.size: {}", >> >> name, >> >> String.format("%,d", rt.totalMemory()), >> >> String.format("%,d", rt.freeMemory())); >> >> } >> >> Any idea how I can get a better understanding of what is going on? My >> goal is to learn to write better spark code. 
>> >> Kind regards >> >> Andy >> >> Memory usages at various points in my unit test >> >> name: rawInput totalMemory: 447,741,952 freeMemory: 233,203,184 >> >> name: naiveBayesModel totalMemory: 509,083,648 freeMemory: >> 403,504,128 >> >> name: lpRDD totalMemory: 509,083,648 freeMemory: 402,288,104 >> >> name: results totalMemory: 509,083,648 freeMemory: 368,011,008 >> >> >>DataFrame exploreDF = results.select(results.col("id"), >> >> results.col("label"), >> >> results.col("binomialLabel"), >> >> >> results.col("labelIndex"), >> >> >> results.col("prediction"), >> >> >>
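to make the suggestion above concrete, a sketch of pinning the post-shuffle partition count down for a small, local dataset - the join and output path are hypothetical stand-ins for Andy's pipeline:

```scala
// joins/aggregations after this produce 2 partitions instead of the
// default 200, which is where the ~200 mostly-empty output files and
// the 224-partition count likely came from
sqlContext.setConf("spark.sql.shuffle.partitions", "2")

// hypothetical: join predictions back to the original data, then
// coalesce AFTER the shuffle so the final write is a single file
val joined = predictions.join(original, "id")
joined.coalesce(1).write.json("out/")
```

note the ordering: coalesce(1) before a shuffle doesn't help, because every shuffle re-partitions to spark.sql.shuffle.partitions.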
Re: fishing for help!
note that with AWS, you can use Placement Groups <http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html> and EC2 instances with Enhanced Networking <http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/enhanced-networking.html> to lower network latency and increase network throughput within the same AZ (data center). On Tue, Dec 22, 2015 at 12:11 AM, Eran Witkon <eranwit...@gmail.com> wrote: > I'll check it out. > > On Tue, 22 Dec 2015 at 00:30 Michal Klos <michal.klo...@gmail.com> wrote: > >> If you are running on Amazon, then it's always a crapshoot as well. >> >> M >> >> On Dec 21, 2015, at 4:41 PM, Josh Rosen <joshro...@databricks.com> wrote: >> >> @Eran, are Server 1 and Server 2 both part of the same cluster / do they >> have similar positions in the network topology w.r.t the Spark executors? >> If Server 1 had fast network access to the executors but Server 2 was >> across a WAN then I'd expect the job to run slower from Server 2 duet to >> the extra network latency / reduced bandwidth. This is assuming that you're >> running the driver in non-cluster deploy mode (so the driver process runs >> on the machine which submitted the job). >> >> On Mon, Dec 21, 2015 at 1:30 PM, Igor Berman <igor.ber...@gmail.com> >> wrote: >> >>> look for differences: packages versions, cpu/network/memory diff etc etc >>> >>> >>> On 21 December 2015 at 14:53, Eran Witkon <eranwit...@gmail.com> wrote: >>> >>>> Hi, >>>> I know it is a wide question but can you think of reasons why a pyspark >>>> job which runs on from server 1 using user 1 will run faster then the same >>>> job when running on server 2 with user 1 >>>> Eran >>>> >>> >>> >> -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Memory allocation for Broadcast values
Note that starting with Spark 1.6, memory can be dynamically allocated by the Spark execution engine based on workload heuristics. You can still set a low watermark for the spark.storage.memoryFraction (RDD cache), but the rest can be dynamic. Here are some relevant slides from a recent presentation I did in Toronto: http://www.slideshare.net/cfregly/toronto-spark-meetup-dec-14-2015 (slide 63+) This adaptiveness will be a continual theme with Spark moving forward. For example, Spark 1.6 also includes adaptive query execution, including an adaptive, hybrid join that can broadcast just the hot, popular keys using BroadcastHashJoin, but use the regular ShuffleHashJoin for low-medium popular keys - within the same Job - and changing from stage to stage. On Wed, Dec 23, 2015 at 2:12 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote: > If you are creating a huge map on the driver, then spark.driver.memory > should be set to a higher value to hold your map. Since you are going to > broadcast this map, your spark executors must have enough memory to hold > this map as well which can be set using the spark.executor.memory, and > spark.storage.memoryFraction configurations. > > Thanks > Best Regards > > On Mon, Dec 21, 2015 at 5:50 AM, Pat Ferrel <p...@occamsmachete.com> wrote: > >> I have a large Map that is assembled in the driver and broadcast to each >> node. >> >> My question is how best to allocate memory for this. The Driver has to >> have enough memory for the Maps, but only one copy is serialized to each >> node. What type of memory should I size to match the Maps? Is the broadcast >> Map taking a little from each executor, all from every executor, or is >> there something other than driver and executor memory I can size?
>> - >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >> For additional commands, e-mail: user-h...@spark.apache.org >> >> > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
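to answer Pat's sizing question in code form - one copy of the map is deserialized per executor JVM (not per task), so size spark.driver.memory for building it and spark.executor.memory for holding one extra copy. a minimal sketch, with the builder and RDD names made up:

```scala
// built once on the driver; needs to fit in spark.driver.memory
val lookup: Map[String, Int] = buildLookup() // hypothetical builder

// one serialized copy travels to each executor, where it is
// deserialized once and shared by all tasks in that JVM
val bc = sc.broadcast(lookup)

val resolved = events.map { e =>             // hypothetical RDD[String]
  (e, bc.value.getOrElse(e, -1))
}

// release the executor-side copies once no more jobs reference it
bc.unpersist(blocking = false)
```

so: driver memory pays for the map once while building, and each executor pays for it once while it's broadcast - tasks themselves pay nothing extra.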
Re: error while defining custom schema in Spark 1.5.0
what are you trying to do, exactly? On Tue, Dec 22, 2015 at 3:03 AM, Divya Gehlot <divya.htco...@gmail.com> wrote: > Hi, > I am new bee to Apache Spark ,using CDH 5.5 Quick start VM.having spark > 1.5.0. > I working on custom schema and getting error > > import org.apache.spark.sql.hive.HiveContext >>> >>> scala> import org.apache.spark.sql.hive.orc._ >>> import org.apache.spark.sql.hive.orc._ >>> >>> scala> import org.apache.spark.sql.types.{StructType, StructField, >>> StringType, IntegerType}; >>> import org.apache.spark.sql.types.{StructType, StructField, StringType, >>> IntegerType} >>> >>> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) >>> 15/12/21 23:41:53 INFO hive.HiveContext: Initializing execution hive, >>> version 1.1.0 >>> 15/12/21 23:41:53 INFO client.ClientWrapper: Inspected Hadoop version: >>> 2.6.0-cdh5.5.0 >>> 15/12/21 23:41:53 INFO client.ClientWrapper: Loaded >>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0 >>> hiveContext: org.apache.spark.sql.hive.HiveContext = >>> org.apache.spark.sql.hive.HiveContext@214bd538 >>> >>> scala> val customSchema = StructType(Seq(StructField("year", >>> IntegerType, true),StructField("make", StringType, >>> true),StructField("model", StringType, true),StructField("comment", >>> StringType, true),StructField("blank", StringType, true))) >>> customSchema: org.apache.spark.sql.types.StructType = >>> StructType(StructField(year,IntegerType,true), >>> StructField(make,StringType,true), StructField(model,StringType,true), >>> StructField(comment,StringType,true), StructField(blank,StringType,true)) >>> >>> scala> val customSchema = (new StructType).add("year", IntegerType, >>> true).add("make", StringType, true).add("model", StringType, >>> true).add("comment", StringType, true).add("blank", StringType, true) >>> customSchema: org.apache.spark.sql.types.StructType = >>> StructType(StructField(year,IntegerType,true), >>> StructField(make,StringType,true), 
StructField(model,StringType,true), >>> StructField(comment,StringType,true), StructField(blank,StringType,true)) >>> >>> scala> val customSchema = StructType( StructField("year", IntegerType, >>> true) :: StructField("make", StringType, true) :: StructField("model", >>> StringType, true) :: StructField("comment", StringType, true) :: >>> StructField("blank", StringType, true)::StructField("blank", StringType, >>> true)) >>> :24: error: value :: is not a member of >>> org.apache.spark.sql.types.StructField >>>val customSchema = StructType( StructField("year", IntegerType, >>> true) :: StructField("make", StringType, true) :: StructField("model", >>> StringType, true) :: StructField("comment", StringType, true) :: >>> StructField("blank", StringType, true)::StructField("blank", StringType, >>> true)) >>> >> > Tried like like below also > > scala> val customSchema = StructType( StructField("year", IntegerType, > true), StructField("make", StringType, true) ,StructField("model", > StringType, true) , StructField("comment", StringType, true) , > StructField("blank", StringType, true),StructField("blank", StringType, > true)) > :24: error: overloaded method value apply with alternatives: > (fields: > Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType > > (fields: > java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType > > (fields: > Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType > cannot be applied to (org.apache.spark.sql.types.StructField, > org.apache.spark.sql.types.StructField, > org.apache.spark.sql.types.StructField, > org.apache.spark.sql.types.StructField, > org.apache.spark.sql.types.StructField, > org.apache.spark.sql.types.StructField) >val customSchema = StructType( StructField("year", IntegerType, > true), StructField("make", StringType, true) ,StructField("model", > StringType, true) , StructField("comment", StringType, true) , > StructField("blank", 
StringType, true),StructField("blank", StringType, > true)) > ^ >Would really appreciate if somebody share the example which works with > Spark 1.4 or Spark 1.5.0 > > Thanks, > Divya > > ^ > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
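to close the loop on the two errors above: `::` is List's cons operator, so a `::` chain has to terminate in `Nil` - without it, the compiler looks for a `::` method on the last StructField, which is exactly the "value :: is not a member of StructField" error. And `StructType(...)` takes a Seq/Array/List of fields, not bare comma-separated StructFields, which explains the "overloaded method value apply" error. A sketch of the working `::` form (the Seq-based and `.add`-based variants in the transcript above were already correct):

```scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// the chain must end in Nil so the whole expression is a List[StructField]
val customSchema = StructType(
  StructField("year", IntegerType, true) ::
  StructField("make", StringType, true) ::
  StructField("model", StringType, true) ::
  StructField("comment", StringType, true) ::
  StructField("blank", StringType, true) :: Nil)
```

this works the same on Spark 1.4 and 1.5.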
Re: Tips for Spark's Random Forest slow performance
so it looks like you're increasing num trees by 5x and seeing a roughly 18x increase in runtime (440ms to 8s), correct? did you analyze the Spark cluster resources to monitor the memory usage, spillage, disk I/O, etc? you may need more Workers. On Tue, Dec 22, 2015 at 8:57 AM, Alexander Ratnikov < ratnikov.alexan...@gmail.com> wrote: > Hi All, > > It would be good to get some tips on tuning Apache Spark for Random > Forest classification. > Currently, we have a model that looks like: > > featureSubsetStrategy all > impurity gini > maxBins 32 > maxDepth 11 > numberOfClasses 2 > numberOfTrees 100 > > We are running Spark 1.5.1 as a standalone cluster. > > 1 Master and 2 Worker nodes. > The amount of RAM is 32GB on each node with 4 Cores. > The classification takes 440ms. > > When we increase the number of trees to 500, it takes 8 sec already. > We tried to reduce the depth but then error rate is higher. We have > around 246 attributes. > > Probably we are doing something wrong. Any ideas how we could improve > the performance ? > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Tips-for-Spark-s-Random-Forest-slow-performance-tp25766.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
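one more hedged suggestion for Alexander: featureSubsetStrategy "all" evaluates all ~246 features at every split, which mostly hurts training time; "sqrt" (or "auto") is the usual choice for classification. a sketch against the MLlib 1.5 API, with the data path as a placeholder:

```scala
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.util.MLUtils

// hypothetical LibSVM-format training data
val data = MLUtils.loadLibSVMFile(sc, "data/train.libsvm")

val model = RandomForest.trainClassifier(
  data,
  numClasses = 2,
  categoricalFeaturesInfo = Map[Int, Int](),
  numTrees = 100,
  featureSubsetStrategy = "sqrt", // vs "all": ~16 of 246 features per split
  impurity = "gini",
  maxDepth = 11,
  maxBins = 32)
```

note that prediction latency scales with numTrees x maxDepth regardless of subset strategy, so if the 440ms/8s numbers are prediction-side, reducing trees or depth is the main lever there.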
Re: java.io.FileNotFoundException(Too many open files) in Spark streaming
and which version of Spark/Spark Streaming are you using? are you explicitly setting the spark.streaming.concurrentJobs to something larger than the default of 1? if so, please try setting that back to 1 and see if the problem still exists. this is a dangerous parameter to modify from the default - which is why it's not well-documented. On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge <vijay.gha...@gmail.com> wrote: > Few indicators - > > 1) during execution time - check total number of open files using lsof > command. Need root permissions. If it is cluster not sure much ! > 2) which exact line in the code is triggering this error ? Can you paste > that snippet ? > > > On Wednesday 23 December 2015, Priya Ch <learnings.chitt...@gmail.com> > wrote: > >> ulimit -n 65000 >> >> fs.file-max = 65000 ( in etc/sysctl.conf file) >> >> Thanks, >> Padma Ch >> >> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote: >> >>> Could you share the ulimit for your setup please ? >>> >>> - Thanks, via mobile, excuse brevity. >>> On Dec 22, 2015 6:39 PM, "Priya Ch" <learnings.chitt...@gmail.com> >>> wrote: >>> >>>> Jakob, >>>> >>>>Increased the settings like fs.file-max in /etc/sysctl.conf and >>>> also increased user limit in /etc/security/limits.conf. But still see >>>> the same issue. >>>> >>>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky <joder...@gmail.com> >>>> wrote: >>>> >>>>> It might be a good idea to see how many files are open and try >>>>> increasing the open file limit (this is done on an os level). In some >>>>> application use-cases it is actually a legitimate need. >>>>> >>>>> If that doesn't help, make sure you close any unused files and streams >>>>> in your code. It will also be easier to help diagnose the issue if you >>>>> send >>>>> an error-reproducing snippet. 
>>>>> >>>> >>>> >> > > -- > Regards, > Vijay Gharge > > > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Fat jar can't find jdbc
JDBC Drivers need to be on the system classpath. try passing --jars /path/to/local/mysql-connector.jar when you submit the job. this will also copy the jars to each of the worker nodes and should set you straight. On Tue, Dec 22, 2015 at 11:42 AM, Igor Berman <igor.ber...@gmail.com> wrote: > imho, if you succeeded to fetch something from your mysql with same jar in > classpath, then Manifest is ok and you indeed should look at your spark sql > - jdbc configs > > On 22 December 2015 at 12:21, David Yerrington <da...@yerrington.net> > wrote: > >> Igor, I think it's available. After I extract the jar file, I see a >> directory with class files that look very relevant in "/com/mysql/jdbc". >> >> After reading this, I started to wonder if MySQL connector was really the >> problem. Perhaps it's something to do with SQLcontext? I just wired a >> test endpoint to run a very basic mysql query, outside of Spark, and it >> worked just fine (yay!). I copied and pasted this example to verify my >> MySQL connector availability, and it worked just fine: >> https://mkaz.github.io/2011/05/27/using-scala-with-jdbc-to-connect-to-mysql/ >> >> As far as the Maven manifest goes, I'm really not sure. I will research >> it though. Now I'm wondering if my mergeStrategy is to blame? I'm going >> to try there next. >> >> Thank you for the help! >> >> On Tue, Dec 22, 2015 at 1:18 AM, Igor Berman <igor.ber...@gmail.com> >> wrote: >> >>> David, can you verify that mysql connector classes indeed in your single >>> jar? 
>>> open it with zip tool available at your platform >>> >>> another options that might be a problem - if there is some dependency in >>> MANIFEST(not sure though this is the case of mysql connector) then it might >>> be broken after preparing single jar >>> so you need to verify that it's ok(in maven usually it's possible to >>> define merging policy for resources while creating single jar) >>> >>> On 22 December 2015 at 10:04, Vijay Kiran <m...@vijaykiran.com> wrote: >>> >>>> Can you paste your libraryDependencies from build.sbt ? >>>> >>>> ./Vijay >>>> >>>> > On 22 Dec 2015, at 06:12, David Yerrington <da...@yerrington.net> >>>> wrote: >>>> > >>>> > Hi Everyone, >>>> > >>>> > I'm building a prototype that fundamentally grabs data from a MySQL >>>> instance, crunches some numbers, and then moves it on down the pipeline. >>>> I've been using SBT with assembly tool to build a single jar for >>>> deployment. >>>> > >>>> > I've gone through the paces of stomping out many dependency problems >>>> and have come down to one last (hopefully) zinger. >>>> > >>>> > java.lang.ClassNotFoundException: Failed to load class for data >>>> source: jdbc. >>>> > >>>> > at >>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67) >>>> > >>>> > at >>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87) >>>> > >>>> > at >>>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) >>>> > >>>> > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203) >>>> > >>>> > at her.recommender.getDataframe(her.recommender.scala:45) >>>> > >>>> > at her.recommender.getRecommendations(her.recommender.scala:60) >>>> > >>>> > >>>> > I'm assuming this has to do with mysql-connector because this is the >>>> problem I run into when I'm working with spark-shell and I forget to >>>> include my classpath with my mysql-connect jar file. 
>>>> > >>>> > I've tried: >>>> > • Using different versions of mysql-connector-java in my >>>> build.sbt file >>>> > • Copying the connector jar to my_project/src/main/lib >>>> > • Copying the connector jar to my_project/lib <-- (this is >>>> where I keep my build.sbt) >>>> > Everything loads fine and works, except my call that does >>>> "sqlContext.load("jdbc", myOptions)". I know this is a total newbie >>>> question but in my defense, I'm fairly new to Scala, and this is my first >>>> go at deploying a fat jar with sbt-assembly. >>>> > >>>> > Thanks for any advice! >>>> > >>>> > -- >>>> > David Yerrington >>>> > yerrington.net >>>> >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> >>>> >>> >> >> >> -- >> David Yerrington >> yerrington.net >> > > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
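Igor's suggestion to open the assembled jar with a zip tool can be scripted - a jar is just a zip archive, so the stdlib can list its entries and confirm the JDBC driver class actually made it into the fat jar. a quick sketch (the jar path and class entry below are examples, not from the thread):

```python
import zipfile

def jar_contains_class(jar_path, class_entry):
    """Check whether a class file (e.g. 'com/mysql/jdbc/Driver.class')
    is packaged inside the assembled jar. A jar is just a zip archive."""
    with zipfile.ZipFile(jar_path) as jar:
        return class_entry in jar.namelist()

# e.g. jar_contains_class("target/scala-2.10/my-assembly.jar",
#                         "com/mysql/jdbc/Driver.class")
```

if the class is present but the load still fails, the sbt-assembly mergeStrategy (dropping META-INF/services entries, for example) is the next thing to check.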
Re: Stuck with DataFrame df.select("select * from table");
I assume by "The same code perfectly works through Zeppelin 0.5.5" that you're using the %sql interpreter with your regular SQL SELECT statement, correct? If so, the Zeppelin interpreter is converting the query that follows %sql to sqlContext.sql() per the following code: https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L125 https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L140-L141 Also, keep in mind that you can do something like this if you want to stay in DataFrame land: df.selectExpr("*").limit(5).show() On Fri, Dec 25, 2015 at 12:53 PM, Eugene Morozov <evgeny.a.moro...@gmail.com > wrote: > Ted, Igor, > > Oh my... thanks a lot to both of you! > Igor was absolutely right, but I missed that I have to use sqlContext =( > > Everything's perfect. > Thank you. > > -- > Be well! > Jean Morozov > > On Fri, Dec 25, 2015 at 8:31 PM, Ted Yu <yuzhih...@gmail.com> wrote: > >> DataFrame uses different syntax from SQL query. >> I searched unit tests but didn't find any in the form of df.select("select >> ...") >> >> Looks like you should use sqlContext as other people suggested. >> >> On Fri, Dec 25, 2015 at 8:29 AM, Eugene Morozov < >> evgeny.a.moro...@gmail.com> wrote: >> >>> Thanks for the comments, although the issue is not in limit() predicate. >>> It's something with spark being unable to resolve the expression. >>> >>> I can do smth like this. It works as it suppose to: >>> df.select(df.col("*")).where(df.col("x1").equalTo(3.0)).show(5); >>> >>> But I think old fashioned sql style have to work also. 
I have >>> df.registeredTempTable("tmptable") and then >>> >>> df.select("select * from tmptable where x1 = '3.0'").show(); >>> >>> org.apache.spark.sql.AnalysisException: cannot resolve 'select * from >>> tmp where x1 = '1.0'' given input columns x1, x4, x5, x3, x2; >>> >>> at >>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) >>> at >>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56) >>> at >>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.sca >>> >>> >>> From the first statement I conclude that my custom datasource is >>> perfectly fine. >>> Just wonder how to fix / workaround that. >>> -- >>> Be well! >>> Jean Morozov >>> >>> On Fri, Dec 25, 2015 at 6:13 PM, Igor Berman <igor.ber...@gmail.com> >>> wrote: >>> >>>> sqlContext.sql("select * from table limit 5").show() (not sure if limit >>>> 5 supported) >>>> >>>> or use Dmitriy's solution. select() defines your projection when you've >>>> specified entire query >>>> >>>> On 25 December 2015 at 15:42, Василец Дмитрий <pronix.serv...@gmail.com >>>> > wrote: >>>> >>>>> hello >>>>> you can try to use df.limit(5).show() >>>>> just trick :) >>>>> >>>>> On Fri, Dec 25, 2015 at 2:34 PM, Eugene Morozov < >>>>> evgeny.a.moro...@gmail.com> wrote: >>>>> >>>>>> Hello, I'm basically stuck as I have no idea where to look; >>>>>> >>>>>> Following simple code, given that my Datasource is working gives me >>>>>> an exception. >>>>>> >>>>>> DataFrame df = sqlc.load(filename, >>>>>> "com.epam.parso.spark.ds.DefaultSource"); >>>>>> df.cache(); >>>>>> df.printSchema(); <-- prints the schema perfectly fine! >>>>>> >>>>>> df.show(); <-- Works perfectly fine (shows table >>>>>> with 20 lines)! 
>>>>>> df.registerTempTable("table"); >>>>>> df.select("select * from table limit 5").show(); <-- gives weird >>>>>> exception >>>>>> >>>>>> Exception is: >>>>>> >>>>>> AnalysisException: cannot resolve 'select * from table limit 5' given >>>>>> input columns VER, CREATED, SOC, SOCC, HLTC, HLGTC, STATUS >>>>>> >>>>>> I can do a collect on a dataframe, but cannot select any specific >>>>>> columns either "select * from table" or "select VER, CREATED from table". >>>>>> >>>>>> I use spark 1.5.2. >>>>>> The same code perfectly works through Zeppelin 0.5.5. >>>>>> >>>>>> Thanks. >>>>>> -- >>>>>> Be well! >>>>>> Jean Morozov >>>>>> >>>>> >>>>> >>>> >>> >> > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Spark SQL 1.5.2 missing JDBC driver for PostgreSQL?
Configuring JDBC drivers with Spark is a bit tricky as the JDBC driver needs to be on the Java System Classpath per this <http://spark.apache.org/docs/latest/sql-programming-guide.html#troubleshooting> troubleshooting section in the Spark SQL programming guide. Here <https://github.com/fluxcapacitor/pipeline/blob/master/bin/start-hive-thriftserver.sh> is an example hive-thrift-server start script from my Spark-based reference pipeline project. Here <https://github.com/fluxcapacitor/pipeline/blob/master/bin/pipeline-spark-sql.sh> is an example script that decorates the out-of-the-box spark-sql command to use the MySQL JDBC driver. These scripts explicitly set --jars to $SPARK_SUBMIT_JARS which is defined here <https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/.profile#L144> and here <https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/.profile#L87> and includes the path to the local MySQL JDBC driver. This approach is described here <http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management> in the Spark docs that describe the advanced spark-submit options. Any jar specified with --jars will be passed to each worker node in the cluster - specifically in the work directory for each SparkContext for isolation purposes. Cleanup of these jars on the worker nodes is handled by YARN automatically, and by Spark Standalone per the spark.worker.cleanup.appDataTtl config param. The Spark SQL programming guide says to use SPARK_CLASSPATH for this purpose, but I couldn't get this to work for whatever reason, so i'm sticking to the --jars approach used in my examples. On Tue, Dec 22, 2015 at 9:51 PM, Benjamin Kim <bbuil...@gmail.com> wrote: > Stephen, > > Let me confirm. I just need to propagate these settings I put in > spark-defaults.conf to all the worker nodes? Do I need to do the same with > the PostgreSQL driver jar file too? 
If so, is there a way to have it read > from HDFS rather than copying out to the cluster manually. > > Thanks for your help, > Ben > > > On Tuesday, December 22, 2015, Stephen Boesch <java...@gmail.com> wrote: > >> HI Benjamin, yes by adding to the thrift server then the create table >> would work. But querying is performed by the workers: so you need to add >> to the classpath of all nodes for reads to work. >> >> 2015-12-22 18:35 GMT-08:00 Benjamin Kim <bbuil...@gmail.com>: >> >>> Hi Stephen, >>> >>> I forgot to mention that I added these lines below to the >>> spark-default.conf on the node with Spark SQL Thrift JDBC/ODBC Server >>> running on it. Then, I restarted it. >>> >>> >>> spark.driver.extraClassPath=/usr/share/java/postgresql-9.3-1104.jdbc41.jar >>> >>> spark.executor.extraClassPath=/usr/share/java/postgresql-9.3-1104.jdbc41.jar >>> >>> I read in another thread that this would work. I was able to create the >>> table and could see it in my SHOW TABLES list. But, when I try to query the >>> table, I get the same error. It looks like I’m getting close. >>> >>> Are there any other things that I have to do that you can think of? >>> >>> Thanks, >>> Ben >>> >>> >>> On Dec 22, 2015, at 6:25 PM, Stephen Boesch <java...@gmail.com> wrote: >>> >>> The postgres jdbc driver needs to be added to the classpath of your >>> spark workers. You can do a search for how to do that (multiple ways). >>> >>> 2015-12-22 17:22 GMT-08:00 b2k70 <bbuil...@gmail.com>: >>> >>>> I see in the Spark SQL documentation that a temporary table can be >>>> created >>>> directly onto a remote PostgreSQL table. >>>> >>>> CREATE TEMPORARY TABLE >>>> USING org.apache.spark.sql.jdbc >>>> OPTIONS ( >>>> url "jdbc:postgresql:///", >>>> dbtable "impressions" >>>> ); >>>> When I run this against our PostgreSQL server, I get the following >>>> error. 
>>>> >>>> Error: java.sql.SQLException: No suitable driver found for >>>> jdbc:postgresql:/// >>>> (state=,code=0) >>>> >>>> Can someone help me understand why this is? >>>> >>>> Thanks, Ben >>>> >>>> >>>> >>>> -- >>>> View this message in context: >>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-5-2-missing-JDBC-driver-for-PostgreSQL-tp25773.html >>>> Sent from the Apache Spark User List mailing list archive at Nabble.com >>>> <http://nabble.com>. >>>> >>>> - >>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>> For additional commands, e-mail: user-h...@spark.apache.org >>>> >>>> >>> >>> >> -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
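for reference, the --jars approach described above boils down to a submit command along these lines (the main class and app jar names are placeholders; only the driver path matches the thread):

```shell
# ship the PostgreSQL JDBC driver to the driver and every worker's work dir;
# spark-submit handles the copy and classpath wiring for you
spark-submit \
  --jars /usr/share/java/postgresql-9.3-1104.jdbc41.jar \
  --class com.example.MyApp \
  my-app-assembly.jar
```

cleanup of the shipped jar on the workers then happens automatically (YARN) or via spark.worker.cleanup.appDataTtl (Standalone), as noted above.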
Re: Getting estimates and standard error using ml.LinearRegression
try http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.RegressionEvaluator if you're using *spark.ml.LinearRegression *(which it appears you are) or http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.evaluation.RegressionMetrics if you're using *spark.mllib.LinearRegressionModel *(included for completeness) On Mon, Dec 21, 2015 at 12:47 AM, Arunkumar Pillai <arunkumar1...@gmail.com> wrote: > Hi > > I'm using ml.LinearRegession package > > > How to get estimates and standard Error for the coefficient > > PFB the code snippet > > val lr = new LinearRegression() > lr.setMaxIter(10) > .setRegParam(0.01) > .setFitIntercept(true) > val model= lr.fit(test) > val estimates = model.summary > > > -- > Thanks and Regards > Arun > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com

Re: Stuck with DataFrame df.select("select * from table");
oh, and it's worth noting that - starting with Spark 1.6 - you'll be able to just do the following: SELECT * FROM json.`/path/to/json/file` (note the back ticks) instead of calling registerTempTable() for the sole purpose of using SQL. https://issues.apache.org/jira/browse/SPARK-11197 On Fri, Dec 25, 2015 at 2:17 PM, Chris Fregly <ch...@fregly.com> wrote: > I assume by "The same code perfectly works through Zeppelin 0.5.5" that > you're using the %sql interpreter with your regular SQL SELECT statement, > correct? > > If so, the Zeppelin interpreter is converting the that > follows > > %sql > > to > > sqlContext.sql() > > per the following code: > > > https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L125 > > > https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L140-L141 > > > Also, keep in mind that you can do something like this if you want to stay > in DataFrame land: > > df.selectExpr("*").limit(5).show() > > > > On Fri, Dec 25, 2015 at 12:53 PM, Eugene Morozov < > evgeny.a.moro...@gmail.com> wrote: > >> Ted, Igor, >> >> Oh my... thanks a lot to both of you! >> Igor was absolutely right, but I missed that I have to use sqlContext =( >> >> Everything's perfect. >> Thank you. >> >> -- >> Be well! >> Jean Morozov >> >> On Fri, Dec 25, 2015 at 8:31 PM, Ted Yu <yuzhih...@gmail.com> wrote: >> >>> DataFrame uses different syntax from SQL query. >>> I searched unit tests but didn't find any in the form of df.select("select >>> ...") >>> >>> Looks like you should use sqlContext as other people suggested. >>> >>> On Fri, Dec 25, 2015 at 8:29 AM, Eugene Morozov < >>> evgeny.a.moro...@gmail.com> wrote: >>> >>>> Thanks for the comments, although the issue is not in limit() >>>> predicate. 
>>>> It's something with spark being unable to resolve the expression. >>>> >>>> I can do smth like this. It works as it suppose to: >>>> df.select(df.col("*")).where(df.col("x1").equalTo(3.0)).show(5); >>>> >>>> But I think old fashioned sql style have to work also. I have >>>> df.registeredTempTable("tmptable") and then >>>> >>>> df.select("select * from tmptable where x1 = '3.0'").show(); >>>> >>>> org.apache.spark.sql.AnalysisException: cannot resolve 'select * from >>>> tmp where x1 = '1.0'' given input columns x1, x4, x5, x3, x2; >>>> >>>> at >>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) >>>> at >>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56) >>>> at >>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.sca >>>> >>>> >>>> From the first statement I conclude that my custom datasource is >>>> perfectly fine. >>>> Just wonder how to fix / workaround that. >>>> -- >>>> Be well! >>>> Jean Morozov >>>> >>>> On Fri, Dec 25, 2015 at 6:13 PM, Igor Berman <igor.ber...@gmail.com> >>>> wrote: >>>> >>>>> sqlContext.sql("select * from table limit 5").show() (not sure if >>>>> limit 5 supported) >>>>> >>>>> or use Dmitriy's solution. select() defines your projection when >>>>> you've specified entire query >>>>> >>>>> On 25 December 2015 at 15:42, Василец Дмитрий < >>>>> pronix.serv...@gmail.com> wrote: >>>>> >>>>>> hello >>>>>> you can try to use df.limit(5).show() >>>>>> just trick :) >>>>>> >>>>>> On Fri, Dec 25, 2015 at 2:34 PM, Eugene Morozov < >>>>>> evgeny.a.moro...@gmail.com> wrote: >>>>>> >>>>>>> Hello, I'm basically stuck as I have no idea where to look; >>>>>>> >>>>>>> Following simple code, given that my Datasource is working gives me >>>>>>> an exception. >>>>>>> >&
Re: Tips for Spark's Random Forest slow performance
ah, so with that much serialization happening, you might actually need *fewer* workers! :) in the next couple releases of Spark ML, we should see better scoring/predicting functionality using a single node for exactly this reason. to get there, we need model.save/load support (PMML?), optimized single-node linear algebra support, and a few other goodies. useNodeIdCache only affects training. btw, are you checkpointing per this <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala#L151>? (code snippet from DecisionTreeParams copy/pasted below for convenience) /** * Specifies how often to checkpoint the cached node IDs. * E.g. 10 means that the cache will get checkpointed every 10 iterations. * This is only used if cacheNodeIds is true and if the checkpoint directory is set in * [[org.apache.spark.SparkContext]]. * Must be >= 1. * (default = 10) * @group expertSetParam */ def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) i'm not actually sure how this will affect training performance with the new ml.RandomForest impl, but i'm curious to hear what you find. On Fri, Dec 25, 2015 at 6:03 PM, Alexander Ratnikov < ratnikov.alexan...@gmail.com> wrote: > Definitely the biggest difference is the maxDepth of the trees. With > values smaller or equal to 5 the time goes into milliseconds. > The amount of trees affects the performance but not that much. > I tried to profile the app and I see decent time spent in serialization. > I'm wondering if Spark isn't somehow caching model on workers during > classification ? > > useNodeIdCache is ON but docs aren't clear if Spark is using it only > on training. > Also, I must say we didn't have this problem in the old mllib API so > it might be something in the new ml that I'm missing. > I will dig deeper into the problem after holidays. 
> > 2015-12-25 16:26 GMT+01:00 Chris Fregly <ch...@fregly.com>: > > so it looks like you're increasing num trees by 5x and you're seeing an > 8x > > increase in runtime, correct? > > > > did you analyze the Spark cluster resources to monitor the memory usage, > > spillage, disk I/O, etc? > > > > you may need more Workers. > > > > On Tue, Dec 22, 2015 at 8:57 AM, Alexander Ratnikov > > <ratnikov.alexan...@gmail.com> wrote: > >> > >> Hi All, > >> > >> It would be good to get some tips on tuning Apache Spark for Random > >> Forest classification. > >> Currently, we have a model that looks like: > >> > >> featureSubsetStrategy all > >> impurity gini > >> maxBins 32 > >> maxDepth 11 > >> numberOfClasses 2 > >> numberOfTrees 100 > >> > >> We are running Spark 1.5.1 as a standalone cluster. > >> > >> 1 Master and 2 Worker nodes. > >> The amount of RAM is 32GB on each node with 4 Cores. > >> The classification takes 440ms. > >> > >> When we increase the number of trees to 500, it takes 8 sec already. > >> We tried to reduce the depth but then error rate is higher. We have > >> around 246 attributes. > >> > >> Probably we are doing something wrong. Any ideas how we could improve > >> the performance ? > >> > >> > >> > >> > >> -- > >> View this message in context: > >> > http://apache-spark-user-list.1001560.n3.nabble.com/Tips-for-Spark-s-Random-Forest-slow-performance-tp25766.html > >> Sent from the Apache Spark User List mailing list archive at Nabble.com. > >> > >> - > >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > >> For additional commands, e-mail: user-h...@spark.apache.org > >> > > > > > > > > -- > > > > Chris Fregly > > Principal Data Solutions Engineer > > IBM Spark Technology Center, San Francisco, CA > > http://spark.tc | http://advancedspark.com > -- *Chris Fregly* Principal Data Solutions Engineer IBM Spark Technology Center, San Francisco, CA http://spark.tc | http://advancedspark.com
Re: Kafka - streaming from multiple topics
separating out your code into separate streaming jobs - especially when there are no dependencies between the jobs - is almost always the best route. it's easier to combine atoms (fusion) than to split them (fission). I recommend splitting out jobs along batch window, stream window, and state-tracking characteristics. for example, imagine 3 separate jobs for the following: 1) light storing of raw data into Cassandra (500ms batch interval) 2) medium aggregations/window roll ups (2000ms batch interval) 3) heavy training of an ML model (1ms batch interval). and a reminder that you can control (isolate or combine) the spark resources used by these separate, single purpose streaming jobs using scheduler pools just like your batch spark jobs. @cody: curious about neelesh's question, as well. does the Kafka Direct Stream API treat each Kafka Topic Partition separately in terms of parallel retrieval? more context: within a Kafka Topic partition, Kafka guarantees order, but not total ordering across partitions. this is normal and expected. so I assume the Kafka Direct Streaming connector can retrieve (and recover/retry) from separate partitions in parallel and still maintain the ordering guarantees offered by Kafka. if this is true, then I'd suggest @neelesh create more partitions within the Kafka Topic to improve parallelism - same as any distributed, partitioned data processing engine including spark. if this is not true, is there a technical limitation to prevent this parallelism within the connector? > On Dec 19, 2015, at 5:51 PM, Neelesh wrote: > > A related issue - When I put multiple topics in a single stream, the > processing delay is as bad as the slowest task in the number of tasks > created. Even though the topics are unrelated to each other, RDD at time "t1" > has to wait for the RDD at "t0" is fully executed, even if most cores are > idling, and just one task is still running and the rest of them have > completed. 
Effectively, a lightly loaded topic gets the worst deal because of > a heavily loaded topic > > Is my understanding correct? > > > >> On Thu, Dec 17, 2015 at 9:53 AM, Cody Koeninger wrote: >> You could stick them all in a single stream, and do mapPartitions, then >> switch on the topic for that partition. It's probably cleaner to do >> separate jobs, just depends on how you want to organize your code. >> >>> On Thu, Dec 17, 2015 at 11:11 AM, Jean-Pierre OCALAN >>> wrote: >>> Hi Cody, >>> >>> First of all thanks for the note about spark.streaming.concurrentJobs. I >>> guess this is why it's not mentioned in the actual spark streaming doc. >>> Since those 3 topics contain completely different data on which I need to >>> apply different kind of transformations, I am not sure joining them would >>> be really efficient, unless you know something that I don't. >>> >>> As I really don't need any interaction between those streams, I think I >>> might end up running 3 different streaming apps instead of one. >>> >>> Thanks again! >>> On Thu, Dec 17, 2015 at 11:43 AM, Cody Koeninger wrote: Using spark.streaming.concurrentJobs for this probably isn't a good idea, as it allows the next batch to start processing before current one is finished, which may have unintended consequences. Why can't you use a single stream with all the topics you care about, or multiple streams if you're e.g. joining them? > On Wed, Dec 16, 2015 at 3:00 PM, jpocalan wrote: > Nevermind, I found the answer to my questions. > The following spark configuration property will allow you to process > multiple KafkaDirectStream in parallel: > --conf spark.streaming.concurrentJobs= > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25723.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. 
> > - > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org >>> >>> >>> >>> -- >>> jean-pierre ocalan >>> jpoca...@gmail.com >
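for reference, the scheduler pools mentioned above for isolating the separate streaming jobs are defined in an XML file pointed to by spark.scheduler.allocation.file - a sketch with made-up pool names matching the 3-job split above (a job opts into a pool via sc.setLocalProperty("spark.scheduler.pool", "raw-ingest")):

```xml
<?xml version="1.0"?>
<!-- fairscheduler.xml: one pool per single-purpose streaming job -->
<allocations>
  <pool name="raw-ingest">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="ml-training">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>1</minShare>
  </pool>
</allocations>
```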
Re: spark 1.5.2 memory leak? reading JSON
hey Eran, I run into this all the time with Json. the problem is likely that your Json is "too pretty" and extending beyond a single line, which trips up the Json reader. my solution is usually to de-pretty the Json - either manually or through an ETL step - by stripping all white space before pointing my DataFrame/JSON reader at the file. this tool is handy for one-off scenarios: http://jsonviewer.stack.hu for streaming use cases, you'll want to have a light de-pretty ETL step either within the Spark Streaming job after ingestion - or upstream using something like a Flume interceptor, NiFi Processor (I love NiFi), or Kafka transformation assuming those exist by now. a similar problem exists for XML, btw. there's lots of wonky workarounds for this that use MapPartitions and all kinds of craziness. the best option, in my opinion, is to just ETL/flatten the data to make the DataFrame reader happy. > On Dec 19, 2015, at 4:55 PM, Eran Witkon wrote: > > Hi, > I tried the following code in spark-shell on spark1.5.2: > > val df = > sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json") > df.count() > > 15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size = > 67108864 bytes, TID = 3 > 15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3) > java.lang.RuntimeException: Failed to parse a value for data type > StructType() (current token: VALUE_STRING). 
> at scala.sys.package$.error(package.scala:27) > at > org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172) > at > org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251) > at > org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622) > at > org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110) > > Am I am doing something wrong? > Eran
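the de-pretty ETL step described above is nearly a one-liner in plain Python - parse each multi-line document and re-emit it on a single line so the line-oriented JSON reader is happy (the sample document is made up):

```python
import json

def depretty(document):
    """Collapse one pretty-printed JSON document onto a single line so
    that Spark's line-oriented JSON reader can parse it."""
    return json.dumps(json.loads(document), separators=(",", ":"))

pretty = '{\n  "id": 1,\n  "name": "sample"\n}'
print(depretty(pretty))  # {"id":1,"name":"sample"}
```

write one flattened document per line to a new file, then point sqlContext.read.json at that file instead.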
Re: Hive error when starting up spark-shell in 1.5.2
hopping on a plane, but check the hive-site.xml that's in your spark/conf directory (or should be, anyway). I believe you can change the root path thru this mechanism. if not, this should give you more info to google on. let me know as this comes up a fair amount. > On Dec 19, 2015, at 4:58 PM, Marco Mistroni wrote: > > HI all > posting again this as i was experiencing this error also under 1.5.1 > I am running spark 1.5.2 on a Windows 10 laptop (upgraded from Windows 8) > When i launch spark-shell i am getting this exception, presumably because i > have no > admin rights to /tmp directory on my laptop (windows 8-10 seems very > restrictive) > > java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: > /tmp/hive on HDFS should be writable. Current permissions are: - > at > org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522) > at > org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171) > at > org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162) > at > org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160) > at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:167) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native > Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:422) > at > org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028) > at $iwC$$iwC.(:9) > at $iwC.(:18) > at (:20) > at .(:24) > at .() > at .(:7) > at .() > at $print() > .. > Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on > HDFS should be writable. 
Current permissions are: - > at > org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:612) > at > org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554) > at > org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508) > ... 56 more > > :10: error: not found: value sqlContext >import sqlContext.implicits._ > ^ > :10: error: not found: value sqlContext >import sqlContext.sql > > I was wondering how can i configure hive to point to a different > directorywhere i have more permissions > > kr > marco > - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
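for reference, a minimal hive-site.xml in spark/conf that moves the scratch dir to a writable location might look like this (the Windows path is just an example):

```xml
<?xml version="1.0"?>
<!-- spark/conf/hive-site.xml: relocate the Hive scratch dir
     away from the unwritable default of /tmp/hive -->
<configuration>
  <property>
    <name>hive.exec.scratchdir</name>
    <value>C:/Users/marco/tmp/hive</value>
  </property>
</configuration>
```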
Re: How to do map join in Spark SQL
this type of broadcast should be handled by Spark SQL/DataFrames automatically. this is the primary cost-based, physical-plan query optimization that the Spark SQL Catalyst optimizer supports. in Spark 1.5 and before, you can trigger this optimization by properly setting the spark.sql.autoBroadcastJoinThreshold to a value that is *above* the size of your smaller table when fully bloated in JVM memory (not the serialized size of the data on disk - very common mistake). in Spark 1.6+, there are heuristics to make this decision dynamically - and even allow hybrid execution where certain keys - within the same Spark job - will be broadcast and others won't depending on their relative "hotness" for that particular job. common theme of Spark 1.6 and beyond will be adaptive physical plan execution, adaptive memory allocation to RDD Cache vs Spark Execution Engine, adaptive cluster resource allocation, etc. the goal being to minimize manual configuration and enable many diff types of workloads to run efficiently on the same Spark cluster. > On Dec 19, 2015, at 12:10 PM, Alexander Pivovarov wrote: > > I collected small DF to array of tuple3 > Then I registered UDF with function which is doing lookup in the array > Then I just run select which uses the UDF. > >> On Dec 18, 2015 1:06 AM, "Akhil Das" wrote: >> You can broadcast your json data and then do a map side join. This article >> is a good start http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/ >> >> Thanks >> Best Regards >> >>> On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov >>> wrote: >>> I have big folder having ORC files. Files have duration field (e.g. 
>>> 3,12,26, etc) >>> Also I have small json file (just 8 rows) with ranges definition (min, max >>> , name) >>> 0, 10, A >>> 10, 20, B >>> 20, 30, C >>> etc >>> >>> Because I can not do equi-join btw duration and range min/max I need to do >>> cross join and apply WHERE condition to take records which belong to the >>> range >>> Cross join is an expensive operation I think that it's better if this >>> particular join done using Map Join >>> >>> How to do Map join in Spark Sql?
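a sketch of what the threshold setting above looks like in spark-defaults.conf - the value is in bytes and illustrative (the Spark 1.5 default is 10485760, i.e. 10MB):

```properties
# spark-defaults.conf: broadcast any table smaller than ~100MB when
# fully materialized in JVM memory (not its on-disk serialized size)
spark.sql.autoBroadcastJoinThreshold  104857600
```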
Re: Pyspark SQL Join Failure
how does Spark SQL/DataFrame know that train_users_2.csv has a field named "id" - or anything else domain specific? is there a header? if so, does sc.textFile() know about this header? (when you createDataFrame() from an RDD of split strings like this, the columns get default names like _1, _2, etc - there is no "id" column, hence the AttributeError.) I'd suggest using the Databricks spark-csv package for reading csv data. there is an option in there to specify whether or not a header exists and, if so, use this as the column names. one caveat: I've had some issues relying on this specific feature of the spark-csv package, so I almost always call .toDF("id", "firstname", "lastname", ...) on the created DataFrame to explicitly name the columns exactly what I want them to be. I've also had some uppercase/lowercase and "_" and "-" issues with some of these connectors including spark-csv, spark-cassandra, and a few others, so I try to stick to simple lower-case column names without special chars just to be safe. note: these issues are connector specific, so not all should be subjected to this type of pessimism. just pointing these out for completeness.
> On Dec 19, 2015, at 5:30 PM, Weiwei Zhang wrote:
>
> Hi all,
>
> I got this error when I tried to use the 'join' function to left outer join two data frames in pyspark 1.4.1.
> Please kindly point out the places where I made mistakes. Thank you.
> > Traceback (most recent call last): > File "/Users/wz/PycharmProjects/PysparkTraining/Airbnb/src/driver.py", line > 46, in > trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, > 'left_outer') > File > "/Users/wz/Downloads/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", > line 701, in __getattr__ > "'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) > AttributeError: 'DataFrame' object has no attribute 'id' - It does have a > column called "id" > 15/12/19 14:15:00 INFO SparkContext: Invoking stop() from shutdown hook > > Here is the code: > from pyspark import SparkContext > from pyspark import SparkConf > from pyspark.sql import SQLContext > from pyspark.mllib.regression import LabeledPoint > from pyspark.mllib.classification import LogisticRegressionWithSGD > from pyspark.sql.functions import * > conf = SparkConf().set("spark.executor.memory", "4g") > sc = SparkContext(conf= conf) > sqlCtx = SQLContext(sc) > > train = sc.textFile("../train_users_2.csv").map(lambda line: line.split(",")) > print train.first() > trainDF = sqlCtx.createDataFrame(train) > > test = sc.textFile("../test_users.csv").map(lambda line: line.split(",")) > testDF = sqlCtx.createDataFrame(test) > > session = sc.textFile("../sessions.csv").map(lambda line: line.split(",")) > sessionDF = sqlCtx.createDataFrame(session) > > # join train with session (Error) > trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, > 'left_outer') > > Best Regards, > WZ
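as for the defensive renaming: a tiny helper (hypothetical - plain Python, not part of spark-csv) that normalizes header names to the simple lower-case, no-special-chars form I mentioned, before passing them to .toDF(...):

```python
import re

def sanitize(name):
    """Lower-case a column name and replace any run of
    non-alphanumeric chars (spaces, '-', '_', etc.) with '_'."""
    return re.sub(r"[^a-z0-9]+", "_", name.strip().lower()).strip("_")

# e.g. applied to a header row before explicitly naming columns:
header = ["Id", "First-Name", "last name"]
clean = [sanitize(c) for c in header]
```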
Re: Multiple Kinesis Streams in a single Streaming job
have you tried to union the 2 streams per the KinesisWordCountASL example https://github.com/apache/spark/blob/branch-1.3/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L120 where 2 streams (against the same Kinesis stream in this case) are created and union'd? it should work the same way - including union() of streams from totally different source types (kafka, kinesis, flume). On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com wrote: What is the error you are seeing? TD On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com wrote: Hi, Is it possible to setup streams from multiple Kinesis streams and process them in a single job? From what I have read, this should be possible, however, the Kinesis layer errors out whenever I try to receive from more than a single Kinesis Stream. Here is the code. Currently, I am focused on just getting receivers setup and working for the two Kinesis Streams, as such, this code just attempts to print out the contents of both streams:

implicit val formats = Serialization.formats(NoTypeHints)
val conf = new SparkConf().setMaster("local[*]").setAppName("test")
val ssc = new StreamingContext(conf, Seconds(1))
val rawStream = KinesisUtils.createStream(ssc, "erich-test", "kinesis.us-east-1.amazonaws.com", Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
rawStream.map(msg => new String(msg)).print
val loaderStream = KinesisUtils.createStream(ssc, "dev-loader", "kinesis.us-east-1.amazonaws.com", Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
val loader = loaderStream.map(msg => new String(msg)).print
ssc.start()

Thanks, -Erich -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Multiple Kinesis Streams in a single Streaming job
another option (not really recommended, but worth mentioning) would be to put each stream's DynamoDB checkpoint table in a separate region - separate from the other stream's, and even separate from the region of the stream itself. this isn't configurable right now, but will be in Spark 1.4. On May 14, 2015, at 6:47 PM, Erich Ess er...@simplerelevance.com wrote: Hi Tathagata, I think that's exactly what's happening. The error message is: com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49550673839151225431779125105915140284622031848663416866 used in GetShardIterator on shard shardId-0002 in stream erich-test under account xxx is invalid because it did not come from this stream. I looked at the DynamoDB table and each job has a single table and that table does not contain any stream identification information, only shard checkpointing data. I think the error is that when it tries to read from stream B, it's using checkpointing data for stream A and errors out. So it appears, at first glance, that currently you can't read from multiple Kinesis streams in a single job. I haven't tried this, but it might be possible for this to work if I force each stream to have different shard IDs so there is no ambiguity in the DynamoDB table; however, that's clearly not a feasible production solution. Thanks, -Erich On Thu, May 14, 2015 at 8:34 PM, Tathagata Das t...@databricks.com wrote: A possible problem may be that the kinesis stream in 1.3 uses the SparkContext app name as the Kinesis Application Name, which is used by the Kinesis Client Library to save checkpoints in DynamoDB. Since both kinesis DStreams are using the same Kinesis application name (as they are in the same StreamingContext / SparkContext / Spark app name), KCL may be doing weird overwriting of checkpoint information of both Kinesis streams into the same DynamoDB table. Either way, this is going to be fixed in Spark 1.4.
On Thu, May 14, 2015 at 4:10 PM, Chris Fregly ch...@fregly.com wrote: have you tried to union the 2 streams per the KinesisWordCountASL example where 2 streams (against the same Kinesis stream in this case) are created and union'd? it should work the same way - including union() of streams from totally different source types (kafka, kinesis, flume). On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com wrote: What is the error you are seeing? TD On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com wrote: Hi, Is it possible to setup streams from multiple Kinesis streams and process them in a single job? From what I have read, this should be possible, however, the Kinesis layer errors out whenever I try to receive from more than a single Kinesis Stream. Here is the code. Currently, I am focused on just getting receivers setup and working for the two Kinesis Streams, as such, this code just attempts to print out the contents of both streams: implicit val formats = Serialization.formats(NoTypeHints) val conf = new SparkConf().setMaster(local[*]).setAppName(test) val ssc = new StreamingContext(conf, Seconds(1)) val rawStream = KinesisUtils.createStream(ssc, erich-test, kinesis.us-east-1.amazonaws.com, Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY) rawStream.map(msg = new String(msg)).print val loaderStream = KinesisUtils.createStream( ssc, dev-loader, kinesis.us-east-1.amazonaws.com, Duration(1000), InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY) val loader = loaderStream.map(msg = new String(msg)).print ssc.start() Thanks, -Erich -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Erich Ess | CTO c. 
Re: Spark + Kinesis + Stream Name + Cache?
hey mike- as you pointed out here from my docs, changing the stream name is sometimes problematic due to the way the Kinesis Client Library manages leases and checkpoints, etc. in DynamoDB. I noticed this directly while developing the Kinesis connector, which is why I highlighted the issue here. is wiping out the DynamoDB table a suitable workaround for now? usually in production, you wouldn't be changing stream names often, so hopefully that's ok for dev. otherwise, can you share the relevant spark streaming logs that are generated when you do this? I saw a lot of "lease not owned by this Kinesis Client" type errors, from what I remember. lemme know! -Chris On May 8, 2015, at 4:36 PM, Mike Trienis mike.trie...@orcsol.com wrote: [Kinesis stream name]: The Kinesis stream that this streaming application receives from. The application name used in the streaming context becomes the Kinesis application name. The application name must be unique for a given account and region. The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization. Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table. On Fri, May 8, 2015 at 2:06 PM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am submitting the assembled fat jar file by the command: bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class com.xxx.Consumer -0.1-SNAPSHOT.jar It reads the data file from kinesis using the stream name defined in a configuration file. It turns out that it reads the data perfectly fine for one stream name (i.e. the default), however, if I switch the stream name and re-submit the jar, it no longer reads the data. From CloudWatch, I can see that there is data put into the stream and spark is actually sending get requests as well.
However, it doesn't seem to be outputting the data. Has anyone else encountered a similar issue? Does spark cache the stream name somewhere? I also have checkpointing enabled as well. Thanks, Mike.
Re: Spark Streaming S3 Performance Implications
hey mike! you'll definitely want to increase your parallelism by adding more shards to the stream - as well as spinning up 1 receiver per shard and unioning all the shards per the KinesisWordCount example that is included with the kinesis streaming package. you'll need more cores (cluster) or threads (local) to support this - equaling at least the number of shards/receivers + 1. also, it looks like you're writing to S3 per RDD. you'll want to broaden that out to write DStream batches - or expand even further and write window batches (where the window interval is a multiple of the batch interval). this goes for any spark streaming implementation - not just Kinesis. lemme know if that works for you. thanks! -Chris _ From: Mike Trienis mike.trie...@orcsol.com Sent: Wednesday, March 18, 2015 2:45 PM Subject: Spark Streaming S3 Performance Implications To: user@spark.apache.org Hi All, I am pushing data from Kinesis stream to S3 using Spark Streaming and noticed that during testing (i.e. master=local[2]) the batches (1 second intervals) were falling behind the incoming data stream at about 5-10 events / second. It seems that the rdd.saveAsTextFile("s3n://...") is taking a few seconds to complete.

val saveFunc = (rdd: RDD[String], time: Time) => {
  val count = rdd.count()
  if (count > 0) {
    val s3BucketInterval = time.milliseconds.toString
    rdd.saveAsTextFile("s3n://...")
  }
}
dataStream.foreachRDD(saveFunc)

Should I expect the same behaviour in a deployed cluster? Or does the rdd.saveAsTextFile("s3n://...") distribute the push work to each worker node? Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. Thanks, Mike.
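to make the window sizing rule concrete (the window interval must be a multiple of the batch interval), here's a quick illustrative check in plain Python - the helper name is made up:

```python
def batches_per_window(batch_secs, window_secs):
    """How many batches accumulate into one S3 write when saving per window."""
    if window_secs % batch_secs != 0:
        raise ValueError("window interval must be a multiple of the batch interval")
    return window_secs // batch_secs

# e.g. 1-second batches written every 60 seconds -> 60x fewer S3 writes,
# which matters a lot given S3's per-PUT latency
writes_saved = batches_per_window(1, 60)
```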
Re: Connection pool in workers
hey AKM! this is a very common problem. the streaming programming guide addresses this issue here, actually: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd the tl;dr is this: 1) you want to use foreachPartition() to operate on a whole partition versus a single record with foreachRDD() 2) you want to get/release the ConnectionPool within each worker 3) make sure you initialize the ConnectionPool first - or do it lazily upon getting the first connection. here's the sample code referenced in the link above with some additional comments for clarity:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver
  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition of records
    // ConnectionPool is a static, lazily initialized singleton pool of connections that runs within the Worker JVM
    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()
    // perform the application logic here - parse and write to mongodb using the connection
    partitionOfRecords.foreach(record => connection.send(record))
    // return to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}

hope that helps! -chris On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman ashrafuzzaman...@gmail.com wrote: Sorry guys my bad, Here is a high level code sample,

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreach(tweet => {
    val strTweet = new String(tweet, "UTF-8")
    val interaction = InteractionParser.parser(strTweet)
    interactionDAL.insert(interaction)
  })
})

Here I have to close the connection for interactionDAL otherwise the JVM gives me an error that the connection is open. I tried with sticky connection as well with keep_alive true. So my guess was that at the point of "unionStreams.foreachRDD" or at "rdd.foreach" the code is marshaled and sent to workers, and workers un-marshal and execute the process, which is why the connection is always opened for each RDD. I might be completely wrong. I would love to know what is going on underneath.
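in case it helps, here's what a minimal lazily-initialized singleton pool could look like - sketched in plain Python with a stand-in object for the real client connection, since the programming guide leaves ConnectionPool itself as an exercise:

```python
import queue

class ConnectionPool:
    """Static, lazily initialized singleton pool - one per worker process.
    The object() below stands in for a real mongodb (or other) client."""
    _pool = None
    _size = 4

    @classmethod
    def get_connection(cls):
        if cls._pool is None:  # lazy init on first use inside the worker
            cls._pool = queue.Queue()
            for _ in range(cls._size):
                cls._pool.put(object())  # stand-in for a real connection
        return cls._pool.get()

    @classmethod
    def return_connection(cls, conn):
        cls._pool.put(conn)

# usage mirrors the foreachPartition body above:
conn = ConnectionPool.get_connection()
# ... send the partition's records over conn ...
ConnectionPool.return_connection(conn)
```

the key property is that connections are created once per worker and reused across partitions, instead of opened and closed per record.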
Re: Pushing data from AWS Kinesis - Spark Streaming - AWS Redshift
Hey Mike- Great to see you're using the AWS stack to its fullest! I've already created the Kinesis-Spark Streaming connector with examples, documentation, tests, and everything. You'll need to build Spark from source with the -Pkinesis-asl profile, otherwise the Kinesis classes won't be included in the build. This is due to the Amazon Software License (ASL). Here's the link to the Kinesis-Spark Streaming integration guide: http://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html. Here's a link to the source: https://github.com/apache/spark/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark Here's the original jira as well as some follow-up jiras that I'm working on now:
- https://issues.apache.org/jira/browse/SPARK-1981
- https://issues.apache.org/jira/browse/SPARK-5959
- https://issues.apache.org/jira/browse/SPARK-5960
I met a lot of folks at the Strata San Jose conference a couple weeks ago who are using this Kinesis connector with good results, so you should be OK there. Regarding Redshift, the current Redshift connector only pulls data from Redshift - it doesn't write to Redshift. And actually, it only reads files that have been UNLOADed from Redshift into S3, so it's not pulling from Redshift directly. This is confusing, I know. Here's the jira for this work: https://issues.apache.org/jira/browse/SPARK-3205 For now, you'd have to use the AWS Java SDK to write to Redshift directly from within your Spark Streaming worker as batches of data come in from the Kinesis Stream.
This is roughly how your Spark Streaming app would look:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver
  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition of records
    // RedshiftConnectionPool is a static, lazily initialized singleton pool of connections that runs within the Worker JVM
    // retrieve a connection from the pool
    val connection = RedshiftConnectionPool.getConnection()
    // perform the application logic here - parse and write to Redshift using the connection
    partitionOfRecords.foreach(record => connection.send(record))
    // return to the pool for future reuse
    RedshiftConnectionPool.returnConnection(connection)
  }
}

Note that you would need to write the RedshiftConnectionPool singleton class yourself using the AWS Java SDK as I mentioned. There is a relatively new Spark SQL DataSources API that supports reading and writing to these data sources. An example Avro implementation is here: http://spark-packages.org/package/databricks/spark-avro - I think that's a read-only impl, but you get the idea. I've created 2 jiras for creating libraries for DynamoDB and Redshift that implement this DataSources API. Here are the jiras:
- https://issues.apache.org/jira/browse/SPARK-6101 (DynamoDB)
- https://issues.apache.org/jira/browse/SPARK-6102 (Redshift)
Perhaps you can take a stab at SPARK-6102? I should be done with the DynamoDB impl in the next few weeks - scheduled for Spark 1.4.0. Hope that helps! :) -Chris On Sun, Mar 1, 2015 at 11:06 AM, Mike Trienis mike.trie...@orcsol.com wrote: Hi All, I am looking at integrating a data stream from AWS Kinesis to AWS Redshift and since I am already ingesting the data through Spark Streaming, it seems convenient to also push that data to AWS Redshift at the same time. I have taken a look at the AWS kinesis connector although I am not sure it was designed to integrate with Apache Spark.
It seems more like a standalone approach: - https://github.com/awslabs/amazon-kinesis-connectors There is also a Spark redshift integration library, however, it looks like it was intended for pulling data rather than pushing data to AWS Redshift: - https://github.com/databricks/spark-redshift I finally took a look at a general Scala library that integrates with AWS Redshift: - http://scalikejdbc.org/ Does anyone have any experience pushing data from Spark Streaming to AWS Redshift? Does it make sense conceptually, or does it make more sense to push data from AWS Kinesis to AWS Redshift VIA another standalone approach such as the AWS Kinesis connectors. Thanks, Mike.
Re: Out of memory with Spark Streaming
curious about why you're only seeing 50 records max per batch. how many receivers are you running? what is the rate that you're putting data onto the stream? per the default AWS kinesis configuration, the producer can do 1000 PUTs per second with max 50k bytes per PUT and max 1mb per second per shard. on the consumer side, you can only do 5 GETs per second and 2mb per second per shard. my hunch is that the 5 GETs per second is what's limiting your consumption rate. can you verify that these numbers match what you're seeing? if so, you may want to increase your shards and therefore the number of kinesis receivers. otherwise, this may require some further investigation on my part. i wanna stay on top of this if it's an issue. thanks for posting this, aniket! -chris On Fri, Sep 12, 2014 at 5:34 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: Hi all Sorry but this was totally my mistake. In my persistence logic, I was creating async http client instance in RDD foreach but was never closing it leading to memory leaks. Apologies for wasting everyone's time. Thanks, Aniket On 12 September 2014 02:20, Tathagata Das tathagata.das1...@gmail.com wrote: Which version of spark are you running? If you are running the latest one, then could try running not a window but a simple event count on every 2 second batch, and see if you are still running out of memory? TD On Thu, Sep 11, 2014 at 10:34 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I did change it to be 1 gb. It still ran out of memory but a little later. The streaming job isnt handling a lot of data. In every 2 seconds, it doesn't get more than 50 records. Each record size is not more than 500 bytes. 
On Sep 11, 2014 10:54 PM, Bharat Venkat bvenkat.sp...@gmail.com wrote: You could set spark.executor.memory to something bigger than the default (512mb) On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: I am running a simple Spark Streaming program that pulls in data from Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps data and persists to a store. The program is running in local mode right now and runs out of memory after a while. I am yet to investigate heap dumps but I think Spark isn't releasing memory after processing is complete. I have even tried changing storage level to disk only. Help! Thanks, Aniket
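to put numbers on the consumer-side limits: a rough shard estimator in plain Python (the per-shard figures are the Kinesis consumer defaults quoted above; records_per_get is an assumption, not a hard figure from this thread):

```python
import math

# per-shard Kinesis consumer limits at the time: 5 GETs/sec and 2 MB/sec
GETS_PER_SEC = 5
READ_MB_PER_SEC = 2

def shards_needed(records_per_sec, record_bytes, records_per_get=10000):
    """Estimate shards required on the consumer side for a given ingest rate."""
    mb_per_sec = records_per_sec * record_bytes / 1e6
    by_throughput = mb_per_sec / READ_MB_PER_SEC
    by_gets = records_per_sec / (GETS_PER_SEC * records_per_get)
    return max(1, math.ceil(max(by_throughput, by_gets)))

# Aniket's workload: ~25 records/sec of <=500 bytes - one shard is plenty,
# so the 50-records-per-batch ceiling was never a shard limit here
n = shards_needed(25, 500)
```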
Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?
jira created with comments/references to this discussion: https://issues.apache.org/jira/browse/SPARK-4144 On Tue, Aug 19, 2014 at 4:47 PM, Xiangrui Meng men...@gmail.com wrote: No. Please create one but it won't be able to catch the v1.1 train. -Xiangrui On Tue, Aug 19, 2014 at 4:22 PM, Chris Fregly ch...@fregly.com wrote: this would be awesome. did a jira get created for this? I searched, but didn't find one. thanks! -chris On Tue, Jul 8, 2014 at 1:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Thanks a lot Xiangrui. This will help. On Wed, Jul 9, 2014 at 1:34 AM, Xiangrui Meng men...@gmail.com wrote: Hi Rahul, We plan to add online model updates with Spark Streaming, perhaps in v1.1, starting with linear methods. Please open a JIRA for Naive Bayes. For Naive Bayes, we need to update the priors and conditional probabilities, which means we should also remember the number of observations for the updates. Best, Xiangrui On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, I am using the MLlib Naive Bayes for a text classification problem. I have very less amount of training data. And then the data will be coming continuously and I need to classify it as either A or B. I am training the MLlib Naive Bayes model using the training data but next time when data comes, I want to predict its class and then incorporate that also in the model for next time prediction of new data(I think that is obvious). So I am not able to figure out what is the way to do that using MLlib Naive Bayes. Is it that I have to train the model on the whole data every time new data comes in?? Thanks in Advance! -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
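for anyone curious what the update Xiangrui describes looks like, here's a toy sketch in plain Python - keep the raw observation counts around so both the priors and the conditional probabilities can be revised as new labeled data arrives (simplified multinomial NB with add-one smoothing; this is an illustration, not MLlib code):

```python
import math
from collections import Counter, defaultdict

class IncrementalNB:
    """Toy multinomial Naive Bayes supporting online updates by
    remembering raw counts - the 'number of observations' from the thread."""
    def __init__(self):
        self.class_counts = Counter()            # -> priors
        self.word_counts = defaultdict(Counter)  # -> conditionals
        self.vocab = set()

    def update(self, label, words):
        """Fold one new labeled document into the model - no full retrain."""
        self.class_counts[label] += 1
        self.word_counts[label].update(words)
        self.vocab.update(words)

    def predict(self, words):
        total = sum(self.class_counts.values())
        best, best_lp = None, float("-inf")
        for label, c in self.class_counts.items():
            lp = math.log(c / total)  # prior from current counts
            denom = sum(self.word_counts[label].values()) + len(self.vocab)
            for w in words:           # add-one smoothed conditional
                lp += math.log((self.word_counts[label][w] + 1) / denom)
            if lp > best_lp:
                best, best_lp = label, lp
        return best

nb = IncrementalNB()
nb.update("A", ["spark", "mllib"])
nb.update("B", ["kinesis", "stream"])
nb.update("A", ["spark", "streaming"])  # new data folds in incrementally
```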
Re: Shared variable in Spark Streaming
good question, soumitra. it's a bit confusing. to break TD's code down a bit: dstream.count() is a transformation operation (returns a new DStream), executes lazily, runs in the cluster on the underlying RDDs that come through in that batch, and returns a new DStream with a single element representing the count of elements in each batch's underlying RDD. dstream.foreachRDD() is an output/action operation (returns something other than a DStream - nothing in this case), triggers the lazy execution above, returns the results to the driver, and increments the globalCount locally in the driver. per your specific question, RDD.count() is different in that it's an output/action operation that materializes the RDD and collects the count of elements in the RDD locally in the driver. confusing, indeed. accumulators, by contrast, are updated in parallel on the worker nodes across the cluster and are read locally in the driver. On Fri, Aug 8, 2014 at 7:36 AM, Soumitra Kumar kumar.soumi...@gmail.com wrote: I want to keep track of the events processed in a batch. How come 'globalCount' work for DStream? I think similar construct won't work for RDD, that's why there is accumulator. On Fri, Aug 8, 2014 at 12:52 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Do you mean that you want a continuously updated count as more events/records are received in the DStream (remember, DStream is a continuous stream of data)? Assuming that is what you want, you can use a global counter

var globalCount = 0L
dstream.count().foreachRDD(rdd => { globalCount += rdd.first() })

This globalCount variable will reside in the driver and will keep being updated after every batch. TD On Thu, Aug 7, 2014 at 10:16 PM, Soumitra Kumar kumar.soumi...@gmail.com wrote: Hello, I want to count the number of elements in the DStream, like RDD.count(). Since there is no such method in DStream, I thought of using DStream.count and use the accumulator. How do I do DStream.count() to count the number of elements in a DStream?
How do I create a shared variable in Spark Streaming? -Soumitra.
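to see the pattern outside Spark, here's a toy plain-Python simulation of the globalCount flow - lists stand in for each batch's RDD, and the callback plays the role of the output operation running in the driver (all names are made up):

```python
global_count = 0

def foreach_rdd(batches, fn):
    """Stand-in for dstream.foreachRDD: the output operation invokes
    fn in the driver once per batch with that batch's element count."""
    for batch in batches:
        fn(len(batch))  # dstream.count() -> one value per batch

def add_to_global(n):
    global global_count
    global_count += n   # driver-local mutation, just like globalCount

batches = [[1, 2, 3], [4, 5], [6]]  # three batches of records
foreach_rdd(batches, add_to_global)
# global_count now holds the running total across batches
```

in real Spark the closure mutating a driver variable only works because foreachRDD runs the increment in the driver; the same `+=` inside a map() on the workers would silently update a serialized copy, which is exactly what accumulators exist to solve.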
Re: spark RDD join Error
specifically, you're picking up the following implicit: import org.apache.spark.SparkContext.rddToPairRDDFunctions (in case you're a wildcard-phobe like me) On Thu, Sep 4, 2014 at 5:15 PM, Veeranagouda Mukkanagoudar veera...@gmail.com wrote: Thanks a lot, that fixed the issue :) On Thu, Sep 4, 2014 at 4:51 PM, Zhan Zhang zzh...@hortonworks.com wrote: Try this: import org.apache.spark.SparkContext._ Thanks. Zhan Zhang On Sep 4, 2014, at 4:36 PM, Veeranagouda Mukkanagoudar veera...@gmail.com wrote: I am planning to use RDD join operation, to test out i was trying to compile some test code, but am getting following compilation error *value join is not a member of org.apache.spark.rdd.RDD[(String, Int)]* *[error] rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }* Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]): RDD[(String, Int)] = {
  rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }
}

Any help would be great. Veera
Re: saveAsSequenceFile for DStream
couple things to add here: 1) you can import the org.apache.spark.streaming.dstream.PairDStreamFunctions implicit which adds a whole ton of functionality to DStream itself. this lets you work at the DStream level versus digging into the underlying RDDs. 2) you can use ssc.fileStream(directory) to create an input stream made up of files in a given directory. new files will be added to the stream as they appear in that directory. note: files must be immutable. On Tue, Jul 22, 2014 at 8:39 AM, Barnaby Falls bfa...@outlook.com wrote: Thanks Sean! I got that working last night similar to how you solved it. Any ideas about how to monitor that same folder in another script by creating a stream? I can use sc.sequenceFile() to read in the RDD, but how do I get the name of the file that got added since there is no sequenceFileStream() method? Thanks again for your help. On Jul 22, 2014, at 1:57, Sean Owen so...@cloudera.com wrote: What about simply: dstream.foreachRDD(_.saveAsSequenceFile(...)) ? On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote: First of all, I do not know Scala, but learning. I'm doing a proof of concept by streaming content from a socket, counting the words and write it to a Tachyon disk. A different script will read the file stream and print out the results.

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
ssc.start()
ssc.awaitTermination()

I already did a proof of concept to write and read sequence files but there doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the best way to write out an RDD to a stream so that the timestamps are in the filenames and so there is minimal overhead in reading the data back in as objects, see below.
My simple successful proof was the following:

val rdd = sc.parallelize(Array(("a", 2), ("b", 3), ("c", 1)))
rdd.saveAsSequenceFile("tachyon://.../123.sf2")
val rdd2 = sc.sequenceFile[String,Int]("tachyon://.../123.sf2")

How can I do something similar with streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
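re: timestamps in filenames - DStream's saveAs*Files operations name each batch's output as prefix-<batch time in ms>[.suffix], per the streaming docs. a tiny plain-Python helper mirroring that convention (the helper name is made up), which a downstream script could use to locate a given batch's output:

```python
def batch_path(prefix, time_ms, suffix=""):
    """Mirror of Spark Streaming's output naming: prefix-TIME_IN_MS[.suffix]."""
    return f"{prefix}-{time_ms}" + (f".{suffix}" if suffix else "")

# e.g. the directory a batch at a given time would land in:
p = batch_path("tachyon://localhost:19998/files/WordCounts", 1406000000000, "sf2")
```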
Re: data locality
you can view the Locality Level of each task within a stage by using the Spark Web UI under the Stages tab. levels are as follows (in order of decreasing desirability):
1) PROCESS_LOCAL - data was found directly in the executor JVM
2) NODE_LOCAL - data was found on the same node as the executor JVM
3) RACK_LOCAL - data was found in the same rack
4) ANY - outside the rack
also, the Aggregated Metrics by Executor section of the Stage detail view shows how much data is being shuffled across the network (Shuffle Read/Write). 0 is where you wanna be with that metric. -chris On Fri, Jul 25, 2014 at 4:13 AM, Tsai Li Ming mailingl...@ltsai.com wrote: Hi, In the standalone mode, how can we check data locality is working as expected when tasks are assigned? Thanks! On 23 Jul, 2014, at 12:49 am, Sandy Ryza sandy.r...@cloudera.com wrote: On standalone there is still special handling for assigning tasks within executors. There just isn't special handling for where to place executors, because standalone generally places an executor on every node. On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang hw...@qilinsoft.com wrote: Sandy, I just tried the standalone cluster and didn't have chance to try Yarn yet. So if I understand correctly, there is **no** special handling of task assignment according to the HDFS block's location when Spark is running as a **standalone** cluster. Please correct me if I'm wrong. Thank you for your patience! -- *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com] *Sent:* July 22, 2014 9:47 *To:* user@spark.apache.org *Subject:* Re: data locality This currently only works for YARN. The standalone default is to place an executor on every node for every job. The total number of executors is specified by the user. -Sandy On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote: Sandy, Do you mean the "preferred location" is working for standalone cluster also?
Because I checked the code of SparkContext and saw the comments below: // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It contains a map from hostname to a list of input format splits on the host. private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() BTW, even with the preferred hosts, how does Spark decide how many total executors to use for this application? Thanks again! -- *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com] *Sent:* Friday, July 18, 2014 3:44 PM *To:* user@spark.apache.org *Subject:* Re: data locality Hi Haopu, Spark will ask HDFS for file block locations and try to assign tasks based on these. There is a snag. Spark schedules its tasks inside of executor processes that stick around for the lifetime of a Spark application. Spark requests executors before it runs any jobs, i.e. before it has any information about where the input data for the jobs is located. If the executors occupy significantly fewer nodes than exist in the cluster, it can be difficult for Spark to achieve data locality. The workaround for this is an API that allows passing in a set of preferred locations when instantiating a Spark context. This API is currently broken in Spark 1.0, and will likely be changed to something a little simpler in a future release. val locData = InputFormatInfo.computePreferredLocations(Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt")))) val sc = new SparkContext(conf, locData) -Sandy On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote: I have a standalone Spark cluster and an HDFS cluster which share some nodes. When reading an HDFS file, how does Spark assign tasks to nodes? Will it ask HDFS for the location of each file block in order to pick the right worker node? How about a Spark cluster on Yarn? Thank you very much!
Re: Low Level Kafka Consumer for Spark
@bharat- overall, i've noticed a lot of confusion about how Spark Streaming scales - as well as how it handles failover and checkpointing - but we can discuss that separately. there are actually 2 dimensions to scaling here: receiving and processing. *Receiving* receiving can be scaled out by submitting new DStreams/Receivers to the cluster as i've done in the Kinesis example. in fact, i purposely chose to submit multiple receivers in my Kinesis example because i feel it should be the norm and not the exception - particularly for partitioned and checkpoint-capable streaming systems like Kafka and Kinesis. it's the only way to scale. a side note here is that each receiver running in the cluster will immediately replicate to 1 other node for fault-tolerance of that specific receiver. this is where the confusion lies. this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still a chance of data loss as there's no write-ahead log on the hot path, but this is being addressed. this is mentioned in the docs here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving *Processing* once data is received, tasks are scheduled across the Spark cluster just like any other non-streaming task where you can specify the number of partitions for reduces, etc. this is the part of scaling that is sometimes overlooked - probably because it works just like regular Spark - but it is worth highlighting. 
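a minimal sketch of the "scale out receiving" idea above, assuming Spark 1.x's KafkaUtils (the ZooKeeper quorum, group, and topic names are placeholders): create several input DStreams - ideally one per Kafka partition - and union them before processing.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// sketch: assumes an existing SparkContext `sc`
val ssc = new StreamingContext(sc, Seconds(2))

val numReceivers = 4  // ideally matched to the number of Kafka partitions
val streams = (1 to numReceivers).map { _ =>
  // each call creates one receiver, which lands on some executor in the cluster
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("my-topic" -> 1),
    StorageLevel.MEMORY_AND_DISK_SER_2)
}

// union the per-receiver streams into a single DStream before processing
val unioned = ssc.union(streams)
```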
Here's a blurb in the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing the other thing that's confusing with Spark Streaming is that in Scala, you need to explicitly import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions in order to pick up the implicits that allow DStream.reduceByKey and such (versus DStream.transform(rddBatch => rddBatch.reduceByKey(...))). in other words, DStreams appear to be relatively featureless until you discover this implicit. otherwise, you need to operate on the underlying RDDs explicitly, which is not ideal. the Kinesis example referenced earlier in the thread uses the DStream implicits. side note to all of this - i've recently convinced the publisher of my upcoming book, Spark In Action, to let me jump ahead and write the Spark Streaming chapter ahead of other, more well-understood libraries. early release is in a month or so. sign up @ http://sparkinaction.com if you wanna get notified. shameless plug that i wouldn't otherwise do, but i really think it will help clear up a lot of confusion in this area as i hear these questions asked a lot in my talks and such. and i think a clear, crisp story on scaling and fault-tolerance will help Spark Streaming's adoption. hope that helps! -chris On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: I agree. This issue should be fixed in Spark rather than relying on replay of Kafka messages. Dib On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Dibyendu, Tnks for getting back. I believe you are absolutely right. We were under the assumption that the raw data was being computed again, and that's not happening after further tests. This applies to Kafka as well. The issue is of major priority, fortunately. 
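the two forms side by side (a sketch, assuming a DStream[(String, Int)] named `pairs` - the name is a placeholder):

```scala
// without the implicits in scope, you drop down to the RDD level per batch:
val viaTransform = pairs.transform(rddBatch => rddBatch.reduceByKey(_ + _))

// with the implicit conversion imported, reduceByKey is available on the DStream itself:
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
val viaImplicit = pairs.reduceByKey(_ + _)
```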
Regarding your suggestion, I would maybe prefer to have the problem resolved within Spark's internals, since once the data is replicated we should be able to access it once more and not have to pull it back again from Kafka or any other stream that is affected by this issue. If, for example, there is a big number of batches to be recomputed, I would rather have them done in a distributed way than overload the batch interval with a huge amount of Kafka messages. I do not yet have enough know-how about where the issue lies or about the internal Spark code, so I can't really say how difficult the implementation will be. tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kinesis receiver spark streaming partition
great question, wei. this is very important to understand from a performance perspective. and this extends beyond kinesis - it applies to any streaming source that supports shards/partitions. i need to do a little research into the internals to confirm my theory. lemme get back to you! -chris On Tue, Aug 26, 2014 at 11:37 AM, Wei Liu wei@stellarloyalty.com wrote: We are exploring using Kinesis and Spark Streaming together. I took a look at the Kinesis receiver code in 1.1.0. I have a question regarding Kinesis partitions vs. Spark Streaming partitions. It seems to be pretty difficult to align these partitions. Kinesis partitions a stream of data into shards; if we follow the example, we will have multiple Kinesis receivers reading from the same stream in Spark Streaming. It seems like Kinesis workers will coordinate among themselves and assign shards to themselves dynamically. A particular shard can be consumed by different Kinesis workers (thus different Spark workers) dynamically (not at the same time). Blocks are generated based on time intervals, RDDs are created based on blocks, and RDDs are partitioned based on blocks. In the end, the data for a given shard will be spread across multiple blocks (possibly located on different Spark worker nodes). We will probably need to group this data again for a given shard and shuffle data around to achieve the same partitioning we had in Kinesis. Is there a better way to achieve this and avoid data reshuffling? Thanks, Wei
Re: Parsing Json object definition spanning multiple lines
i've seen this done using mapPartitions() where each partition represents a single, multi-line json file. you can rip through each partition (json file) and parse the json doc as a whole. this assumes you use sc.textFile("path/*.json") or equivalent to load in multiple files at once. each json file will be a partition. not sure if this satisfies your use case, but it might be a good starting point. -chris On Mon, Jul 14, 2014 at 2:55 PM, SK skrishna...@gmail.com wrote: Hi, I have a json file where the definition of each object spans multiple lines. An example of one object definition appears below. { "name": "16287e9cdf", "width": 500, "height": 325, "width": 1024, "height": 665, "obj": [ { "x": 395.08, "y": 82.09, "w": 185.48677, "h": 185.48677, "min": 50, "max": 59, "attr1": 2, "attr2": 68, "attr3": 8 }, { "x": 519.1, "y": 225.8, "w": 170, "h": 171, "min": 20, "max": 29, "attr1": 7, "attr2": 93, "attr3": 10 } ] } I used the following Spark code to parse the file. However, the parsing is failing because I think it expects one Json object definition per line. I can try to preprocess the input file to remove the new lines, but I would like to know if it is possible to parse a Json object definition that spans multiple lines, directly in Spark. val inp = sc.textFile(args(0)) val res = inp.map(line => { parse(line) }) .map(json => { implicit lazy val formats = org.json4s.DefaultFormats val image = (json \ "name").extract[String] } ) Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-Json-object-definition-spanning-multiple-lines-tp9659.html
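a sketch of the mapPartitions() approach described above (assuming each small json file ends up as its own partition, which generally holds for files under one HDFS block that aren't split):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// glue the lines of each partition (i.e. each json file) back together,
// then parse the document as a whole
val inp = sc.textFile("path/*.json")
val names = inp.mapPartitions { lines =>
  implicit val formats = DefaultFormats
  val doc = lines.mkString("\n")
  if (doc.trim.isEmpty) Iterator.empty
  else Iterator((parse(doc) \ "name").extract[String])
}
```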
Re: Spark-Streaming collect/take functionality.
good suggestion, td. and i believe the optimization that jon.burns is referring to - from the big data mini course - is a step earlier: the sorting mechanism that produces sortedCounts. you can use mapPartitions() to get a top k locally on each partition, then shuffle only (k * # of partitions) elements to the driver for sorting - versus shuffling the whole dataset from all partitions. a network-IO-saving technique. On Tue, Jul 15, 2014 at 9:41 AM, jon.burns jon.bu...@uleth.ca wrote: It works perfectly, thanks! I feel like I should have figured that out; I'll chalk it up to inexperience with Scala. Thanks again. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670p9772.html
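a sketch of the partition-local top-k idea above, for an RDD of (key, count) pairs named `counts` (the name is a placeholder):

```scala
val k = 10

// keep only the k largest pairs from each partition
val localTopK = counts.mapPartitions { it =>
  it.toSeq.sortBy(-_._2).take(k).iterator
}

// only k * numPartitions elements come back to the driver for the final sort
val globalTopK = localTopK.collect().sortBy(-_._2).take(k)
```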
Re: Low Level Kafka Consumer for Spark
great work, Dibyendu. looks like this would be a popular contribution. expanding on bharat's question a bit: what happens if you submit multiple receivers to the cluster by creating and unioning multiple DStreams as in the kinesis example here: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123 for more context, the kinesis implementation above uses the Kinesis Client Library (KCL) to automatically assign - and load balance - stream shards among all KCL threads from all receivers (potentially coming and going as nodes die) on all executors/nodes, using DynamoDB as the association data store. ZooKeeper would be used for your Kafka consumers, of course - with ZooKeeper watches to handle the ephemeral nodes. and i see you're using Curator, which makes things easier. as bharat suggested, running multiple receivers/dstreams may be desirable from a scalability and fault tolerance standpoint. is this type of load balancing possible among your different Kafka consumers running in different ephemeral JVMs? and isn't it fun proposing a popular piece of code? the question floodgates have opened! haha. :) -chris On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi Bharat, Thanks for your email. If the Kafka Reader worker process dies, it will be replaced by a different machine, and it will start consuming from the offset where it left off (for each partition). The same would happen even if I had an individual Receiver for every partition. Regards, Dibyendu On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com wrote: I like this consumer for what it promises - better control over offsets and recovery from failures. 
If I understand this right, it still uses a single worker process to read from Kafka (one thread per partition) - is there a way to specify multiple worker processes (on different machines) to read from Kafka? Maybe one worker process for each partition? If there is no such option, what happens when the single machine hosting the Kafka Reader worker process dies and is replaced by a different machine (like in the cloud)? Thanks, Bharat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
Re: Spark on Yarn: Connecting to Existing Instance
perhaps the author is referring to Spark Streaming applications? they're examples of long-running applications. the application/domain-level protocol still needs to be implemented yourself, as sandy pointed out. On Wed, Jul 9, 2014 at 11:03 AM, John Omernik j...@omernik.com wrote: So how do I do the long-lived server continually satisfying requests in the Cloudera application? I am very confused by that at this point. On Wed, Jul 9, 2014 at 12:49 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Spark doesn't currently offer you anything special to do this. I.e. if you want to write a Spark application that fires off jobs on behalf of remote processes, you would need to implement the communication between those remote processes and your Spark application code yourself. On Wed, Jul 9, 2014 at 10:41 AM, John Omernik j...@omernik.com wrote: Thank you for the link. In that link the following is written: For those familiar with the Spark API, an application corresponds to an instance of the SparkContext class. An application can be used for a single batch job, an interactive session with multiple jobs spaced apart, or a long-lived server continually satisfying requests So, if I wanted to use a long-lived server continually satisfying requests and then start a shell that connected to that context, how would I do that in Yarn? That's the problem I am having right now, I just want there to be that long lived service that I can utilize. Thanks! On Wed, Jul 9, 2014 at 11:14 AM, Sandy Ryza sandy.r...@cloudera.com wrote: To add to Ron's answer, this post explains what it means to run Spark against a YARN cluster, the difference between yarn-client and yarn-cluster mode, and the reason spark-shell only works in yarn-client mode. 
http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/ -Sandy On Wed, Jul 9, 2014 at 9:09 AM, Ron Gonzalez zlgonza...@yahoo.com wrote: The idea behind YARN is that you can run different application types like MapReduce, Storm and Spark. I would recommend that you build your spark jobs in the main method without specifying how you deploy it. Then you can use spark-submit to tell Spark how you would want to deploy it, using yarn-cluster as the master. The key point here is that once you have YARN set up, the spark client connects to it using the $HADOOP_CONF_DIR that contains the resource manager address. In particular, this needs to be accessible from the classpath of the submitter since it implicitly uses this when it instantiates a YarnConfiguration instance. If you want more details, read org.apache.spark.deploy.yarn.Client.scala. You should be able to download a standalone YARN cluster from any of the Hadoop providers like Cloudera or Hortonworks. Once you have that, the spark programming guide describes what I mention above in sufficient detail for you to proceed. Thanks, Ron Sent from my iPad On Jul 9, 2014, at 8:31 AM, John Omernik j...@omernik.com wrote: I am trying to get my head around using Spark on Yarn from the perspective of a cluster. I can start a Spark Shell with no issues in Yarn. Works easily. This is done in yarn-client mode and it all works well. In multiple examples, I see instances where people have set up Spark Clusters in Stand Alone mode, and then in the examples they connect to this cluster in Stand Alone mode. This is often done using the spark:// string for connection. Cool. But what I don't understand is how do I set up a Yarn instance that I can connect to? I.e. I tried running Spark Shell in yarn-cluster mode and it gave me an error, telling me to use yarn-client. I see information on using spark-class or spark-submit. 
But what I'd really like is an instance I can connect a spark-shell to, and have the instance stay up. I'd like to be able to run other things on that instance, etc. Is that possible with Yarn? I know there may be long-running job challenges with Yarn, but I am just testing. I am just curious if I am looking at something completely bonkers here, or just missing something simple. Thanks!
Re: Spark Streaming - What does Spark Streaming checkpoint?
The StreamingContext can be recreated from a checkpoint file, indeed. check out the following Spark Streaming source files for details: StreamingContext, Checkpoint, DStream, DStreamCheckpoint, and DStreamGraph. On Wed, Jul 9, 2014 at 6:11 PM, Yan Fang yanfang...@gmail.com wrote: Hi guys, I am a little confused by the checkpointing in Spark Streaming. It checkpoints the intermediate data for the stateful operations for sure. Does it also checkpoint the information of StreamingContext? Because it seems we can recreate the SC from the checkpoint in a driver node failure scenario. When I looked at the checkpoint directory, I did not find much of a clue. Any help? Thank you very much. Best, Fang, Yan yanfang...@gmail.com +1 (206) 849-4108
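a sketch of how the recreation works in practice, assuming a Spark release that provides StreamingContext.getOrCreate (the checkpoint directory and batch interval are placeholders):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// sketch: assumes an existing SparkContext `sc`
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.checkpoint("hdfs://namenode:8020/checkpoints/app1")
  // ... set up DStreams here ...
  ssc
}

// on a fresh start this calls createContext(); after a driver failure it rebuilds
// the context (and its DStream graph) from the checkpoint files instead
val ssc = StreamingContext.getOrCreate("hdfs://namenode:8020/checkpoints/app1", createContext _)
ssc.start()
```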
Re: Task's Scheduler Delay in web ui
Scheduling Delay is the time required to assign a task to an available resource. if you're seeing large scheduler delays, this likely means that other jobs/tasks are using up all of the resources. here's some more info on how to setup Fair Scheduling versus the default FIFO Scheduler: https://spark.apache.org/docs/latest/job-scheduling.html of course, increasing the cluster size would help assuming resources are being allocated fairly. also, delays can vary depending on the cluster resource manager that you're using (spark standalone, yarn, mesos). -chris On Tue, Jul 8, 2014 at 4:14 AM, haopu hw...@qilinsoft.com wrote: What's the meaning of a Task's Scheduler Delay in the web ui? And what could cause that delay? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Task-s-Scheduler-Delay-in-web-ui-tp9019.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?
this would be awesome. did a jira get created for this? I searched, but didn't find one. thanks! -chris On Tue, Jul 8, 2014 at 1:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Thanks a lot Xiangrui. This will help. On Wed, Jul 9, 2014 at 1:34 AM, Xiangrui Meng men...@gmail.com wrote: Hi Rahul, We plan to add online model updates with Spark Streaming, perhaps in v1.1, starting with linear methods. Please open a JIRA for Naive Bayes. For Naive Bayes, we need to update the priors and conditional probabilities, which means we should also remember the number of observations for the updates. Best, Xiangrui On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, I am using the MLlib Naive Bayes for a text classification problem. I have very less amount of training data. And then the data will be coming continuously and I need to classify it as either A or B. I am training the MLlib Naive Bayes model using the training data but next time when data comes, I want to predict its class and then incorporate that also in the model for next time prediction of new data(I think that is obvious). So I am not able to figure out what is the way to do that using MLlib Naive Bayes. Is it that I have to train the model on the whole data every time new data comes in?? Thanks in Advance! -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Re: slower worker node in the cluster
perhaps creating Fair Scheduler Pools might help? there's no way to pin certain nodes to a pool, but you can specify minShares (CPUs). not sure if that would help, but it's worth looking into. On Tue, Jul 8, 2014 at 7:37 PM, haopu hw...@qilinsoft.com wrote: In a standalone cluster, is there a way to specify a stage to run on a faster worker? That stage is reading an HDFS file and then doing some filter operations. The tasks are assigned to the slower worker also, but the slower worker is delayed in launching them because it's running tasks from other stages. So I think it may be better to assign the stage to a worker. Any suggestions? And would a cluster on Yarn help? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/slower-worker-node-in-the-cluster-tp9125.html
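a sketch of wiring up Fair Scheduler Pools (the pool name and file path are placeholders; pool properties such as minShare live in the XML file referenced by spark.scheduler.allocation.file):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// enable fair scheduling and point at the pool definitions
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)

// jobs submitted from this thread now run in the named pool
sc.setLocalProperty("spark.scheduler.pool", "fast-pool")
```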
Re: Spark Streaming source from Amazon Kinesis
i took this over from parviz. i recently submitted a new PR for Kinesis Spark Streaming support: https://github.com/apache/spark/pull/1434 others have tested it with good success, so give it a whirl! waiting for it to be reviewed/merged. please put any feedback into the PR directly. thanks! -chris On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia matei.zaha...@gmail.com wrote: No worries, looking forward to it! Matei On Apr 21, 2014, at 1:59 PM, Parviz Deyhim pdey...@gmail.com wrote: sorry Matei. Will definitely start working on making the changes soon :) On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia matei.zaha...@gmail.com wrote: There was a patch posted a few weeks ago ( https://github.com/apache/spark/pull/223), but it needs a few changes in packaging because it uses a license that isn’t fully compatible with Apache. I’d like to get this merged when the changes are made though — it would be a good input source to support. Matei On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: I'm looking to start experimenting with Spark Streaming, and I'd like to use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source. Looking at the list of supported Spark Streaming sources http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking, I don't see any mention of Kinesis. Is it possible to use Spark Streaming with Amazon Kinesis? If not, are there plans to add such support in the future? Nick -- View this message in context: Spark Streaming source from Amazon Kinesis http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com http://nabble.com/.
Re: multiple passes in mapPartitions
also, multiple calls to mapPartitions() will be pipelined by the spark execution engine into a single stage, so the overhead is minimal. On Fri, Jun 13, 2014 at 9:28 PM, zhen z...@latrobe.edu.au wrote: Thank you for your suggestion. We will try it out and see how it performs. We think the single call to mapPartitions will be faster but we could be wrong. It would be nice to have a clone method on the iterator. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555p7616.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Fw: How Spark Choose Worker Nodes for respective HDFS block
yes, spark attempts to achieve data locality (PROCESS_LOCAL or NODE_LOCAL) where possible, just like MapReduce. it's a best practice to co-locate your Spark Workers on the same nodes as your HDFS DataNodes for just this reason. this is achieved through the RDD.preferredLocations() interface method: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD on a related note, you can configure spark.locality.wait as the number of millis to wait before falling back to a less-local data node (RACK_LOCAL): http://spark.apache.org/docs/latest/configuration.html -chris On Fri, Jun 13, 2014 at 11:06 PM, anishs...@yahoo.co.in anishs...@yahoo.co.in wrote: Hi All Is there any communication between the Spark MASTER node and the Hadoop NameNode while distributing work to WORKER nodes, like we have in MapReduce? Please suggest. TIA -- Anish Sneh Experience is the best teacher. http://in.linkedin.com/in/anishsneh -- * From: * anishs...@yahoo.co.in anishs...@yahoo.co.in; * To: * u...@spark.incubator.apache.org u...@spark.incubator.apache.org; * Subject: * How Spark Choose Worker Nodes for respective HDFS block * Sent: * Fri, Jun 13, 2014 9:17:50 PM Hi All I am new to Spark, working on a 3-node test cluster. I am trying to explore Spark's scope in analytics; my Spark code interacts with HDFS mostly. I am confused about how Spark chooses which node to distribute its work to, since we assume that it can be an alternative to Hadoop MapReduce. In MapReduce we know that internally the framework will distribute code (or logic) to the nearest TaskTracker, which is co-located with the DataNode, or in the same rack, or probably nearest to the data blocks. My confusion is: when I give an HDFS path inside a Spark program, how does it choose which node is nearest (if it does)? If it does not, then how will it work when I have TBs of data where high network latency will be involved? My apologies for asking a basic question, please suggest. TIA -- Anish Sneh Experience is the best teacher. 
http://www.anishsneh.com
Re: Selecting first ten values in a RDD/partition
as brian g alluded to earlier, you can use DStream.mapPartitions() to return the partition-local top 10 for each partition. once you collect the results from all the partitions, you can do a global top-10 merge sort across all partitions. this leads to a much smaller dataset being shuffled back to the driver to calculate the global top 10. On Fri, May 30, 2014 at 5:05 AM, nilmish nilmish@gmail.com wrote: My primary goal: to get the top 10 hashtags for every 5-min interval. I want to do this efficiently. I have already done this by using reduceByKeyAndWindow() and then sorting all hashtags in the 5-min interval, taking only the top 10 elements. But this is very slow. So now I am thinking of retaining only the top 10 hashtags in each RDD, because only these could come in the final answer. I am stuck at: how do I retain only the top 10 hashtags in each RDD of my DSTREAM? Basically I need to transform my DSTREAM so that each RDD contains only the top 10 hashtags, so that the number of hashtags in the 5-min interval is low. If there is some more efficient way of doing this then please let me know that also. Thanx, Nilesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Selecting-first-ten-values-in-a-RDD-partition-tp6517p6577.html
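a sketch of the per-partition trimming idea at the DStream level, assuming a DStream of (hashtag, count) pairs named `hashtagCounts` (the name is a placeholder):

```scala
val k = 10

// trim each batch RDD down to at most k candidates per partition
val trimmed = hashtagCounts.transform { rdd =>
  rdd.mapPartitions(it => it.toSeq.sortBy(-_._2).take(k).iterator)
}

// each batch RDD now holds at most k * numPartitions candidates,
// so computing the final global top 10 per window is cheap
```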
Re: Reconnect to an application/RDD
Tachyon is another option - this is the off-heap StorageLevel specified when persisting RDDs: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.storage.StorageLevel or just use HDFS. this requires subsequent Applications/SparkContexts to reload the data from disk, of course. On Tue, Jun 3, 2014 at 6:58 AM, Gerard Maas gerard.m...@gmail.com wrote: I don't think that's supported by default, as when the standalone context closes, the related RDDs will be GC'ed. You should explore the Spark Job Server, which allows you to cache RDDs by name and reuse them within a context. https://github.com/ooyala/spark-jobserver -kr, Gerard. On Tue, Jun 3, 2014 at 3:45 PM, Oleg Proudnikov oleg.proudni...@gmail.com wrote: HI All, Is it possible to run a standalone app that would compute and persist/cache an RDD and then run other standalone apps that would gain access to that RDD? -- Thank you, Oleg
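both options sketched side by side, assuming a Spark version that exposes StorageLevel.OFF_HEAP (paths and the element type are placeholders):

```scala
import org.apache.spark.storage.StorageLevel

// off-heap (Tachyon-backed in this Spark era): the blocks live outside the
// executor JVMs, but still belong to - and die with - the owning application
rdd.persist(StorageLevel.OFF_HEAP)

// the plain-HDFS alternative: write once, reload from disk in a later application
rdd.saveAsObjectFile("hdfs://namenode:8020/shared/myrdd")
// in the next application:
val reloaded = sc.objectFile[(String, Int)]("hdfs://namenode:8020/shared/myrdd")
```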
Re: spark streaming question
great questions, weide. in addition, i'd also like to hear more about how to horizontally scale a spark-streaming cluster. i've gone through the samples (standalone mode) and read the documentation, but it's still not clear to me how to scale this puppy out under high load. i assume i add more receivers (kinesis, flume, etc), but physically how does this work? @TD: can you comment? thanks! -chris On Sun, May 4, 2014 at 2:10 PM, Weide Zhang weo...@gmail.com wrote: Hi, It might be a very general question to ask here, but I'm curious to know why spark streaming can achieve better throughput than storm, as claimed in the spark streaming paper. Does it depend on certain use cases and/or data sources? What drives better performance in the spark streaming case - or, in other words, what makes storm not as performant as spark streaming? Also, in order to guarantee exactly-once semantics when node failure happens, spark makes replicas of RDDs and checkpoints so that data can be recomputed on the fly, while in the Trident case they use a transactional object to persist the state and result, but it's not obvious to me which approach is more costly and why. Can anyone provide some experience here? Thanks a lot, Weide
Re: Multiple Streams with Spark Streaming
if you want to use true Spark Streaming (not the same as Hadoop Streaming/Piping, as Mayur pointed out), you can use the DStream.union() method as described in the following docs: http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html our friend, diana carroll, from cloudera recently posted a nice little utility for sending files to a Spark Streaming Receiver to simulate a streaming scenario from disk. here's the link to her post: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tc3431.html -chris On Thu, May 1, 2014 at 3:09 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: A file as a stream? I think you are confusing Spark Streaming with a buffered reader. Spark streaming is meant to process batches of data (files, packets, messages) as they come in, in fact utilizing the time of packet reception as a way to create windows, etc. In your case you are better off reading the file and partitioning it, operating on each column individually if that makes more sense to you. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, May 1, 2014 at 3:24 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote: Hi all, Is it possible to read and process multiple streams with spark? I have an eeg (brain waves) csv file with 23 columns. Each column is one stream (wave) and each column has one million values. I know one way to do it is to take the transpose of the file and then give it to spark, and each mapper will get one or more waves out of the 23 waves, but then it will be a non-streaming problem and I want to read the file as a stream. Please correct me if I am wrong. I have to apply simple operations (mean and SD) on each window of a wave. Regards, Laeeq
Re: Reading multiple S3 objects, transforming, writing back one
not sure if this directly addresses your issue, peter, but it's worth mentioning a handy AWS EMR utility called s3distcp that can upload a single HDFS file - in parallel - to a single, concatenated S3 file once all the partitions are uploaded. kinda cool. here's some info: http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html s3distcp is an extension of the familiar hadoop distcp, of course. On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: The fastest way to save to S3 should be to leave the RDD with many partitions, because all partitions will be written out in parallel. Then, once the various parts are in S3, somehow concatenate the files together into one file. If this can be done within S3 (I don't know if this is possible), then you get the best of both worlds: a highly parallelized write to S3, and a single cleanly named output file. On Thu, May 1, 2014 at 12:52 PM, Peter thenephili...@yahoo.com wrote: Thank you Patrick. I took a quick stab at it: val s3Client = new AmazonS3Client(...) val copyObjectResult = s3Client.copyObject("upload", outputPrefix + "/part-00000", "rolled-up-logs", "2014-04-28.csv") val objectListing = s3Client.listObjects("upload", outputPrefix) s3Client.deleteObjects(new DeleteObjectsRequest("upload").withKeys(objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava)) Using a 3GB object I achieved about 33MB/s between buckets in the same AZ. This is a workable solution for the short term but not ideal for the longer term as data size increases. I understand it's a limitation of the Hadoop API, but ultimately it must be possible to dump an RDD to a single S3 object :) On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell pwend...@gmail.com wrote: This is a consequence of the way the Hadoop files API works. However, you can (fairly easily) add code to just rename the file because it will always produce the same filename. 
(heavy use of pseudo code)

val dir = "/some/dir"
rdd.coalesce(1).saveAsTextFile(dir)
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.rename(new Path(dir, "part-00000"), new Path("/somewhere/else"))
fs.delete(new Path(dir), true)

It might be cool to add a utility called `saveAsSingleFile` or something that does this for you. In fact, probably we should have called saveAsTextFile saveAsTextFiles to make it more clear...

On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote:
Thanks Nicholas, this is a bit of a shame - not very practical for log roll-up, for example, when every output needs to be in its own directory.

On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you coalesce(1), you move everything in the RDD to a single partition, which then gives you 1 output file. It will still be called part-00000 or something like that because that's defined by the Hadoop API that Spark uses for reading from/writing to S3. I don't know of a way to change that.

On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:
Ah, looks like RDD.coalesce(1) solves one part of the problem.

On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote:
Hi

Playing around with Spark S3, I'm opening multiple objects (CSV files) with:

val hfile = sc.textFile("s3n://bucket/2014-04-28/")

so hfile is an RDD representing the 10 objects that were underneath 2014-04-28. After I've sorted and otherwise transformed the content, I'm trying to write it back to a single object:

sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

unfortunately this results in a folder named concatted.csv with 10 objects underneath, part-00000 .. part-00009, corresponding to the 10 original objects loaded. How can I achieve the desired behaviour of putting a single object named concatted.csv? I've tried 0.9.1 and 1.0.0-RC3. Thanks! Peter
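On the "concatenate within S3" idea Nicholas raised: this actually is possible server-side, via a multipart upload whose parts are copies of the existing part-files. A rough sketch with the AWS SDK for Java (bucket names, prefixes, and the output key below are illustrative; note that every part except the last must be at least 5 MB):

```scala
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model._
import scala.collection.JavaConverters._

val s3      = new AmazonS3Client()
val bucket  = "rolled-up-logs"   // illustrative bucket/key names
val destKey = "2014-04-28.csv"

// start a multipart upload for the destination object
val init = s3.initiateMultipartUpload(
  new InitiateMultipartUploadRequest(bucket, destKey))

// copy each part-file into the upload, server-side (no data leaves S3)
val parts = s3.listObjects(bucket, "output/part-").getObjectSummaries.asScala
val etags = parts.zipWithIndex.map { case (obj, i) =>
  val copy = new CopyPartRequest()
    .withSourceBucketName(bucket).withSourceKey(obj.getKey)
    .withDestinationBucketName(bucket).withDestinationKey(destKey)
    .withUploadId(init.getUploadId).withPartNumber(i + 1)
  val result = s3.copyPart(copy)
  new PartETag(result.getPartNumber, result.getETag)
}

// complete: S3 stitches the parts into a single object
s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
  bucket, destKey, init.getUploadId, etags.asJava))
```

If the data is already on HDFS, Hadoop's FileUtil.copyMerge is another route to a single output file before uploading.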
Re: help me
as Mayur indicated, it's odd that you are seeing better performance from a less-local configuration. however, the non-deterministic behavior that you describe is likely caused by GC pauses in your JVM process.

take note of the *spark.locality.wait* configuration parameter described here: http://spark.apache.org/docs/latest/configuration.html

this is the amount of time the Spark execution engine waits before launching a task at a less-data-local level (ie. process -> node -> rack). by default, this is 3 seconds. if there is excessive GC occurring on the original process-local JVM, it is possible that another node-local JVM process could actually load the data from HDFS (on the same node) and complete the processing before the original process's GC finishes.

you could bump up the *spark.locality.wait* default (not recommended) or increase your number of nodes/partitions to increase parallelism and reduce hotspots. also, keep an eye on your GC characteristics. perhaps you need to increase your Eden size to reduce the amount of movement through the GC generations and reduce major compactions. (the usual GC tuning fun.)

curious if others have experienced this behavior, as well?

-chris

On Fri, May 2, 2014 at 6:07 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:
Spark would be much faster on process_local instead of node_local. Node_local references data from the local hard disk; process_local references data already in memory in the executor process. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi

On Tue, Apr 22, 2014 at 4:45 PM, Joe L selme...@yahoo.com wrote:
I got the following performance - is it normal in spark to be like this? sometimes spark switches into node_local mode from process_local and it becomes 10x faster. I am very confused.
scala> val f = sc.textFile("/user/exobrain/batselem/LUBM1000")
scala> f.count()
Long = 137805557 (took 130.809661618 s)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/help-me-tp4598.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
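To make the knob from Chris's reply concrete, the locality waits can be set per level when building the context. A minimal sketch (the values shown are just the defaults, not recommendations):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("locality-tuning")
  // base wait before the scheduler falls back to a less-local level
  .set("spark.locality.wait", "3000")
  // per-level overrides: process-local -> node-local -> rack-local
  .set("spark.locality.wait.process", "3000")
  .set("spark.locality.wait.node", "3000")
  .set("spark.locality.wait.rack", "3000")

val sc = new SparkContext(conf)
```

Raising these keeps tasks waiting longer for a process-local slot (masking the GC symptom); lowering them makes the fallback to node_local happen sooner.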
Re: function state lost when next RDD is processed
or how about the updateStateByKey() operation? https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html

the StatefulNetworkWordCount example demonstrates how to keep state across RDDs.

On Mar 28, 2014, at 8:44 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:
Are you referring to Spark Streaming? Can you save the sum as an RDD and keep joining the two RDDs together? Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi

On Fri, Mar 28, 2014 at 10:47 AM, Adrian Mocanu amoc...@verticalscope.com wrote:
Thanks! Ya, that's what I'm doing so far, but I wanted to see if it's possible to keep the tuples inside Spark for fault-tolerance purposes. -A

From: Mark Hamstra [mailto:m...@clearstorydata.com] Sent: March-28-14 10:45 AM To: user@spark.apache.org Subject: Re: function state lost when next RDD is processed
As long as the amount of state being passed is relatively small, it's probably easiest to send it back to the driver and to introduce it into RDD transformations as the zero value of a fold.

On Fri, Mar 28, 2014 at 7:12 AM, Adrian Mocanu amoc...@verticalscope.com wrote:
I'd like to resurrect this thread since I don't have an answer yet.

From: Adrian Mocanu [mailto:amoc...@verticalscope.com] Sent: March-27-14 10:04 AM To: u...@spark.incubator.apache.org Subject: function state lost when next RDD is processed
Is there a way to pass a custom function to spark to run it on the entire stream? For example, say I have a function which sums up values in each RDD and then across RDDs. I've tried with map, transform, and reduce. They all apply my sum function to 1 RDD. When the next RDD comes, the function starts from 0, so the sum of the previous RDD is lost. Does Spark support a way of passing a custom function so that its state is preserved across RDDs and not only within a single RDD? Thanks -Adrian
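For reference, a minimal sketch of the updateStateByKey() approach (the `ssc` StreamingContext and the `wordDstream` DStream[(String, Int)] are assumed to exist already; the checkpoint path is a placeholder):

```scala
// a checkpoint directory is required for stateful transformations
ssc.checkpoint("/some/checkpoint-dir")  // placeholder path

// carry a running sum per key across batch intervals (i.e. across RDDs):
// newValues are this batch's values, runningCount is the state so far
val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
  Some(newValues.sum + runningCount.getOrElse(0))

val runningCounts = wordDstream.updateStateByKey[Int](updateFunc)
runningCounts.print()
```

Unlike a plain map/reduce over each RDD, the Option state here survives from one batch to the next, which is exactly the "sum across RDDs" behavior Adrian was after.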
Re: network wordcount example
@eric - i saw this exact issue recently while working on the KinesisWordCount. are you passing local[2] to your example as the MASTER arg, versus just local or local[1]? you need at least 2. it's documented as "n > 1" in the scala source docs - which is easy to mistake for n = 1.

i just ran the NetworkWordCount sample and confirmed that local[1] does not work, but local[2] does work. give that a whirl.

-chris

On Mon, Mar 31, 2014 at 10:41 AM, Diana Carroll dcarr...@cloudera.com wrote:
Not sure what data you are sending in. You could try calling lines.print() instead, which should just output everything that comes in on the stream - just to test that your socket is receiving what you think you are sending.

On Mon, Mar 31, 2014 at 12:18 PM, eric perler ericper...@hotmail.com wrote:
Hello

i just started working with spark today... and i am trying to run the wordcount network example. i created a socket server and client, and i am sending data to the server in an infinite loop. when i run the spark class, i see this output in the console...

-------------------------------------------
Time: 1396281891000 ms
-------------------------------------------
14/03/31 11:04:51 INFO SparkContext: Job finished: take at DStream.scala:586, took 0.056794606 s
14/03/31 11:04:51 INFO JobScheduler: Finished job streaming job 1396281891000 ms.0 from job set of time 1396281891000 ms
14/03/31 11:04:51 INFO JobScheduler: Total delay: 0.101 s for time 1396281891000 ms (execution: 0.058 s)
14/03/31 11:04:51 INFO TaskSchedulerImpl: Remove TaskSet 3.0 from pool

but i don't see any output from the wordcount operation when i make this call... wordCounts.print();

any help is greatly appreciated. thanks in advance
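For anyone hitting the same thing, a minimal NetworkWordCount skeleton showing where the master string matters (host/port are illustrative; run `nc -lk 9999` to feed it):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local[2]" is the important part: the receiver occupies one thread,
// so at least one more is needed to actually process the batches.
// with "local" or "local[1]", the receiver starves the job and nothing prints.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
```

The same rule applies on a real cluster: the app needs more cores than it has receivers.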