Re: [ML] MLeap: Deploy Spark ML Pipelines w/o SparkContext

2017-02-04 Thread Chris Fregly
to date, i haven't seen very good performance coming from mleap. i believe ram 
from databricks keeps getting you guys on stage at the spark summits, but i've 
been unimpressed with the performance numbers - as well as your choice to 
reimplement your own non-standard "pmml-like" mechanism, which incurs heavy 
technical debt on the development side.

creating technical debt is a very databricks-like thing as seen in their own 
product - so it's no surprise that databricks supports and encourages this type 
of engineering effort.

@hollin: please correct me if i'm wrong, but the numbers you guys have quoted 
in the past are at very low scale. at one point you were quoting 40-50ms which 
is pretty bad. 11ms is better, but these are all at low scale which is not good.

i'm not sure where the 2-3ms numbers are coming from, but even that is not 
realistic in most real-world scenarios at scale.

check out our 100% open source solution to this exact problem starting at 
http://pipeline.io. you'll find links to the github repo, youtube demos, 
slideshare conference talks, online training, and lots more.

our entire focus at PipelineIO is optimizing, deploying, a/b + bandit testing, 
and scaling Scikit-Learn + Spark ML + Tensorflow AI models for high-performance 
predictions.

this focus on performance and scale is an extension of our team's long history 
of building highly scalable, highly available, and highly performant 
distributed ML and AI systems at netflix, twitter, mesosphere - and even 
databricks. :)

reminder that everything here is 100% open source. no product pitches here. we 
work for you guys/gals - aka the community!

please contact me directly if you're looking to solve this problem in the best 
way possible.

we can get you up and running in your own cloud-based or on-premise environment 
in minutes. we support aws, google cloud, and azure - basically anywhere that 
runs docker.

any time zone works. we're completely global with free 24x7 support for 
everyone in the community.

thanks! hope this is useful.

Chris Fregly
Research Scientist @ PipelineIO
Founder @ Advanced Spark and TensorFlow Meetup
San Francisco - Chicago - Washington DC - London

On Feb 4, 2017, 12:06 PM -0600, Debasish Das <debasish.da...@gmail.com>, wrote:
>
> Except of course lda, als, and neural net models...for them the model needs 
> to be either prescored and cached on a kv store, or the matrices / graph 
> should be kept on a kv store and accessed using a REST API to serve the 
> output...for neural nets it's more fun since it's a distributed or local 
> graph over which tensorflow compute needs to run...
>
>
> In trapezium we support writing these models to stores like cassandra and 
> lucene, for example, and then provide a config-driven akka-http based API to 
> add the business logic to access these models from a store and expose the 
> model serving as a REST endpoint
>
>
> We use matrix, graph and kernel models a lot, and for them it turned out that 
> the mllib-style model predict was useful if we change the underlying store...
>
> On Feb 4, 2017 9:37 AM, "Debasish Das" <debasish.da...@gmail.com 
> (mailto:debasish.da...@gmail.com)> wrote:
> >
> > If we expose an API to access the raw models out of PipelineModel, can't we 
> > call predict directly on it from an API? Is there a task open to expose 
> > the model out of PipelineModel so that predict can be called on it...there 
> > is no dependency on the spark context in the ml model...
> >
> > On Feb 4, 2017 9:11 AM, "Aseem Bansal" <asmbans...@gmail.com 
> > (mailto:asmbans...@gmail.com)> wrote:
> > > In Spark 2.0 there is a class called PipelineModel. I know that the title 
> > > says pipeline but it is actually talking about a PipelineModel trained 
> > > via a Pipeline.
> > > Why PipelineModel instead of Pipeline? Because usually there is a series 
> > > of steps that need to be done when doing ML, which warrants an ordered 
> > > sequence of operations. Read the new spark ml docs or one of the 
> > > databricks blogs related to spark pipelines. If you have used python's 
> > > sklearn library, the concept is inspired by that.
> > > "once model is deserialized as ml model from the store of choice within 
> > > ms" - The timing of loading the model was not what I was referring to 
> > > when I was talking about timing.
> > > "it can be used on incoming features to score through spark.ml.Model 
> > > predict API". The predict API is in the old mllib package not the new ml 
> > > package.
> > > "why r we using dataframe and not the ML model directly from API" - 
> > > Because as of now the new ml package does not have the direct API.
> > >
> > >
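
For readers following the quoted discussion above, here is a minimal sketch of loading a saved spark.ml PipelineModel and scoring incoming records with transform(), which plays the role of predict() in the DataFrame-based API. The path and the input DataFrame (incomingFeaturesDF) are illustrative, and a live SparkSession is assumed:

import org.apache.spark.ml.PipelineModel

// load a previously fitted pipeline (path is illustrative)
val model = PipelineModel.load("hdfs:///models/my-pipeline")

// transform() is the spark.ml equivalent of the old mllib predict()
val scored = model.transform(incomingFeaturesDF)  // DataFrame with the expected feature columns
scored.select("prediction").show()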

Re: Is Spark 2.0 master node compatible with Spark 1.5 work node?

2016-09-18 Thread Chris Fregly
you'll see errors like this...

"java.lang.RuntimeException: java.io.InvalidClassException:
org.apache.spark.rpc.netty.RequestMessage; local class incompatible: stream
classdesc serialVersionUID = -2221986757032131007, local class
serialVersionUID = -5447855329526097695"

...when mixing versions of spark.

i'm actually seeing this right now while testing across Spark 1.6.1 and
Spark 2.0.1 for my all-in-one, hybrid cloud/on-premise Spark + Zeppelin +
Kafka + Kubernetes + Docker + One-Click Spark ML Model Production
Deployments initiative documented here:

https://github.com/fluxcapacitor/pipeline/wiki/Kubernetes-Docker-Spark-ML

and check out my upcoming meetup on this effort either in-person or online:

http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/events/233978839/

we're throwing in some GPU/CUDA just to sweeten the offering!  :)

On Sat, Sep 10, 2016 at 2:57 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> I don't think a 2.0 uber jar will play nicely on a 1.5 standalone cluster.
>
>
> On Saturday, September 10, 2016, Felix Cheung <felixcheun...@hotmail.com>
> wrote:
>
>> You should be able to get it to work with 2.0 as an uber jar.
>>
>> What type of cluster are you running on? YARN? And what distribution?
>>
>>
>>
>>
>>
>> On Sun, Sep 4, 2016 at 8:48 PM -0700, "Holden Karau" <
>> hol...@pigscanfly.ca> wrote:
>>
>> You really shouldn't mix different versions of Spark between the master
>> and worker nodes; if you're going to upgrade, upgrade all of them. Otherwise
>> you may get very confusing failures.
>>
>> On Monday, September 5, 2016, Rex X <dnsr...@gmail.com> wrote:
>>
>>> Wish to use the Pivot Table feature of data frames, which has been available
>>> since Spark 1.6. But the current cluster runs Spark 1.5. Can we
>>> install Spark 2.0 on the master node to work around this?
>>>
>>> Thanks!
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


-- 
*Chris Fregly*
Research Scientist @ *PipelineIO* <http://pipeline.io>
*Advanced Spark and TensorFlow Meetup*
<http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/>
*San Francisco* | *Chicago* | *Washington DC*


Re: Spark 2.0.1 / 2.1.0 on Maven

2016-08-09 Thread Chris Fregly
alrighty then!

bcc'ing user list.  cc'ing dev list.

@user list people:  do not read any further or you will be in violation of
ASF policies!

On Tue, Aug 9, 2016 at 11:50 AM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> That's not going to happen on the user list, since that is against ASF
> policy (http://www.apache.org/dev/release.html):
>
> During the process of developing software and preparing a release, various
>> packages are made available to the developer community for testing
>> purposes. Do not include any links on the project website that might
>> encourage non-developers to download and use nightly builds, snapshots,
>> release candidates, or any other similar package. The only people who
>> are supposed to know about such packages are the people following the dev
>> list (or searching its archives) and thus aware of the conditions placed on
>> the package. If you find that the general public are downloading such test
>> packages, then remove them.
>>
>
> On Tue, Aug 9, 2016 at 11:32 AM, Chris Fregly <ch...@fregly.com> wrote:
>
>> this is a valid question.  there are many people building products and
>> tooling on top of spark and would like access to the latest snapshots and
>> such.  today's ink is yesterday's news to these people - including myself.
>>
>> what is the best way to get snapshot releases including nightly and
>> specially-blessed "preview" releases so that we, too, can say "try the
>> latest release in our product"?
>>
>> there was a lot of chatter during the 2.0.0/2.0.1 release that i largely
>> ignored because of conflicting/confusing/changing responses.  and i'd
>> rather not dig through jenkins builds to figure this out as i'll likely get
>> it wrong.
>>
>> please provide the relevant snapshot/preview/nightly/whatever repos (or
>> equivalent) that we need to include in our builds to have access to the
>> absolute latest build assets for every major and minor release.
>>
>> thanks!
>>
>> -chris
>>
>>
>> On Tue, Aug 9, 2016 at 10:00 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> LOL
>>>
>>> Ink has not dried on Spark 2 yet so to speak :)
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 9 August 2016 at 17:56, Mark Hamstra <m...@clearstorydata.com> wrote:
>>>
>>>> What are you expecting to find?  There currently are no releases beyond
>>>> Spark 2.0.0.
>>>>
>>>> On Tue, Aug 9, 2016 at 9:55 AM, Jestin Ma <jestinwith.a...@gmail.com>
>>>> wrote:
>>>>
>>>>> If we want to use versions of Spark beyond the official 2.0.0 release,
>>>>> specifically on Maven + Java, what steps should we take to upgrade? I 
>>>>> can't
>>>>> find the newer versions on Maven central.
>>>>>
>>>>> Thank you!
>>>>> Jestin
>>>>>
>>>>
>>>>
>>>
>>
>>
>> --
>> *Chris Fregly*
>> Research Scientist @ PipelineIO
>> San Francisco, CA
>> pipeline.io
>> advancedspark.com
>>
>>
>


-- 
*Chris Fregly*
Research Scientist @ PipelineIO
San Francisco, CA
pipeline.io
advancedspark.com


Re: Spark 2.0.1 / 2.1.0 on Maven

2016-08-09 Thread Chris Fregly
this is a valid question.  there are many people building products and
tooling on top of spark and would like access to the latest snapshots and
such.  today's ink is yesterday's news to these people - including myself.

what is the best way to get snapshot releases including nightly and
specially-blessed "preview" releases so that we, too, can say "try the
latest release in our product"?

there was a lot of chatter during the 2.0.0/2.0.1 release that i largely
ignored because of conflicting/confusing/changing responses.  and i'd
rather not dig through jenkins builds to figure this out as i'll likely get
it wrong.

please provide the relevant snapshot/preview/nightly/whatever repos (or
equivalent) that we need to include in our builds to have access to the
absolute latest build assets for every major and minor release.

thanks!

-chris
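
For anyone following along, here is a hedged build.sbt sketch of what consuming snapshot artifacts looks like once you know the right repository. The resolver URL and version string below are assumptions on my part, not official pointers, so verify them against whatever the dev list actually publishes:

// build.sbt -- illustrative only: the snapshot repository location and the
// SNAPSHOT version are assumptions; confirm both before relying on them
resolvers += "apache-snapshots" at "https://repository.apache.org/snapshots/"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0-SNAPSHOT" % "provided"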


On Tue, Aug 9, 2016 at 10:00 AM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> LOL
>
> Ink has not dried on Spark 2 yet so to speak :)
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 9 August 2016 at 17:56, Mark Hamstra <m...@clearstorydata.com> wrote:
>
>> What are you expecting to find?  There currently are no releases beyond
>> Spark 2.0.0.
>>
>> On Tue, Aug 9, 2016 at 9:55 AM, Jestin Ma <jestinwith.a...@gmail.com>
>> wrote:
>>
>>> If we want to use versions of Spark beyond the official 2.0.0 release,
>>> specifically on Maven + Java, what steps should we take to upgrade? I can't
>>> find the newer versions on Maven central.
>>>
>>> Thank you!
>>> Jestin
>>>
>>
>>
>


-- 
*Chris Fregly*
Research Scientist @ PipelineIO
San Francisco, CA
pipeline.io
advancedspark.com


Re: ml models distribution

2016-07-22 Thread Chris Fregly
hey everyone-

this concept of deploying your Spark ML Pipelines and Algos into Production
(user-facing production) has been coming up a lot recently.

so much so, that i've dedicated the last few months of my research and
engineering efforts to build out the infrastructure to support this in a
highly-scalable, highly-available way.

i've combined my Netflix + NetflixOSS work experience with my
Databricks/IBM + Spark work experience into an open source project,
PipelineIO, here:  http://pipeline.io

we're even serving up TensorFlow AI models using the same infrastructure -
incorporating key patterns from TensorFlow Distributed + TensorFlow Serving!

everything is open source, based on Docker + Kubernetes + NetflixOSS +
Spark + TensorFlow + Redis + Hybrid Cloud + On-Premise + Kafka + Zeppelin +
Jupyter/iPython with a heavy emphasis on metrics and monitoring of models
and server production statistics.

we're doing code generation directly from the saved Spark ML models (thanks
Spark 2.0 for giving us save/load parity across all models!) for optimized
model serving using both CPUs and GPUs, incremental training of models,
autoscaling, the whole works.
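
For context, here is a minimal sketch of the Spark 2.0 save/load parity mentioned above - a fitted pipeline written out and read back with no custom serialization. The stages, columns, path, and trainingDF are illustrative:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler

// fit a pipeline (stages are illustrative) and persist it
val assembler = new VectorAssembler().setInputCols(Array("f1", "f2")).setOutputCol("features")
val lr = new LogisticRegression().setLabelCol("label")
val fitted = new Pipeline().setStages(Array(assembler, lr)).fit(trainingDF)
fitted.write.overwrite().save("hdfs:///models/reco-pipeline")

// ...then load it back later, e.g. inside a separate serving process
val reloaded = PipelineModel.load("hdfs:///models/reco-pipeline")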

our friend from Netflix, Chaos Monkey, even makes a grim appearance from
time to time to prove that we're resilient to failure.

take a peek.  it's cool.  we've come a long way in the last couple months,
and we've got a lot of work left to do, but the core infrastructure is in
place, key features have been built, and we're moving quickly.

shoot me an email if you'd like to get involved.  lots of TODO's.

we're dedicating my upcoming Advanced Spark and TensorFlow Meetup on August
4th in SF to demo'ing this infrastructure to you all.

here's the link:
http://www.meetup.com/Advanced-Spark-and-TensorFlow-Meetup/events/231457813/


video recording + screen capture will be posted afterward, as always.

we've got a workshop dedicated to building an end-to-end Spark ML and
Kafka-based Recommendation Pipeline - including the PipelineIO serving
platform.  link is here:  http://pipeline.io

and i'm finishing a blog post soon to detail everything we've done so far -
and everything we're actively building.  this post will be available on
http://pipeline.io - as well as cross-posted to a number of my favorite
engineering blogs.

global demo roadshow starts 8/8.  shoot me an email if you want to see all
this in action, otherwise i'll see you at a workshop or meetup near you!  :)



On Fri, Jul 22, 2016 at 10:34 AM, Inam Ur Rehman <inam.rehma...@gmail.com>
wrote:

> Hello guys..i know it's irrelevant to this topic but i've been looking
> desperately for the solution. I am facing an exception
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-resolve-you-must-build-spark-with-hive-exception-td27390.html
>
> plz help me.. I couldn't find any solution.. plz
>
> On Fri, Jul 22, 2016 at 6:12 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> No there isn't anything in particular, beyond the various bits of
>> serialization support that write out something to put in your storage
>> to begin with. What you do with it after reading and before writing is
>> up to your app, on purpose.
>>
>> If you mean you're producing data outside the model that your model
>> uses, your model data might be produced by an RDD operation, and saved
>> that way. There it's no different than anything else you do with RDDs.
>>
>> What part are you looking to automate beyond those things? that's most of
>> it.
>>
>> On Fri, Jul 22, 2016 at 2:04 PM, Sergio Fernández <wik...@apache.org>
>> wrote:
>> > Hi Sean,
>> >
>> > On Fri, Jul 22, 2016 at 12:52 PM, Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> If you mean, how do you distribute a new model in your application,
>> >> then there's no magic to it. Just reference the new model in the
>> >> functions you're executing in your driver.
>> >>
>> >> If you implemented some other manual way of deploying model info, just
>> >> do that again. There's no special thing to know.
>> >
>> >
>> > Well, because some models are huge, we typically bundle the logic
>> > (pipeline/application) and the models separately. Normally we use a shared
>> > store (e.g., HDFS) or coordinated distribution of the models. But I wanted
>> > to know if there is any infrastructure in Spark that specifically addresses
>> > such a need.
>> >
>> > Thanks.
>> >
>> > Cheers,
>> >
>> > P.S.: sorry Jacek, with "ml" I meant "Machine Learning". I thought it was a
>> > fairly widespread acronym. Sorry for the possible confusion.
>> >
>> >
>> > --
>> > Sergio Fernández
>> > Partner Technology Manager
>> > Redlink GmbH
>> > m: +43 6602747925
>> > e: sergio.fernan...@redlink.co
>> > w: http://redlink.co
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


-- 
*Chris Fregly*
Research Scientist @ PipelineIO
San Francisco, CA
pipeline.io
advancedspark.com


Re: [Spark 2.0.0] Structured Stream on Kafka

2016-06-14 Thread Chris Fregly
+1 vote, +1 watch

this would be huge.

On Tue, Jun 14, 2016 at 10:47 AM, andy petrella <andy.petre...@gmail.com>
wrote:

> kool, voted and watched!
> tx
>
> On Tue, Jun 14, 2016 at 4:44 PM Cody Koeninger <c...@koeninger.org> wrote:
>
>> I haven't done any significant work on using structured streaming with
>> kafka, there's a jira ticket for tracking purposes
>>
>> https://issues.apache.org/jira/browse/SPARK-15406
>>
>>
>>
>> On Tue, Jun 14, 2016 at 9:21 AM, andy petrella <andy.petre...@gmail.com>
>> wrote:
>> > Heya folks,
>> >
>> > Just wondering if there are some doc regarding using kafka directly
>> from the
>> > reader.stream?
>> > Has it been integrated already (I mean the source)?
>> >
>> > Sorry if the answer is RTFM (but then I'd appreciate a pointer anyway^^)
>> >
>> > thanks,
>> > cheers
>> > andy
>> > --
>> > andy
>>
> --
> andy
>



-- 
*Chris Fregly*
Research Scientist @ PipelineIO
San Francisco, CA
http://pipeline.io


Re: Book for Machine Learning (MLIB and other libraries on Spark)

2016-06-12 Thread Chris Fregly
two of my faves:

https://www.amazon.com/Advanced-Analytics-Spark-Patterns-Learning/dp/1491912766/
(Cloudera authors)

https://www.amazon.com/Machine-Learning-Spark-Powerful-Algorithms/dp/1783288515/
(IBM author)

(most) authors are Spark Committers.

while not totally up to date w/ ML pipelines and such, these 2 books give
you relevant use cases and make you think about ML in terms of distributed
systems.

classics.

On Sun, Jun 12, 2016 at 7:57 AM, Deepak Goel <deic...@gmail.com> wrote:

> Thank You...Please see inline..
>
>
> On Sun, Jun 12, 2016 at 3:39 PM, <mylistt...@gmail.com> wrote:
>
>> Machine learning - I would suggest that you pick up a fine book that
>> explains machine learning. That's the way I went about it - pick up each type
>> of machine learning concept - say Linear regression - then understand the
>> why/when/how etc. and infer results etc.
>>
>> Then apply the learning to a small data set using python or R or scala
>> without Spark. This is to familiarize the learning.
>>
>>
> Then run the same with MLlib and see it with a big data set on Spark. I
>> would call this consolidation.
>>
> *Deepak
>
> Sorry for the confusion in my question. However, I was more interested in
> getting hold of a book which explains how I can use MLlib and Spark for
> machine learning problems.
>
> *Deepak
>
>>
>> A few things to remember - not all machine learning algorithms are
>> available on Spark. There is a list of the machine learning algorithms
>> supported in Spark; kindly look at that. Also look at how to integrate
>> Mahout / H2O with Spark and see how you can run the machine learning stuff
>> supported by Mahout with Spark.
>>
>> And then your journey begins :-).
>>
>> Regards,
>> Harmeet
>>
>>
>>
>>
>> On Jun 12, 2016, at 0:31, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>> yes absolutely Ted.
>>
>> Thanks for highlighting it
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 11 June 2016 at 19:00, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Another source is the presentations at various conferences.
>>> e.g.
>>>
>>> http://www.slideshare.net/databricks/apache-spark-mllib-20-preview-data-science-and-production
>>>
>>> FYI
>>>
>>> On Sat, Jun 11, 2016 at 8:47 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Interesting.
>>>>
>>>> The pace of development in this field is such that practically every
>>>> single book in the Big Data landscape gets out of date before the ink
>>>> dries on it  :)
>>>>
>>>> I concur that they serve as a good reference for starters, but in my
>>>> opinion the best way to learn is to start from the on-line docs (and these
>>>> are pretty respectable when it comes to Spark) and progress from there.
>>>>
>>>> If you have a certain problem then put to this group and I am sure
>>>> someone somewhere in this forum has come across it. Also most of these
>>>> books' authors actively contribute to this mailing list.
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn * 
>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>> On 11 June 2016 at 16:10, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>>
>>>>> https://www.amazon.com/Machine-Learning-Spark-Powerful-Algorithms/dp/1783288515/
>>>>>
>>>>>
>>>>> https://www.amazon.com/Spark-Practical-Machine-Learning-Chinese/dp/7302420424/
>>>>>
>>>>>
>>>>> https://www.amazon.com/Advanced-Analytics-Spark-Patterns-Learning/dp/1491912766/
>>>>>
>>>>>
>>>>> On Sat, Jun 11, 2016 at 8:04 AM, Deepak Goel <deic...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hey
>>>>>>
>>>>>> Namaskara~Nalama~Guten Tag~Bonjour
>>>>>>
>>>>>> I am a newbie to Machine Learning (MLIB and other libraries on Spark)
>>>>>>
>>>>>> Which would be the best book to learn up?
>>>>>>
>>>>>> Thanks
>>>>>> Deepak
>>>>>>--
>>>>>> Keigu
>>>>>>
>>>>>> Deepak
>>>>>> 73500 12833
>>>>>> www.simtree.net, dee...@simtree.net
>>>>>> deic...@gmail.com
>>>>>>
>>>>>> LinkedIn: www.linkedin.com/in/deicool
>>>>>> Skype: thumsupdeicool
>>>>>> Google talk: deicool
>>>>>> Blog: http://loveandfearless.wordpress.com
>>>>>> Facebook: http://www.facebook.com/deicool
>>>>>>
>>>>>> "Contribute to the world, environment and more :
>>>>>> http://www.gridrepublic.org
>>>>>> "
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
*Chris Fregly*
Research Scientist @ PipelineIO
San Francisco, CA
http://pipeline.io


Re: Classpath hell and Elasticsearch 2.3.2...

2016-06-02 Thread Chris Fregly
>>>>>> at
>>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>
>>>>>> ... but I think its caused by this:
>>>>>>
>>>>>> 16/06/03 00:26:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0
>>>>>> (TID 0, localhost): java.lang.Error: Multiple ES-Hadoop versions detected
>>>>>> in the classpath; please use only one
>>>>>>
>>>>>> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-2.3.2.jar
>>>>>>
>>>>>> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-spark_2.11-2.3.2.jar
>>>>>>
>>>>>> jar:file:/usr/share/elasticsearch-hadoop/lib/elasticsearch-hadoop-mr-2.3.2.jar
>>>>>>
>>>>>> at org.elasticsearch.hadoop.util.Version.<init>(Version.java:73)
>>>>>> at
>>>>>> org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:376)
>>>>>> at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
>>>>>> at
>>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>> at
>>>>>> org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> .. still tracking this down but was wondering if there is someting
>>>>>> obvious I'm dong wrong.  I'm going to take out
>>>>>> elasticsearch-hadoop-2.3.2.jar and try again.
>>>>>>
>>>>>> Lots of trial and error here :-/
>>>>>>
>>>>>> Kevin
>>>>>>
>>>>>> --
>>>>>>
>>>>>> We’re hiring if you know of any awesome Java Devops or Linux
>>>>>> Operations Engineers!
>>>>>>
>>>>>> Founder/CEO Spinn3r.com
>>>>>> Location: *San Francisco, CA*
>>>>>> blog: http://burtonator.wordpress.com
>>>>>> … or check out my Google+ profile
>>>>>> <https://plus.google.com/102718274791889610666/posts>
>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>>>> Engineers!
>>>>
>>>> Founder/CEO Spinn3r.com
>>>> Location: *San Francisco, CA*
>>>> blog: http://burtonator.wordpress.com
>>>> … or check out my Google+ profile
>>>> <https://plus.google.com/102718274791889610666/posts>
>>>>
>>>>
>>
>>
>> --
>>
>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>> Engineers!
>>
>> Founder/CEO Spinn3r.com
>> Location: *San Francisco, CA*
>> blog: http://burtonator.wordpress.com
>> … or check out my Google+ profile
>> <https://plus.google.com/102718274791889610666/posts>
>>
>>
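
The "Multiple ES-Hadoop versions detected" error above usually means more than one of the connector jars ended up on the classpath. A minimal build.sbt sketch of depending on just the Spark connector - the artifact name and version are taken from the jars listed in the error, while the group id is an assumption on my part:

// build.sbt -- keep exactly one ES-Hadoop artifact; drop elasticsearch-hadoop
// and elasticsearch-hadoop-mr so only the Spark connector is on the classpath
libraryDependencies += "org.elasticsearch" % "elasticsearch-spark_2.11" % "2.3.2"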


-- 
*Chris Fregly*
Research Scientist @ PipelineIO
San Francisco, CA
http://pipeline.io


Re: GraphX Java API

2016-05-30 Thread Chris Fregly
btw, GraphX in Action is one of the better books out on Spark.

Michael did a great job with this one.  He even breaks down snippets of
Scala for newbies to understand the seemingly-arbitrary syntax.  I learned
quite a bit about not only Spark, but also Scala.

And of course, we shouldn't forget about Sean's Advanced Analytics with
Spark which, of course, is a classic that I still reference regularly.  :)

On Mon, May 30, 2016 at 7:42 AM, Michael Malak <
michaelma...@yahoo.com.invalid> wrote:

> Yes, it is possible to use GraphX from Java but it requires 10x the amount
> of code and involves using obscure typing and pre-defined lambda prototype
> facilities. I give an example of it in my book, the source code for which
> can be downloaded for free from
> https://www.manning.com/books/spark-graphx-in-action The relevant example
> is EdgeCount.java in chapter 10.
>
> As I suggest in my book, likely the only reason you'd want to put yourself
> through that torture is corporate mandate or compatibility with Java
> bytecode tools.
>
>
> --
> *From:* Sean Owen <so...@cloudera.com>
> *To:* Takeshi Yamamuro <linguin@gmail.com>; "Kumar, Abhishek (US -
> Bengaluru)" <abhishekkuma...@deloitte.com>
> *Cc:* "user@spark.apache.org" <user@spark.apache.org>
> *Sent:* Monday, May 30, 2016 7:07 AM
> *Subject:* Re: GraphX Java API
>
> No, you can call any Scala API in Java. It is somewhat less convenient if
> the method was not written with Java in mind but does work.
>
> On Mon, May 30, 2016, 00:32 Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
> These package are used only for Scala.
>
> On Mon, May 30, 2016 at 2:23 PM, Kumar, Abhishek (US - Bengaluru) <
> abhishekkuma...@deloitte.com> wrote:
>
> Hey,
> ·   I see some graphx packages listed here:
> http://spark.apache.org/docs/latest/api/java/index.html
> ·   org.apache.spark.graphx
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/package-frame.html>
> ·   org.apache.spark.graphx.impl
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/impl/package-frame.html>
> ·   org.apache.spark.graphx.lib
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/lib/package-frame.html>
> ·   org.apache.spark.graphx.util
> <http://spark.apache.org/docs/latest/api/java/org/apache/spark/graphx/util/package-frame.html>
> Aren’t they meant to be used with JAVA?
> Thanks
>
> *From:* Santoshakhilesh [mailto:santosh.akhil...@huawei.com]
> *Sent:* Friday, May 27, 2016 4:52 PM
> *To:* Kumar, Abhishek (US - Bengaluru) <abhishekkuma...@deloitte.com>;
> user@spark.apache.org
> *Subject:* RE: GraphX Java API
>
> GraphX APis are available only in Scala. If you need to use GraphX you
> need to switch to Scala.
>
> *From:* Kumar, Abhishek (US - Bengaluru) [
> mailto:abhishekkuma...@deloitte.com <abhishekkuma...@deloitte.com>]
> *Sent:* 27 May 2016 19:59
> *To:* user@spark.apache.org
> *Subject:* GraphX Java API
>
> Hi,
>
> We are trying to consume the Java API for GraphX, but there is no
> documentation available online on the usage or examples. It would be great
> if we could get some examples in Java.
>
> Thanks and regards,
>
> *Abhishek Kumar*
>
>
>
> This message (including any attachments) contains confidential information
> intended for a specific individual and purpose, and is protected by law. If
> you are not the intended recipient, you should delete this message and any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, by you is strictly prohibited.
> v.E.1
>
>
>
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
>
>
>


-- 
*Chris Fregly*
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.ai


Re: local Vs Standalonecluster production deployment

2016-05-28 Thread Chris Fregly
t;>>>>>> spark-submit in the host where the cluster manager is running.
>>>>>>>>>
>>>>>>>>> The Driver node runs on the same host that the cluster manager is
>>>>>>>>> running. The Driver requests the Cluster Manager for resources to run
>>>>>>>>> tasks. The worker is tasked to create the executor (in this case 
>>>>>>>>> there is
>>>>>>>>> only one executor) for the Driver. The Executor runs tasks for the 
>>>>>>>>> Driver.
>>>>>>>>> Only one executor can be allocated on each worker per application. In 
>>>>>>>>> your
>>>>>>>>> case you only have
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The minimum you will need will be 2-4G of RAM and two cores. Well,
>>>>>>>>> that is my experience. Yes, you can submit more than one spark-submit
>>>>>>>>> (the driver) but they may queue up behind the running one if there
>>>>>>>>> are not enough resources.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> You pointed out that you will be running few applications in
>>>>>>>>> parallel on the same host. The likelihood is that you are using a VM
>>>>>>>>> machine for this purpose and the best option is to try running the 
>>>>>>>>> first
>>>>>>>>> one, Check Web GUI on  4040 to see the progress of this Job. If you 
>>>>>>>>> start
>>>>>>>>> the next JVM then assuming it is working, it will be using port 4041 
>>>>>>>>> and so
>>>>>>>>> forth.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In actual fact try the command "free" to see how much free memory
>>>>>>>>> you have.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> LinkedIn * 
>>>>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 28 May 2016 at 16:42, sujeet jog <sujeet@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I have a question w.r.t  production deployment mode of spark,
>>>>>>>>>>
>>>>>>>>>> I have 3 applications which i would like to run independently on
>>>>>>>>>> a single machine; i need to run the drivers on the same machine.
>>>>>>>>>>
>>>>>>>>>> The amount of resources i have is also limited, like 4-5 GB RAM and
>>>>>>>>>> 3-4 cores.
>>>>>>>>>>
>>>>>>>>>> For deployment in standalone mode, i believe i need
>>>>>>>>>>
>>>>>>>>>> 1 Driver JVM,  1 worker node ( 1 executor )
>>>>>>>>>> 1 Driver JVM,  1 worker node ( 1 executor )
>>>>>>>>>> 1 Driver JVM,  1 worker node ( 1 executor )
>>>>>>>>>>
>>>>>>>>>> The issue here is i will require 6 JVMs running in parallel, for
>>>>>>>>>> which i do not have sufficient CPU/MEM resources,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hence i was looking more towards a local-mode deployment, and would
>>>>>>>>>> like to know if anybody is using local mode, where the Driver +
>>>>>>>>>> Executor run in a single JVM, in production.
>>>>>>>>>>
>>>>>>>>>> Are there any inherent issues upfront using local mode for
>>>>>>>>>> production-grade systems?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
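
For the local-mode question above, a minimal sketch of what a single-JVM deployment looks like from the driver's side - the app name and core count are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

// local[2]: the driver and executor run as threads inside this one JVM,
// so the JVM heap (-Xmx / --driver-memory) is the only memory budget to size
val conf = new SparkConf().setAppName("app-1").setMaster("local[2]")
val sc = new SparkContext(conf)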


-- 
*Chris Fregly*
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.ai


Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-17 Thread Chris Fregly
you can use HyperLogLog with Spark Streaming to accomplish this.

here is an example from my fluxcapacitor GitHub repo:

https://github.com/fluxcapacitor/pipeline/tree/master/myapps/spark/streaming/src/main/scala/com/advancedspark/streaming/rating/approx

here's an accompanying SlideShare presentation from one of my recent
meetups (slides 70-83):

http://www.slideshare.net/cfregly/spark-after-dark-20-apache-big-data-conf-vancouver-may-11-2016-61970037

and a YouTube video for those that prefer video (starting at 32 mins into
the video for your convenience):

https://youtu.be/wM9Z0PLx3cw?t=1922
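
In case it helps, here is a minimal Spark Streaming sketch of the same idea using RDD.countApproxDistinct (HyperLogLog-based under the hood) over a sliding window. The price stream, threshold, and window/slide durations are borrowed from Mich's example below and are illustrative:

import org.apache.spark.streaming.Seconds

// approximate distinct count of qualifying values in each sliding window
price
  .filter(_ > 95.0)
  .window(Seconds(windowLength), Seconds(slidingInterval))
  .foreachRDD { rdd =>
    val approxDistinct = rdd.countApproxDistinct(0.01)  // ~1% relative error
    println(s"approx distinct values in this window: $approxDistinct")
  }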


On Tue, May 17, 2016 at 12:17 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Ok but how about something similar to
>
> val countByValueAndWindow = price.filter(_ >
> 95.0).countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
>
>
> Using a new count => countDistinctByValueAndWindow?
>
> val countDistinctByValueAndWindow = price.filter(_ >
> 95.0).countDistinctByValueAndWindow(Seconds(windowLength),
> Seconds(slidingInterval))
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 May 2016 at 20:02, Michael Armbrust <mich...@databricks.com> wrote:
>
>> In 2.0 you won't be able to do this.  The long term vision would be to
>> make this possible, but a window will be required (like the 24 hours you
>> suggest).
>>
>> On Tue, May 17, 2016 at 1:36 AM, Todd <bit1...@163.com> wrote:
>>
>>> Hi,
>>> We have a requirement to do count(distinct) in a processing batch
>>> against all the streaming data (e.g., last 24 hours' data), that is, when
>>> we do count(distinct), we actually want to compute distinct against last
>>> 24 hours' data.
>>> Does structured streaming support this scenario? Thanks!
>>>
>>
>>
>

-- 
*Chris Fregly*
Research Scientist @ Flux Capacitor AI
"Bringing AI Back to the Future!"
San Francisco, CA
http://fluxcapacitor.com


Re: Any NLP lib could be used on spark?

2016-04-20 Thread Chris Fregly
this took me a bit to get working, but I finally got it up and running with 
the package that Burak pointed out.

here's some relevant links to my project that should give you some clues:

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/spark/ml/src/main/scala/com/advancedspark/ml/nlp/ItemDescriptionsDF.scala

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/spark/ml/build.sbt

https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/pipeline.bashrc
 (look for the SPARK_SUBMIT_PACKAGES export var)

there are also a few Zeppelin notebooks in that repo that will show you how to 
use it.  I'm doing sentiment analysis in one - as well as entity recognition 
and other fun stuff.

it's a pain to set up, unfortunately.  not sure why.  lots of missing pieces 
that had to be manually cobbled together.

> On Apr 19, 2016, at 5:00 PM, Burak Yavuz  wrote:
> 
> A quick search on spark-packages returns: 
> http://spark-packages.org/package/databricks/spark-corenlp.
> 
> You may need to build it locally and add it to your session by --jars.
> 
> On Tue, Apr 19, 2016 at 10:47 AM, Gavin Yue  wrote:
>> Hey, 
>> 
>> Want to try NLP on Spark. Could anyone recommend an easy-to-run open source 
>> NLP lib on Spark?
>> 
>> Also is there any recommended semantic network? 
>> 
>> Thanks a lot.  
> 


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Chris Fregly
perhaps renaming to Spark ML would actually clear up code and documentation 
confusion?

+1 for rename 

> On Apr 5, 2016, at 7:00 PM, Reynold Xin  wrote:
> 
> +1
> 
> This is a no brainer IMO.
> 
> 
>> On Tue, Apr 5, 2016 at 7:32 PM, Joseph Bradley  wrote:
>> +1  By the way, the JIRA for tracking (Scala) API parity is: 
>> https://issues.apache.org/jira/browse/SPARK-4591
>> 
>>> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia  
>>> wrote:
>>> This sounds good to me as well. The one thing we should pay attention to is 
>>> how we update the docs so that people know to start with the spark.ml 
>>> classes. Right now the docs list spark.mllib first and also seem more 
>>> comprehensive in that area than in spark.ml, so maybe people naturally move 
>>> towards that.
>>> 
>>> Matei
>>> 
 On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
 
 Yes, DB (cc'ed) is working on porting the local linear algebra library 
 over (SPARK-13944). There are also frequent pattern mining algorithms we 
 need to port over in order to reach feature parity. -Xiangrui
 
> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman 
>  wrote:
> Overall this sounds good to me. One question I have is that in
> addition to the ML algorithms we have a number of linear algebra
> (various distributed matrices) and statistical methods in the
> spark.mllib package. Is the plan to port or move these to the spark.ml
> namespace in the 2.x series ?
> 
> Thanks
> Shivaram
> 
> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
> > FWIW, all of that sounds like a good plan to me. Developing one API is
> > certainly better than two.
> >
> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  wrote:
> >> Hi all,
> >>
> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API 
> >> built
> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based 
> >> API has
> >> been developed under the spark.ml package, while the old RDD-based API 
> >> has
> >> been developed in parallel under the spark.mllib package. While it was
> >> easier to implement and experiment with new APIs under a new package, 
> >> it
> >> became harder and harder to maintain as both packages grew bigger and
> >> bigger. And new users are often confused by having two sets of APIs 
> >> with
> >> overlapped functions.
> >>
> >> We started to recommend the DataFrame-based API over the RDD-based API 
> >> in
> >> Spark 1.5 for its versatility and flexibility, and we saw the 
> >> development
> >> and the usage gradually shifting to the DataFrame-based API. Just 
> >> counting
> >> the lines of Scala code, from 1.5 to the current master we added ~1
> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So, 
> >> to
> >> gather more resources on the development of the DataFrame-based API 
> >> and to
> >> help users migrate over sooner, I want to propose switching RDD-based 
> >> MLlib
> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
> >>
> >> * We do not accept new features in the RDD-based spark.mllib package, 
> >> unless
> >> they block implementing new features in the DataFrame-based spark.ml
> >> package.
> >> * We still accept bug fixes in the RDD-based API.
> >> * We will add more features to the DataFrame-based API in the 2.x 
> >> series to
> >> reach feature parity with the RDD-based API.
> >> * Once we reach feature parity (possibly in Spark 2.2), we will 
> >> deprecate
> >> the RDD-based API.
> >> * We will remove the RDD-based API from the main Spark repo in Spark 
> >> 3.0.
> >>
> >> Though the RDD-based API is already in de facto maintenance mode, this
> >> announcement will make it clear and hence important to both MLlib 
> >> developers
> >> and users. So we’d greatly appreciate your feedback!
> >>
> >> (As a side note, people sometimes use “Spark ML” to refer to the
> >> DataFrame-based API or even the entire MLlib component. This also 
> >> causes
> >> confusion. To be clear, “Spark ML” is not an official name and there 
> >> are no
> >> plans to rename MLlib to “Spark ML” at this time.)
> >>
> >> Best,
> >> Xiangrui
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> 
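
For readers landing here from the user list, a minimal sketch of the two APIs discussed above - the RDD-based spark.mllib package entering maintenance mode versus the DataFrame-based spark.ml package. The training data (labeledPointRDD, trainingDF) is illustrative:

// old RDD-based API (spark.mllib) -- entering maintenance mode
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
val mllibModel = new LogisticRegressionWithLBFGS().run(labeledPointRDD)  // RDD[LabeledPoint]

// new DataFrame-based API (spark.ml) -- the one to develop against going forward
import org.apache.spark.ml.classification.LogisticRegression
val mlModel = new LogisticRegression().fit(trainingDF)  // DataFrame with "label"/"features" columns
val scored = mlModel.transform(trainingDF)              // adds "prediction" / "probability" columns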


Re: Spark for Log Analytics

2016-03-31 Thread Chris Fregly
oh, and I forgot to mention Kafka Streams which has been heavily talked
about the last few days at Strata here in San Jose.

Streams can simplify a lot of this architecture by performing some
light-to-medium-complex transformations in Kafka itself.

i'm waiting anxiously for Kafka 0.10 with production-ready Kafka Streams,
so I can try this out myself - and hopefully remove a lot of extra plumbing.

On Thu, Mar 31, 2016 at 4:42 AM, Chris Fregly <ch...@fregly.com> wrote:

> this is a very common pattern, yes.
>
> note that in Netflix's case, they're currently pushing all of their logs
> to a Fronting Kafka + Samza Router which can route to S3 (or HDFS),
> ElasticSearch, and/or another Kafka Topic for further consumption by
> internal apps using other technologies like Spark Streaming (instead of
> Samza).
>
> this Fronting Kafka + Samza Router also helps to differentiate between
> high-priority events (Errors or High Latencies) and normal-priority events
> (normal User Play or Stop events).
>
> here's a recent presentation i did which details this configuration
> starting at slide 104:
> http://www.slideshare.net/cfregly/dc-spark-users-group-march-15-2016-spark-and-netflix-recommendations
> .
>
> btw, Confluent's distribution of Kafka does have a direct Http/REST API
> which is not recommended for production use, but has worked well for me in
> the past.
>
> these are some additional options to think about, anyway.
>
>
> On Thu, Mar 31, 2016 at 4:26 AM, Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>>
>> On 31 Mar 2016, at 09:37, ashish rawat <dceash...@gmail.com> wrote:
>>
>> Hi,
>>
>> I have been evaluating Spark for analysing Application and Server Logs. I
>> believe there are some downsides of doing this:
>>
>> 1. No direct mechanism of collecting log, so need to introduce other
>> tools like Flume into the pipeline.
>>
>>
>> you need something to collect logs no matter what you run. Flume isn't so
>> bad; if you bring it up on the same host as the app then you can even
>> collect logs while the network is playing up.
>>
>> Or you can just copy log4j files to HDFS and process them later
>>
>> 2. Need to write lots of code for parsing different patterns from logs,
>> while some of the log analysis tools like logstash or loggly provide it out
>> of the box
>>
>>
>>
>> Log parsing is essentially an ETL problem, especially if you don't try to
>> lock down the log event format.
>>
>> You can also configure Log4J to save stuff in an easy-to-parse format
>> and/or forward directly to your application.
>>
>> There's a log4j to flume connector to do that for you,
>>
>>
>> http://www.thecloudavenue.com/2013/11/using-log4jflume-to-log-application.html
>>
>> or you can output in, say, JSON (
>> https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/Log4Json.java
>>  )
>>
>> I'd go with flume unless you had a need to save the logs locally and copy
>> them to HDFS later.
>>
>>
>>
>> On the benefits side, I believe Spark might be more performant (although
>> I am yet to benchmark it) and being a generic processing engine, might work
>> with complex use cases where the out of the box functionality of log
>> analysis tools is not sufficient (although I don't have any such use case
>> right now).
>>
>> One option I was considering was to use logstash for collection and basic
>> processing and then sink the processed logs to both elastic search and
>> kafka. So that Spark Streaming can pick data from Kafka for the complex use
>> cases, while logstash filters can be used for the simpler use cases.
>>
>> I was wondering if someone has already done this evaluation and could
>> provide me some pointers on how/if to create this pipeline with Spark.
>>
>> Regards,
>> Ashish
>>
>>
>>
>>
>
>
> --
>
> *Chris Fregly*
> Principal Data Solutions Engineer
> IBM Spark Technology Center, San Francisco, CA
> http://spark.tc | http://advancedspark.com
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Spark for Log Analytics

2016-03-31 Thread Chris Fregly
this is a very common pattern, yes.

note that in Netflix's case, they're currently pushing all of their logs to
a Fronting Kafka + Samza Router which can route to S3 (or HDFS),
ElasticSearch, and/or another Kafka Topic for further consumption by
internal apps using other technologies like Spark Streaming (instead of
Samza).

this Fronting Kafka + Samza Router also helps to differentiate between
high-priority events (Errors or High Latencies) and normal-priority events
(normal User Play or Stop events).

here's a recent presentation i did which details this configuration
starting at slide 104:
http://www.slideshare.net/cfregly/dc-spark-users-group-march-15-2016-spark-and-netflix-recommendations
.

btw, Confluent's distribution of Kafka does have a direct Http/REST API
which is not recommended for production use, but has worked well for me in
the past.

these are some additional options to think about, anyway.
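
For the "Kafka -> Spark Streaming for the complex cases" leg of the pipeline discussed below, here is a minimal sketch using the direct Kafka stream from spark-streaming-kafka (Spark 1.x vintage). The broker, topic, batch interval, and window sizes are illustrative:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("log-analytics"), Seconds(10))

// read the pre-processed log lines that logstash (or the router) published to Kafka
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val logs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("processed-logs"))

// example "complex" use case: errors per 5-minute window, sliding every minute
logs.map(_._2)
  .filter(_.contains("ERROR"))
  .window(Seconds(300), Seconds(60))
  .count()
  .print()

ssc.start()
ssc.awaitTermination()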


On Thu, Mar 31, 2016 at 4:26 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 31 Mar 2016, at 09:37, ashish rawat <dceash...@gmail.com> wrote:
>
> Hi,
>
> I have been evaluating Spark for analysing Application and Server Logs. I
> believe there are some downsides of doing this:
>
> 1. No direct mechanism of collecting log, so need to introduce other tools
> like Flume into the pipeline.
>
>
> you need something to collect logs no matter what you run. Flume isn't so
> bad; if you bring it up on the same host as the app then you can even
> collect logs while the network is playing up.
>
> Or you can just copy log4j files to HDFS and process them later
>
> 2. Need to write lots of code for parsing different patterns from logs,
> while some of the log analysis tools like logstash or loggly provide it out
> of the box
>
>
>
> Log parsing is essentially an ETL problem, especially if you don't try to
> lock down the log event format.
>
> You can also configure Log4J to save stuff in an easy-to-parse format
> and/or forward directly to your application.
>
> There's a log4j to flume connector to do that for you,
>
>
> http://www.thecloudavenue.com/2013/11/using-log4jflume-to-log-application.html
>
> or you can output in, say, JSON (
> https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/log/Log4Json.java
>  )
>
> I'd go with flume unless you had a need to save the logs locally and copy
> them to HDFS later.
>
>
>
> On the benefits side, I believe Spark might be more performant (although I
> am yet to benchmark it) and being a generic processing engine, might work
> with complex use cases where the out of the box functionality of log
> analysis tools is not sufficient (although I don't have any such use case
> right now).
>
> One option I was considering was to use logstash for collection and basic
> processing and then sink the processed logs to both elastic search and
> kafka. So that Spark Streaming can pick data from Kafka for the complex use
> cases, while logstash filters can be used for the simpler use cases.
>
> I was wondering if someone has already done this evaluation and could
> provide me some pointers on how/if to create this pipeline with Spark.
>
> Regards,
> Ashish
>
>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Can we use spark inside a web service?

2016-03-10 Thread Chris Fregly
great discussion, indeed.

Mark Hamstra and i spoke offline just now.

Below is a quick recap of our discussion on how they've achieved acceptable
performance from Spark on the user request/response path (@mark- feel free
to correct/comment).

1) there is a big difference in request/response latency between submitting
a full Spark Application (heavy weight) versus having a long-running Spark
Application (like Spark Job Server) that submits lighter-weight Jobs using
a shared SparkContext.  mark is obviously using the latter - a long-running
Spark App.

2) there are some enhancements to Spark that are required to achieve
acceptable user request/response times.  some links that Mark provided are
as follows:

   - https://issues.apache.org/jira/browse/SPARK-11838
   - https://github.com/apache/spark/pull/11036
   - https://github.com/apache/spark/pull/11403
   - https://issues.apache.org/jira/browse/SPARK-13523
   - https://issues.apache.org/jira/browse/SPARK-13756

Essentially, a deeper level of caching at the shuffle file layer to reduce
compute and memory between queries.

Note that Mark is running a slightly-modified version of stock Spark.
 (He's mentioned this in prior posts, as well.)
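
To make point 1 above concrete, here is a minimal sketch of the long-running-application pattern - one shared SparkContext serving lightweight per-request jobs. This is only the shape of the approach (FAIR scheduling, pool and dataset names are illustrative), not Mark's actual code:

import org.apache.spark.{SparkConf, SparkContext}

object SharedContextServer {
  // one long-running SparkContext shared by the whole serving process
  val sc = new SparkContext(
    new SparkConf()
      .setAppName("request-serving-app")
      .set("spark.scheduler.mode", "FAIR"))   // let concurrent jobs share the executors

  // called from the web framework's request-handler threads
  def handleRequest(userId: String): Long = {
    sc.setLocalProperty("spark.scheduler.pool", "interactive")
    sc.textFile("hdfs:///events")             // hypothetical shared dataset
      .filter(_.contains(userId))
      .count()                                // the lightweight per-request job
  }
}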

And I have to say that I'm, personally, seeing more and more
slightly-modified versions of Spark being deployed to production to work
around outstanding PRs and Jiras.

this may not be what people want to hear, but it's a trend that i'm seeing
lately as more and more teams customize Spark to their specific use cases.

Anyway, thanks for the good discussion, everyone!  This is why we have
these lists, right!  :)


On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan <velvia.git...@gmail.com> wrote:

> One of the premises here is that if you can restrict your workload to
> fewer cores - which is easier with FiloDB and careful data modeling -
> you can make this work for much higher concurrency and lower latency
> than most typical Spark use cases.
>
> The reason why it typically does not work in production is that most
> people are using HDFS and files.  These data sources are designed for
> running queries and workloads on all your cores across many workers,
> and not for filtering your workload down to only one or two cores.
>
> There is actually nothing inherent in Spark that prevents people from
> using it as an app server.   However, the insistence on using it with
> HDFS is what kills concurrency.   This is why FiloDB is important.
>
> I agree there are more optimized stacks for running app servers, but
> the choices that you mentioned:  ES is targeted at text search;  Cass
> and HBase by themselves are not fast enough for analytical queries
> that the OP wants;  and MySQL is great but not scalable.   Probably
> something like VectorWise, HANA, Vertica would work well, but those
> are mostly not free solutions.   Druid could work too if the use case
> is right.
>
> Anyways, great discussion!
>
> On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly <ch...@fregly.com> wrote:
> > you are correct, mark.  i misspoke.  apologies for the confusion.
> >
> > so the problem is even worse given that a typical job requires multiple
> > tasks/cores.
> >
> > i have yet to see this particular architecture work in production.  i
> would
> > love for someone to prove otherwise.
> >
> > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra <m...@clearstorydata.com>
> > wrote:
> >>>
> >>> For example, if you're looking to scale out to 1000 concurrent
> requests,
> >>> this is 1000 concurrent Spark jobs.  This would require a cluster with
> 1000
> >>> cores.
> >>
> >>
> >> This doesn't make sense.  A Spark Job is a driver/DAGScheduler concept
> >> without any 1:1 correspondence between Worker cores and Jobs.  Cores are
> >> used to run Tasks, not Jobs.  So, yes, a 1000 core cluster can run at
> most
> >> 1000 simultaneous Tasks, but that doesn't really tell you anything
> about how
> >> many Jobs are or can be concurrently tracked by the DAGScheduler, which
> will
> >> be apportioning the Tasks from those concurrent Jobs across the
> available
> >> Executor cores.
> >>
> >> On Thu, Mar 10, 2016 at 2:00 PM, Chris Fregly <ch...@fregly.com> wrote:
> >>>
> >>> Good stuff, Evan.  Looks like this is utilizing the in-memory
> >>> capabilities of FiloDB which is pretty cool.  looking forward to the
> webcast
> >>> as I don't know much about FiloDB.
> >>>
> >>> My personal thoughts here are to removed Spark from the user
> >>> request/response hot path.
> >>>
> >>> I can't tell you how many times i've had to unroll that architecture at
> >

Re: [MLlib - ALS] Merging two Models?

2016-03-10 Thread Chris Fregly
@Colin-  you're asking the million-dollar question that a lot of people are
trying to solve.  This was literally the #1 most-asked question in every
city on my recent world-wide meetup tour.

I've been pointing people to my old Databricks co-worker's
streaming-matrix-factorization project:
https://github.com/brkyvz/streaming-matrix-factorization  He got tired of
everyone asking about this - and cranked it out over a weekend.  Love that
guy, Burak!  :)

I've attempted (unsuccessfully, so far) to deploy exactly what you're
trying to do here:
https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/ml/TrainMFIncremental.scala

We're a couple pull requests away from making this happen.  You can see my
comments and open github issues for the remaining bits.

And this will be my focus in the next week or so as I prepare for an
upcoming conference.  Keep an eye on this repo if you'd like.

@Sean:  thanks for the link.  I knew Oryx was doing this somehow - and I
kept meaning to see how you were doing it.  I'll likely incorporate some of
your stuff into my final solution.
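
For anyone who wants to experiment with Colin's option 2 below, here is a rough sketch of averaging the shared factor vectors of two same-rank models - keeping Sean's caveat in mind that the two factorizations don't share a basis, so this is only a crude approximation, not a principled merge:

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.rdd.RDD

// average vectors present in both models; keep a vector as-is if only one model has it
def avgFeatures(a: RDD[(Int, Array[Double])],
                b: RDD[(Int, Array[Double])]): RDD[(Int, Array[Double])] =
  a.fullOuterJoin(b).mapValues {
    case (Some(x), Some(y)) => x.zip(y).map { case (p, q) => (p + q) / 2.0 }
    case (x, y)             => x.orElse(y).get  // exactly one side is defined here
  }

def mergeModels(m1: MatrixFactorizationModel,
                m2: MatrixFactorizationModel): MatrixFactorizationModel = {
  require(m1.rank == m2.rank, "both models must have the same rank")
  new MatrixFactorizationModel(
    m1.rank,
    avgFeatures(m1.userFeatures, m2.userFeatures),
    avgFeatures(m1.productFeatures, m2.productFeatures))
}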


On Thu, Mar 10, 2016 at 3:35 PM, Sean Owen <so...@cloudera.com> wrote:

> While it isn't crazy, I am not sure how valid it is to build a model
> off of only a chunk of recent data and then merge it into another
> model in any direct way. They're not really sharing a basis, so you
> can't just average them.
>
> My experience with this aspect suggests you should try to update the
> existing model in place on the fly. In short, you figure out how much
> the new input ought to change your estimate of the (user,item)
> association. Positive interactions should increase it a bit, etc. Then
> you work out how the item vector would change if the user vector were
> fixed in order to accomplish that change, with a bit of linear
> algebra. Vice versa for user vector. Of course, those changes affect
> the rest of the matrix too but that's the 'approximate' bit.
>
> I so happen to have an implementation of this in the context of a
> Spark ALS model, though raw source code may be hard to read. If it's
> of interest we can discuss offline (or online here to the extent it's
> relevant to Spark users)
>
>
> https://github.com/OryxProject/oryx/blob/91004a03413eef0fdfd6e75a61b68248d11db0e5/app/oryx-app/src/main/java/com/cloudera/oryx/app/speed/als/ALSSpeedModelManager.java#L192
>
> On Thu, Mar 10, 2016 at 8:01 PM, Colin Woodbury <coli...@gmail.com> wrote:
> > Hi there, I'm wondering if it's possible (or feasible) to combine the
> > feature matrices of two MatrixFactorizationModels that share a user and
> > product set.
> >
> > Specifically, one model would be the "on-going" model, and the other is
> one
> > trained only on the most recent aggregation of some event data. My
> overall
> > goal is to try to approximate "online" training, as ALS doesn't support
> > streaming, and it also isn't possible to "seed" the ALS training process
> > with an already trained model.
> >
> > Since the two Models would share a user/product ID space, can their
> feature
> > matrices be merged? For instance via:
> >
> > 1. Adding feature vectors together for user/product vectors that appear
> in
> > both models
> > 2. Averaging said vectors instead
> > 3. Some other linear algebra operation
> >
> > Unfortunately, I'm fairly ignorant as to the internal mechanics of ALS
> > itself. Is what I'm asking possible?
> >
> > Thank you,
> > Colin
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Can we use spark inside a web service?

2016-03-10 Thread Chris Fregly
you are correct, mark.  i misspoke.  apologies for the confusion.

so the problem is even worse given that a typical job requires multiple
tasks/cores.

i have yet to see this particular architecture work in production.  i would
love for someone to prove otherwise.
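
to make Mark's Job/Task distinction below concrete, here's a tiny sketch (my own
illustration, run in a spark-shell) of many concurrent Jobs sharing one
SparkContext: the DAGScheduler happily tracks all of them, while their Tasks
compete for the same small pool of executor cores - which is exactly why the
request/response latency story falls apart at scale.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val data = sc.parallelize(1 to 1000000).cache()

// 100 concurrent Jobs, each submitted from its own thread against the same SparkContext
val jobs = (1 to 100).map { i =>
  Future { data.map(_ * i).reduce(_ + _) }
}
val results = Await.result(Future.sequence(jobs), 10.minutes)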

On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> For example, if you're looking to scale out to 1000 concurrent requests,
>> this is 1000 concurrent Spark jobs.  This would require a cluster with 1000
>> cores.
>
>
> This doesn't make sense.  A Spark Job is a driver/DAGScheduler concept
> without any 1:1 correspondence between Worker cores and Jobs.  Cores are
> used to run Tasks, not Jobs.  So, yes, a 1000 core cluster can run at most
> 1000 simultaneous Tasks, but that doesn't really tell you anything about
> how many Jobs are or can be concurrently tracked by the DAGScheduler, which
> will be apportioning the Tasks from those concurrent Jobs across the
> available Executor cores.
>
> On Thu, Mar 10, 2016 at 2:00 PM, Chris Fregly <ch...@fregly.com> wrote:
>
>> Good stuff, Evan.  Looks like this is utilizing the in-memory
>> capabilities of FiloDB which is pretty cool.  looking forward to the
>> webcast as I don't know much about FiloDB.
>>
>> My personal thoughts here are to remove Spark from the user
>> request/response hot path.
>>
>> I can't tell you how many times i've had to unroll that architecture at
>> clients - and replace with a real database like Cassandra, ElasticSearch,
>> HBase, MySql.
>>
>> Unfortunately, Spark - and Spark Streaming, especially - lead you to
>> believe that Spark could be used as an application server.  This is not a
>> good use case for Spark.
>>
>> Remember that every job that is launched by Spark requires 1 CPU core,
>> some memory, and an available Executor JVM to provide the CPU and memory.
>>
>> Yes, you can horizontally scale this because of the distributed nature of
>> Spark, however it is not an efficient scaling strategy.
>>
>> For example, if you're looking to scale out to 1000 concurrent requests,
>> this is 1000 concurrent Spark jobs.  This would require a cluster with 1000
>> cores.  this is just not cost effective.
>>
>> Use Spark for what it's good for - ad-hoc, interactive, and iterative
>> (machine learning, graph) analytics.  Use an application server for what
>> it's good at - managing a large number of concurrent requests.  And use a
>> database for what it's good for - storing/retrieving data.
>>
>> And any serious production deployment will need failover, throttling,
>> back pressure, auto-scaling, and service discovery.
>>
>> While Spark supports these to varying levels of production-readiness,
>> Spark is a batch-oriented system and not meant to be put on the user
>> request/response hot path.
>>
>> For the failover, throttling, back pressure, autoscaling that i mentioned
>> above, it's worth checking out the suite of Netflix OSS - particularly
>> Hystrix, Eureka, Zuul, Karyon, etc:  http://netflix.github.io/
>>
>> Here's my github project that incorporates a lot of these:
>> https://github.com/cfregly/fluxcapacitor
>>
>> Here's a netflix Skunkworks github project that packages these up in
>> Docker images:  https://github.com/Netflix-Skunkworks/zerotodocker
>>
>>
>> On Thu, Mar 10, 2016 at 1:40 PM, velvia.github <velvia.git...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I just wrote a blog post which might be really useful to you -- I have
>>> just
>>> benchmarked being able to achieve 700 queries per second in Spark.  So,
>>> yes,
>>> web speed SQL queries are definitely possible.   Read my new blog post:
>>>
>>> http://velvia.github.io/Spark-Concurrent-Fast-Queries/
>>>
>>> and feel free to email me (at vel...@gmail.com) if you would like to
>>> follow
>>> up.
>>>
>>> -Evan
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-use-spark-inside-a-web-service-tp26426p26451.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>>
>> *Chris Fregly*
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
>>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Can we use spark inside a web service?

2016-03-10 Thread Chris Fregly
Good stuff, Evan.  Looks like this is utilizing the in-memory capabilities
of FiloDB which is pretty cool.  looking forward to the webcast as I don't
know much about FiloDB.

My personal thoughts here are to remove Spark from the user
request/response hot path.

I can't tell you how many times i've had to unroll that architecture at
clients - and replace with a real database like Cassandra, ElasticSearch,
HBase, MySql.

Unfortunately, Spark - and Spark Streaming, especially - lead you to
believe that Spark could be used as an application server.  This is not a
good use case for Spark.

Remember that every job that is launched by Spark requires 1 CPU core, some
memory, and an available Executor JVM to provide the CPU and memory.

Yes, you can horizontally scale this because of the distributed nature of
Spark, however it is not an efficient scaling strategy.

For example, if you're looking to scale out to 1000 concurrent requests,
this is 1000 concurrent Spark jobs.  This would require a cluster with 1000
cores.  this is just not cost effective.

Use Spark for what it's good for - ad-hoc, interactive, and iterative
(machine learning, graph) analytics.  Use an application server for what
it's good at - managing a large number of concurrent requests.  And use a
database for what it's good for - storing/retrieving data.

And any serious production deployment will need failover, throttling, back
pressure, auto-scaling, and service discovery.

While Spark supports these to varying levels of production-readiness, Spark
is a batch-oriented system and not meant to be put on the user
request/response hot path.

For the failover, throttling, back pressure, autoscaling that i mentioned
above, it's worth checking out the suite of Netflix OSS - particularly
Hystrix, Eureka, Zuul, Karyon, etc:  http://netflix.github.io/

Here's my github project that incorporates a lot of these:
https://github.com/cfregly/fluxcapacitor

Here's a netflix Skunkworks github project that packages these up in Docker
images:  https://github.com/Netflix-Skunkworks/zerotodocker


On Thu, Mar 10, 2016 at 1:40 PM, velvia.github <velvia.git...@gmail.com>
wrote:

> Hi,
>
> I just wrote a blog post which might be really useful to you -- I have just
> benchmarked being able to achieve 700 queries per second in Spark.  So,
> yes,
> web speed SQL queries are definitely possible.   Read my new blog post:
>
> http://velvia.github.io/Spark-Concurrent-Fast-Queries/
>
> and feel free to email me (at vel...@gmail.com) if you would like to
> follow
> up.
>
> -Evan
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-use-spark-inside-a-web-service-tp26426p26451.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Using netlib-java in Spark 1.6 on linux

2016-03-04 Thread Chris Fregly
I have all of this pre-wired up and Docker-ized for your instant enjoyment
here:  https://github.com/fluxcapacitor/pipeline/wiki

You can review the Dockerfile
<https://github.com/fluxcapacitor/pipeline/blob/master/Dockerfile> for the
details (Ubuntu 14.04-based).

This is easy BREEZEy.

Also, here's a link to a related post from 1.5 yrs ago:
http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-native-netlib-java-OpenBLAS-td19662.html

Hit me up off-line if you want to dive into the details.  We can circle
back here if we find anything out of the ordinary.
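
as a quick sanity check (a sketch - assumes netlib-java is already on the
classpath via spark-mllib), you can print which BLAS/LAPACK implementation
actually got loaded from the spark-shell:

import com.github.fommil.netlib.{BLAS, LAPACK}

// NativeSystemBLAS means your system liblapack/libblas were picked up;
// F2jBLAS means it silently fell back to the pure-Java implementation
println(BLAS.getInstance().getClass.getName)
println(LAPACK.getInstance().getClass.getName)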

On Thu, Mar 3, 2016 at 12:16 PM, cindymc <cindy.mcmul...@oracle.com> wrote:

> I have these on my system:
>
> /usr/lib64:$ /sbin/ldconfig -p | grep liblapack
> liblapack.so.3 (libc6,x86-64) => /usr/lib64/atlas/liblapack.so.3
> liblapack.so.3 (libc6,x86-64) =>
> /usr/lib64/atlas-sse3/liblapack.so.3
> liblapack.so.3 (libc6,x86-64) => /usr/lib64/liblapack.so.3
> liblapack.so.3 (libc6) => /usr/lib/atlas/liblapack.so.3
> liblapack.so.3 (libc6) => /usr/lib/liblapack.so.3
> liblapack.so (libc6,x86-64) => /usr/lib64/liblapack.so
> liblapack.so (libc6) => /usr/lib/liblapack.so
> /usr/lib64:$ /sbin/ldconfig -p | grep libblas
> libblas.so.3 (libc6,x86-64) => /usr/lib64/libblas.so.3
> libblas.so.3 (libc6) => /usr/lib/libblas.so.3
> libblas.so (libc6,x86-64) => /usr/lib64/libblas.so
> libblas.so (libc6) => /usr/lib/libblas.so
>
> And this in my  /etc/ld.so.conf:
> include ld.so.conf.d/*.conf
> /usr/lib64
>
> Then ran 'ldconfig'.
>
> I also have this:
> /usr/lib64:$ yum list libgfortran
> Loaded plugins: aliases, changelog, kabi, presto, refresh-packagekit,
> security, tmprepo, ulninfo, verify, versionlock
> Loading support for kernel ABI
> Installed Packages
> libgfortran.i686
> 4.4.7-16.el6  @base
> libgfortran.x86_64
> 4.4.7-16.el6  @anacond
>
> I've set:
> LD_LIBRARY_PATH=/usr/lib64
>
> Still no luck.  Suggestions?  Do I have to
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-netlib-java-in-Spark-1-6-on-linux-tp26386p26392.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Recommendation for a good book on Spark, beginner to moderate knowledge

2016-02-28 Thread Chris Fregly
for hands-on, check out the end-to-end reference data pipeline available
either from the github or docker repo's described here:

http://advancedspark.com/

i use these assets to train folks of all levels of Spark knowledge.

also, there are some relevant videos and slideshare presentations, but they
might be a bit advanced for Spark n00bs.


On Sun, Feb 28, 2016 at 4:25 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> In my opinion the best way to learn something is trying it on the spot.
>
> As suggested if you have Hadoop, Hive and Spark installed and you are OK
> with SQL then you will have to focus on Scala and Spark pretty much.
>
> Your best bet is interactive work through Spark shell with Scala,
> understanding RDD, DataFrame, Transformation and actions. You also have
> online docs and a great number of users in this forum that can potentially
> help you with your questions. Buying books can help but nothing takes the
> place of getting your hands dirty so to speak.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 28 February 2016 at 22:32, Jules Damji <dmat...@comcast.net> wrote:
>
>> Hello Ashoka,
>>
>> "Learning Spark," from O'Reilly, is certainly a good start, and all basic
>> video tutorials from Spark Summit Training, "Spark Essentials", are
>> excellent supplementary materials.
>>
>> And the best (and most effective) way to teach yourself is really firing
>> up the spark-shell or pyspark and doing it yourself—immersing yourself by
>> trying all basic transformations and actions on RDDs, with contrived small
>> data sets.
>>
>> I've discovered that learning Scala & Python through their interactive
>> shell, where feedback is immediate and response is quick, is the best
>> learning experience.
>>
>> Same is true for Scala or Python Notebooks interacting with a Spark,
>> running in local or cluster mode.
>>
>> Cheers,
>>
>> Jules
>>
>> Sent from my iPhone
>> Pardon the dumb thumb typos :)
>>
>> On Feb 28, 2016, at 1:48 PM, Ashok Kumar <ashok34...@yahoo.com.INVALID
>> <ashok34...@yahoo.com.invalid>> wrote:
>>
>>   Hi Gurus,
>>
>> Appreciate if you recommend me a good book on Spark or documentation for
>> beginner to moderate knowledge
>>
>> I very much like to skill myself on transformation and action methods.
>>
>> FYI, I have already looked at examples on net. However, some of them not
>> clear at least to me.
>>
>> Warmest regards
>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Evaluating spark streaming use case

2016-02-21 Thread Chris Fregly
good catch on the cleaner.ttl

@jatin-  when you say "memory-persisted RDD", what do you mean exactly?  and 
how much data are you talking about?  remember that spark can evict these 
memory-persisted RDDs at any time.  they can be recovered from Kafka, but this 
is not a good situation to be in.

also, is this spark 1.6 with the new mapWithState() or the old updateStateByKey()?  
you definitely want the newer 1.6 mapWithState().

and is there any other way to store and aggregate this data outside of spark?  
I get a bit nervous when I see people treat spark/streaming like an in-memory 
database.

perhaps redis or another type of in-memory store is more appropriate.  or just 
write to long-term storage using parquet.

if this is a lot of data, you may want to use approximate probabilistic data 
structures like CountMin Sketch or HyperLogLog.  here's some relevant links 
with more info - including how to use these with redis: 

https://www.slideshare.net/cfregly/advanced-apache-spark-meetup-approximations-and-probabilistic-data-structures-jan-28-2016-galvanize

https://github.com/fluxcapacitor/pipeline/tree/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx

you can then setup a cron (or airflow) spark job to do the compute and 
aggregate against either redis or long-term storage.

this reference pipeline contains the latest airflow workflow scheduler:  
https://github.com/fluxcapacitor/pipeline/wiki

my advice with spark streaming is to get the data out of spark streaming as 
quickly as possible - and into a more durable format more suitable for 
aggregation and compute.  

this greatly simplifies your operational concerns, in my
opinion.

good question.  very common use case.
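
here's a minimal sketch of the 1.6 mapWithState() approach mentioned above.  the
key/value types and the update logic are assumptions - the point is just that the
running aggregate lives in managed state rather than in a hand-rolled
memory-persisted RDD:

import org.apache.spark.streaming.{State, StateSpec}

val spec = StateSpec.function((key: String, value: Option[Long], state: State[Long]) => {
  val newCount = state.getOption.getOrElse(0L) + value.getOrElse(0L)
  state.update(newCount)        // carry the running aggregate forward
  (key, newCount)               // emit the updated aggregate downstream
})

// filteredPairs is the assumed DStream[(String, Long)] built from the Kafka stream
// val stateful = filteredPairs.mapWithState(spec)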

> On Feb 21, 2016, at 12:22 PM, Ted Yu  wrote:
> 
> w.r.t. cleaner TTL, please see:
> 
> [SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0
> 
> FYI
> 
>> On Sun, Feb 21, 2016 at 4:16 AM, Gerard Maas  wrote:
>> 
>> It sounds like another  window operation on top of the 30-min window will 
>> achieve the  desired objective. 
>> Just keep in mind that you'll need to set the clean TTL (spark.cleaner.ttl) 
>> to a long enough value and you will require enough resources (mem & disk) to 
>> keep the required data.  
>> 
>> -kr, Gerard.
>> 
>>> On Sun, Feb 21, 2016 at 12:54 PM, Jatin Kumar 
>>>  wrote:
>>> Hello Spark users,
>>> 
>>> I have to aggregate messages from kafka and at some fixed interval (say 
>>> every half hour) update a memory persisted RDD and run some computation. 
>>> This computation uses last one day data. Steps are:
>>> 
>>> - Read from realtime Kafka topic X in spark streaming batches of 5 seconds
>>> - Filter the above DStream messages and keep some of them
>>> - Create windows of 30 minutes on above DStream and aggregate by Key
>>> - Merge this 30 minute RDD with a memory persisted RDD say combinedRdd
>>> - Maintain last N such RDDs in a deque persisting them on disk. While 
>>> adding new RDD, subtract oldest RDD from the combinedRdd.
>>> - Final step consider last N such windows (of 30 minutes each) and do final 
>>> aggregation
>>> 
>>> Does the above way of using spark streaming looks reasonable? Is there a 
>>> better way of doing the above?
>>> 
>>> --
>>> Thanks
>>> Jatin
> 


Re: Communication between two spark streaming Job

2016-02-19 Thread Chris Fregly
if you need update notifications, you could introduce ZooKeeper (eek!) or a 
Kafka queue between the jobs.  

I've seen internal Kafka queues (relative to external spark streaming queues) 
used for this type of incremental update use case.

think of the updates as transaction logs.

> On Feb 19, 2016, at 10:35 PM, Ted Yu  wrote:
> 
> Have you considered using a Key Value store which is accessible to both jobs ?
> 
> The communication would take place through this store.
> 
> Cheers
> 
>> On Fri, Feb 19, 2016 at 11:48 AM, Ashish Soni  wrote:
>> Hi , 
>> 
>> Is there any way we can communicate across two different spark streaming job 
>> , as below is the scenario
>> 
>> we have two spark streaming job one to process metadata and one to process 
>> actual data ( this needs metadata ) 
>> 
>> So if someone did the metadata update we need to update the cache maintained 
>> in the second job so that it can take use of new metadata 
>> 
>> Please help 
>> 
>> Ashish
> 


Re: Allowing parallelism in spark local mode

2016-02-12 Thread Chris Fregly
sounds like the first job is occupying all resources.  you should limit the
resources that a single job can acquire.

fair scheduler is one way to do that.

a possibly simpler way is to configure spark.deploy.defaultCores or
spark.cores.max.

the defaults for these values - for the Spark default cluster resource
manager (aka Spark Standalone) - is infinite.  every job will try to
acquire every resource.

https://spark.apache.org/docs/latest/spark-standalone.html

here's an example config that i use for my reference data pipeline project:

https://github.com/fluxcapacitor/pipeline/blob/master/config/spark/spark-defaults.conf

i'm always playing with these values to simulate different conditions, but
that's the current snapshot that might be helpful.

also, don't forget about executor memory...
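
here's a minimal sketch of capping what a single job can grab (the values are
illustrative assumptions - tune them for your own setup):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("http-request-handler")
  .set("spark.scheduler.mode", "FAIR")    // fair scheduling across jobs within this app
  .set("spark.cores.max", "4")            // cap total cores this app can acquire (standalone)
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)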


On Fri, Feb 12, 2016 at 1:40 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> You’ll want to setup the FAIR scheduler as described here:
> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
>
> From: yael aharon <yael.aharo...@gmail.com>
> Date: Friday, February 12, 2016 at 2:00 PM
> To: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Allowing parallelism in spark local mode
>
> Hello,
> I have an application that receives requests over HTTP and uses spark in
> local mode to process the requests. Each request is running in its own
> thread.
> It seems that spark is queueing the jobs, processing them one at a time.
> When 2 requests arrive simultaneously, the processing time for each of them
> is almost doubled.
> I tried setting spark.default.parallelism, spark.executor.cores,
> spark.driver.cores but that did not change the time in a meaningful way.
>
> Am I missing something obvious?
> thanks, Yael
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Do we need to enable Tungsten sort in Spark 1.6?

2016-01-08 Thread Chris Fregly
Yeah, this confused me, as well.  Good question, Umesh.

As Ted pointed out:  between Spark 1.5 and 1.6,
o.a.s.shuffle.unsafe.UnsafeShuffleManager no longer exists as a separate
shuffle manager.  Here's the old code (notice the o.a.s.shuffle.unsafe
package):

https://github.com/apache/spark/blob/branch-1.5/core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

The functionality has essentially been rolled into
o.a.s.shuffle.sort.SortShuffleManager with the help of a Scala match/case
statement.  Here's the newer code (notice the o.a.s.shuffle.unsafe package
is gone):

https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala


On Fri, Jan 8, 2016 at 1:14 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> For "spark.shuffle.manager", the default is "sort"
> From core/src/main/scala/org/apache/spark/SparkEnv.scala :
>
> val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
>
> "tungsten-sort" is the same as "sort" :
>
> val shortShuffleMgrNames = Map(
>   "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
>   "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
>   "tungsten-sort" ->
> "org.apache.spark.shuffle.sort.SortShuffleManager")
>
> FYI
>
> On Fri, Jan 8, 2016 at 12:59 PM, Umesh Kacha <umesh.ka...@gmail.com>
> wrote:
>
>> ok thanks so it will be enabled by default always if yes then in
>> documentation why default shuffle manager is mentioned as sort?
>>
>> On Sat, Jan 9, 2016 at 1:55 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> From
>>> sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala :
>>>
>>> case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
>>>   val runFunc = (sqlContext: SQLContext) => {
>>> logWarning(
>>>   s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is
>>> deprecated and " +
>>> s"will be ignored. Tungsten will continue to be used.")
>>> Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
>>>   }
>>>
>>> FYI
>>>
>>> On Fri, Jan 8, 2016 at 12:21 PM, unk1102 <umesh.ka...@gmail.com> wrote:
>>>
>>>> Hi, I was using Spark 1.5 with Tungsten sort and now I am using Spark
>>>> 1.6, and I don't see any difference; I was expecting Spark 1.6 to be
>>>> faster. Anyway, do we need to enable the Tungsten and unsafe options,
>>>> or are they enabled by default? I see in the documentation that the
>>>> default sort manager is sort - I thought it is Tungsten, no? Please guide.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Do-we-need-to-enabled-Tungsten-sort-in-Spark-1-6-tp25923.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Date Time Regression as Feature

2016-01-08 Thread Chris Fregly
Here's a good blog post by Sandy Ryza @ Cloudera on Spark + Time Series
Data:
http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/

Might give you some things to try.
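
one common approach (my assumption here, not something from Sandy's post) is to
expand the timestamp into plain numeric features - hour-of-day, day-of-week,
etc. - before assembling the vector.  a minimal sketch on Jorge's sample data
(assumes Java 8 for java.time):

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val raw = sc.parallelize(Seq(("2015-12-10-10:00", 1200.0), ("2015-12-11-10:00", 1150.0)))

val points = raw.map { case (ts, usage) =>
  val dt = LocalDateTime.parse(ts, DateTimeFormatter.ofPattern("yyyy-MM-dd-HH:mm"))
  // month, day-of-month, day-of-week, hour-of-day as numeric features
  LabeledPoint(usage, Vectors.dense(
    dt.getMonthValue.toDouble,
    dt.getDayOfMonth.toDouble,
    dt.getDayOfWeek.getValue.toDouble,
    dt.getHour.toDouble))
}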

On Thu, Jan 7, 2016 at 11:40 PM, dEEPU <mdeep...@hotmail.com> wrote:

> Maybe u want to convert the date to a duration in form of number of
> hours/days and then do calculation on it
> On Jan 8, 2016 12:39 AM, Jorge Machado <jorge.w.mach...@hotmail.com>
> wrote:
> Hello all,
>
> I'm new to machine learning. I'm trying to predict some electric usage
> with a decision tree.
> The data is:
> 2015-12-10-10:00, 1200
> 2015-12-11-10:00, 1150
>
> My question is : What is the best way to turn date and time into feature
> on my Vector ?
>
> Something like this :  Vector (1200, [2015,12,10,10,10] )?
> I could not fine any example with value prediction where features had
> dates in it.
>
> Thanks
>
> Jorge Machado
>
> Jorge Machado
> jo...@jmachado.me
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: how to extend java transformer from Scala UnaryTransformer ?

2016-01-02 Thread Chris Fregly
Looks like you're not registering the input param correctly.

Below are examples from the Spark Java source that show how to build a
custom transformer.  Note that a Model is a Transformer.

Also, that chimpler/wordpress/naive bayes example is a bit dated.  I tried
to implement it a while ago, but didn't get very far given that the ML API
has charged ahead in favor of pipelines and the new spark.ml package.


https://github.com/apache/spark/blob/branch-1.5/examples/src/main/java/org/apache/spark/examples/ml/JavaDeveloperApiExample.java#L126

https://github.com/apache/spark/blob/branch-1.5/examples/src/main/java/org/apache/spark/examples/ml/JavaDeveloperApiExample.java#L191



On Fri, Jan 1, 2016 at 11:38 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> I am trying to write a trivial transformer I use use in my pipeline. I am
> using java and spark 1.5.2. It was suggested that I use the Tokenize.scala
> class as an example. This should be very easy how ever I do not understand
> Scala, I am having trouble debugging the following exception.
>
> Any help would be greatly appreciated.
>
> Happy New Year
>
> Andy
>
> java.lang.IllegalArgumentException: requirement failed: Param
> null__inputCol does not belong to
> Stemmer_2f3aa96d-7919-4eaa-ad54-f7c620b92d1c.
> at scala.Predef$.require(Predef.scala:233)
> at org.apache.spark.ml.param.Params$class.shouldOwn(params.scala:557)
> at org.apache.spark.ml.param.Params$class.set(params.scala:436)
> at org.apache.spark.ml.PipelineStage.set(Pipeline.scala:37)
> at org.apache.spark.ml.param.Params$class.set(params.scala:422)
> at org.apache.spark.ml.PipelineStage.set(Pipeline.scala:37)
> at org.apache.spark.ml.UnaryTransformer.setInputCol(Transformer.scala:83)
> at com.pws.xxx.ml.StemmerTest.test(StemmerTest.java:30)
>
>
>
> public class StemmerTest extends AbstractSparkTest {
>
> @Test
>
> public void test() {
>
> Stemmer stemmer = new Stemmer()
>
> .setInputCol("raw”) //*line 30*
>
> .setOutputCol("filtered");
>
> }
>
> }
>
>
> /**
>
>  * @ see spark-1.5.1/mllib/src/main/scala/org/apache
> /spark/ml/feature/Tokenizer.scala
>
>  * @ see https://chimpler.wordpress.com/2014/06/11/classifiying-documents-
> using-naive-bayes-on-apache-spark-mllib/
>
>  * @ see http://www.tonytruong.net/movie-rating-prediction-with-apache-
> spark-and-hortonworks/
>
>  *
>
>  * @author andrewdavidson
>
>  *
>
>  */
>
> public class Stemmer extends UnaryTransformer<List<String>, List<String>,
> Stemmer> implements Serializable {
>
> static Logger logger = LoggerFactory.getLogger(Stemmer.class);
>
> private static final long serialVersionUID = 1L;
>
> private static final  ArrayType inputType =
> DataTypes.createArrayType(DataTypes.StringType, true);
>
> private final String uid = Stemmer.class.getSimpleName() + "_" +
> UUID.randomUUID().toString();
>
>
> @Override
>
> public String uid() {
>
> return uid;
>
> }
>
>
> /*
>
>override protected def validateInputType(inputType: DataType):
> Unit = {
>
> require(inputType == StringType, s"Input type must be string type but
> got $inputType.")
>
>   }
>
>  */
>
> @Override
>
> public void validateInputType(DataType inputTypeArg) {
>
> String msg = "inputType must be " + inputType.simpleString() + "
> but got " + inputTypeArg.simpleString();
>
> assert (inputType.equals(inputTypeArg)) : msg;
>
> }
>
>
>
> @Override
>
> public Function1<List<String>, List<String>> createTransformFunc() {
>
> //
> http://stackoverflow.com/questions/6545066/using-scala-from-java-passing-functions-as-parameters
>
> Function1<List<String>, List<String>> f = new
> AbstractFunction1<List<String>, List<String>>() {
>
>     public List<String> apply(List<String> words) {
>
> for(String word : words) {
>
> logger.error("AEDWIP input word: {}", word);
>
> }
>
> return words;
>
> }
>
> };
>
>
>
> return f;
>
> }
>
>
> @Override
>
> public DataType outputDataType() {
>
> return DataTypes.createArrayType(DataTypes.StringType, true);
>
> }
>
> }
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: How to specify the numFeatures in HashingTF

2016-01-02 Thread Chris Fregly
You can use CrossValidator/TrainingValidationSplit with ParamGridBuilder
and Evaluator to empirically choose the model hyper parameters (ie.
numFeatures) per the following:

http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-cross-validation

http://spark.apache.org/docs/latest/ml-guide.html#example-model-selection-via-train-validation-split
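
a minimal sketch of what that looks like for numFeatures (the surrounding
pipeline stages, column names, and grid values are assumptions):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression()
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(1 << 10, 1 << 15, 1 << 20))  // candidate sizes
  .build()

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

// val cvModel = cv.fit(trainingDF)  // trainingDF with "text" and "label" columns is assumed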

On Fri, Jan 1, 2016 at 7:48 AM, Yanbo Liang <yblia...@gmail.com> wrote:

> You can refer the following code snippet to set numFeatures for HashingTF:
>
> val hashingTF = new HashingTF()
>   .setInputCol("words")
>   .setOutputCol("features")
>   .setNumFeatures(n)
>
>
> 2015-10-16 0:17 GMT+08:00 Nick Pentreath <nick.pentre...@gmail.com>:
>
>> Setting the numfeatures higher than vocab size will tend to reduce the
>> chance of hash collisions, but it's not strictly necessary - it becomes a
>> memory / accuracy trade off.
>>
>> Surprisingly, the impact on model performance of moderate hash collisions
>> is often not significant.
>>
>> So it may be worth trying a few settings out (lower than vocab, higher
>> etc) and see what the impact is on evaluation metrics.
>>
>> —
>> Sent from Mailbox <https://www.dropbox.com/mailbox>
>>
>>
>> On Thu, Oct 15, 2015 at 5:46 PM, Jianguo Li <flyingfromch...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> There is a parameter in the HashingTF called "numFeatures". I was
>>> wondering what is the best way to set the value to this parameter. In the
>>> use case of text categorization, do you need to know in advance the number
>>> of words in your vocabulary? or do you set it to be a large value, greater
>>> than the number of words in your vocabulary?
>>>
>>> Thanks,
>>>
>>> Jianguo
>>>
>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Run ad-hoc queries at runtime against cached RDDs

2015-12-30 Thread Chris Fregly
There are a few diff ways to apply approximation algorithms and
probabilistic data structures to your Spark data - including Spark's
countApproxDistinct() methods as you pointed out.

There's also Twitter Algebird, and Redis HyperLogLog (PFCOUNT, PFADD).

Here's some examples from my *pipeline Github project*
<https://github.com/fluxcapacitor/pipeline/wiki> that demonstrates how to
use these in a streaming context - if that's interesting to you, at all:

*1) Algebird CountMin Sketch*

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/AlgebirdCountMinSketchTopK.scala

*2) Algebird HyperLogLog*

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/AlgebirdHyperLogLog.scala

*3) Redis HyperLogLog*

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/approx/RedisHyperLogLog.scala

In addition, my *Global Advanced Apache Spark Meetup* (based in SF) has an
entire evening dedicated to this exact topic next month:

http://www.meetup.com/Advanced-Apache-Spark-Meetup/events/22616/.

Video, slides, and live streaming urls will be available the day of the
meetup.
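
and for Krishna's specific "distinct users with property X" case, the plain
countApproxDistinct() route is a one-liner once the RDD is cached.  a minimal
sketch (the record layout, path, and property names are assumptions):

case class Tag(userId: String, prop: String, value: String)

val tags = sc.textFile("hdfs:///path/to/tags")     // placeholder path
  .map(_.split("\t"))
  .map(a => Tag(a(0), a(1), a(2)))
  .cache()

// approximate distinct users with prop1=X, ~1% relative standard deviation
val approxUsers = tags
  .filter(t => t.prop == "prop1" && t.value == "X")
  .map(_.userId)
  .countApproxDistinct(0.01)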


On Mon, Dec 14, 2015 at 12:20 PM, Krishna Rao <krishnanj...@gmail.com>
wrote:

> Thanks for the response Jörn. So to elaborate, I have a large dataset with
> userIds, each tagged with a property, e.g.:
>
> user_1    prop1=X
> user_2    prop1=Y    prop2=A
> user_3    prop2=B
>
>
> I would like to be able to get the number of distinct users that have a
> particular property (or combination of properties). The cardinality of each
> property is in the 1000s and will only grow, as will the number of
> properties. I'm happy with approximate values to trade accuracy for
> performance.
>
> Spark's performance when doing this via spark-shell is more that excellent
> using the "countApproxDistinct" method on a "javaRDD". However, I've no
> idea what's the best way to be able to run a query programatically like I
> can do manually via spark-shell.
>
> Hope this clarifies things.
>
>
> On 14 December 2015 at 17:04, Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Can you elaborate a little bit more on the use case? It looks a little
>> bit like an abuse of Spark in general . Interactive queries that are not
>> suitable for in-memory batch processing might be better supported by ignite
>> that has in-memory indexes, concept of hot, warm, cold data etc. or hive on
>> tez+llap .
>>
>> > On 14 Dec 2015, at 17:19, Krishna Rao <krishnanj...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > What's the best way to run ad-hoc queries against a cached RDDs?
>> >
>> > For example, say I have an RDD that has been processed, and persisted
>> to memory-only. I want to be able to run a count (actually
>> "countApproxDistinct") after filtering by an, at compile time, unknown
>> (specified by query) value.
>> >
>> > I've experimented with using (abusing) Spark Streaming, by streaming
>> queries and running these against the cached RDD. However, as I say I don't
>> think that this is an intended use-case of Streaming.
>> >
>> > Cheers,
>> >
>> > Krishna
>>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

2015-12-30 Thread Chris Fregly
@Jim-

I'm wondering if those docs are outdated, as it's my understanding (please 
correct if I'm wrong) that we should never be seeing OOMs, as 1.5/Tungsten not 
only improved (reduced) the memory footprint of our data, but also introduced 
better task level - and even key level - external spilling before an OOM occurs.

Michael's comment seems to indicate there was a missed case that is fixed in 
1.6.

I personally see far too many OOMs for what I expected out of 1.5, so I'm 
anxious to try 1.6 and hopefully squash more of these edge cases.

while increasing parallelism is definitely a best practice and applies either 
way, the docs could use some updating, I feel, by the contributors of this code.

> On Dec 30, 2015, at 12:18 PM, SparkUser  wrote:
> 
> Sounds like you guys are on the right track, this is purely FYI because I 
> haven't seen it posted, just responding to the line in the original post that 
> your data structure should fit in memory.
> 
> OK two more disclaimers "FWIW" and "maybe this is not relevant or already 
> covered" OK here goes...
> 
>  from 
> http://spark.apache.org/docs/latest/tuning.html#memory-usage-of-reduce-tasks
> 
> Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit 
> in memory, but because the working set of one of your tasks, such as one of 
> the reduce tasks in groupByKey, was too large. Spark’s shuffle operations 
> (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within 
> each task to perform the grouping, which can often be large. The simplest fix 
> here is to increase the level of parallelism, so that each task’s input set 
> is smaller. Spark can efficiently support tasks as short as 200 ms, because 
> it reuses one executor JVM across many tasks and it has a low task launching 
> cost, so you can safely increase the level of parallelism to more than the 
> number of cores in your clusters.
> 
> I would be curious if that helps at all. Sounds like an interesting problem 
> you are working on.
> 
> Jim
> 
>> On 12/29/2015 05:51 PM, Davies Liu wrote:
>> Hi Andy,  
>> 
>> Could you change logging level to INFO and post some here? There will be 
>> some logging about the memory usage of a task when OOM.  
>> 
>> In 1.6, the memory for a task is : (HeapSize  - 300M) * 0.75 / number of 
>> tasks. Is it possible that the heap is too small?
>> 
>> Davies  
>> 
>> --  
>> Davies Liu
>> Sent with Sparrow (http://www.sparrowmailapp.com/?sig)
>> 
>> Sent with Sparrow (http://www.sparrowmailapp.com/?sig)  
>> 
>> On Tuesday, December 29, 2015 at 4:28 PM, Andy Davidson wrote:
>> 
>>> Hi Michael
>>>  
>>> https://github.com/apache/spark/archive/v1.6.0.tar.gz
>>>  
>>> Both 1.6.0 and 1.5.2 my unit test work when I call reparation(1) before 
>>> saving output. Coalesce still fails.  
>>>  
>>> Coalesce(1) spark-1.5.2
>>> Caused by:
>>> java.io.IOException: Unable to acquire 33554432 bytes of memory
>>>  
>>>  
>>> Coalesce(1) spark-1.6.0
>>>  
>>> Caused by:  
>>> java.lang.OutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
>>>  
>>> Hope this helps
>>>  
>>> Andy
>>>  
>>> From: Michael Armbrust >> (mailto:mich...@databricks.com)>
>>> Date: Monday, December 28, 2015 at 2:41 PM
>>> To: Andrew Davidson >> (mailto:a...@santacruzintegration.com)>
>>> Cc: "user @spark" 
>>> Subject: Re: trouble understanding data frame memory usage 
>>> ³java.io.IOException: Unable to acquire memory²
>>>  
 Unfortunately in 1.5 we didn't force operators to spill when ran out of 
 memory so there is not a lot you can do. It would be awesome if you could 
 test with 1.6 and see if things are any better?
  
 On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson 
  
 wrote:
> I am using spark 1.5.1. I am running into some memory problems with a 
> java unit test. Yes I could fix it by setting –Xmx (its set to 1024M) how 
> ever I want to better understand what is going on so I can write better 
> code in the future. The test runs on a Mac, master="Local[2]"
>  
> I have a java unit test that starts by reading a 672K ascii file. I my 
> output data file is 152K. Its seems strange that such a small amount of 
> data would cause an out of memory exception. I am running a pretty 
> standard machine learning process
>  
> Load data
> create a ML pipeline
> transform the data
> Train a model
> Make predictions
> Join the predictions back to my original data set
> Coalesce(1), I only have a small amount of data and want to save it in a 
> single file
> Save final results back to disk
>  
>  
> Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to 
> acquire memory”
>  
> To try and figure out what is going I put log messages in to count 

Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Chris Fregly
are the credentials visible from each Worker node to all the Executor JVMs on 
each Worker?

> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Dear Spark community,
> 
> I faced the following issue when trying to access data on S3a; my code is the 
> following:
> 
> val sparkConf = new SparkConf()
> 
> val sc = new SparkContext(sparkConf)
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
> val sqlContext = SQLContext.getOrCreate(sc)
> val df = sqlContext.read.parquet(...)
> df.count
> 
> It results in the following exception and log messages:
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from BasicAWSCredentialsProvider: Access key or secret key is null
> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
> metadata service at URL: 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from InstanceProfileCredentialsProvider: The requested metadata 
> is not found at http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
> provider in the chain
>   at 
> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
> 
> I run standalone spark 1.5.2 and using hadoop 2.7.1
> 
> any ideas/workarounds?
> 
> AWS credentials are correct for this bucket
> 
> Thank you,
> Konstantin Kudryavtsev


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Chris Fregly
couple things:

1) switch to IAM roles if at all possible - explicitly passing AWS
credentials is a long and lonely road in the end

2) one really bad workaround/hack is to run a job that hits every worker
and writes the credentials to the proper location (~/.awscredentials or
whatever)

^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle
autoscaling, but i'm mentioning it anyway as it is a temporary fix.

if you switch to IAM roles, things become a lot easier as you can authorize
all of the EC2 instances in the cluster - and handles autoscaling very well
- and at some point, you will want to autoscale.
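
and if you're stuck passing keys explicitly for now, one workaround (an
assumption on my part about your setup, and still inferior to IAM roles) is to
set them through SparkConf with the spark.hadoop.* prefix so they get copied
into every executor's Hadoop configuration rather than only the driver's:

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  .set("spark.hadoop.fs.s3a.access.key", "---")   // placeholder
  .set("spark.hadoop.fs.s3a.secret.key", "---")   // placeholder
val sc = new SparkContext(sparkConf)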

On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev <
kudryavtsev.konstan...@gmail.com> wrote:

> Chris,
>
>  good question, as you can see from the code I set them up on the driver, so I
> expect they will be propagated to all nodes, won't they?
>
> Thank you,
> Konstantin Kudryavtsev
>
> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly <ch...@fregly.com> wrote:
>
>> are the credentials visible from each Worker node to all the Executor
>> JVMs on each Worker?
>>
>> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev <
>> kudryavtsev.konstan...@gmail.com> wrote:
>>
>> Dear Spark community,
>>
>> I faced the following issue when trying to access data on S3a; my code is
>> the following:
>>
>> val sparkConf = new SparkConf()
>>
>> val sc = new SparkContext(sparkConf)
>> sc.hadoopConfiguration.set("fs.s3a.impl", 
>> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>>
>> val sqlContext = SQLContext.getOrCreate(sc)
>>
>> val df = sqlContext.read.parquet(...)
>>
>> df.count
>>
>>
>> It results in the following exception and log messages:
>>
>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>> credentials from BasicAWSCredentialsProvider: *Access key or secret key is 
>> null*
>> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
>> metadata service at URL: 
>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>> 15/12/30 <http://x.x.x.x/latest/meta-data/iam/security-credentials/15/12/30> 
>> 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load credentials from 
>> InstanceProfileCredentialsProvider: The requested metadata is not found at 
>> http://x.x.x.x/latest/meta-data/iam/security-credentials/
>> 15/12/30 <http://x.x.x.x/latest/meta-data/iam/security-credentials/15/12/30> 
>> 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
>> com.amazonaws.AmazonClientException: Unable to load AWS credentials from any 
>> provider in the chain
>>  at 
>> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>>  at 
>> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>>  at 
>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
>>
>>
>> I run standalone spark 1.5.2 and using hadoop 2.7.1
>>
>> any ideas/workarounds?
>>
>> AWS credentials are correct for this bucket
>>
>> Thank you,
>> Konstantin Kudryavtsev
>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Using Experimental Spark Features

2015-12-30 Thread Chris Fregly
A lot of folks are using the new Kafka Direct Stream API in production.

And a lot of folks who used the old Kafka Receiver-based API are migrating
over.

The usual downside to "Experimental" features in Spark is that the API
might change, so you'll need to rewrite some code.

Stability-wise, the Spark codebase has a TON of tests around every new
feature - including experimental features.

From experience, the Kafka Direct Stream API is very stable and a lot of
momentum is behind this implementation.

Check out my *pipeline* Github project to see this impl fully configured
and working within a Docker instance:

https://github.com/fluxcapacitor/pipeline/wiki

Here's a link to the kafka, kafka-rest-api, and kafka schema registry
configuration:  https://github.com/fluxcapacitor/pipeline/tree/master/config

And here's a link to a sample app that uses the Kafka Direct API and stores
data in Cassandra:

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/src/main/scala/com/advancedspark/streaming/rating/store/Cassandra.scala

https://github.com/fluxcapacitor/pipeline/blob/master/myapps/streaming/start-streaming-ratings-cassandra.sh

Here's a link to the Docker image which contains the installation scripts
for Confluent's Kafka Distribution:

https://github.com/fluxcapacitor/pipeline/blob/master/Dockerfile

All code is in Scala.
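
for reference, the core of the Direct Approach is just a few lines.  a minimal
sketch (broker, topic, and the surrounding StreamingContext are assumptions):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker
val topics = Set("ratings")                                       // placeholder topic

// ssc is an assumed, already-created StreamingContext
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)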

On Wed, Dec 30, 2015 at 11:26 AM, David Newberger <
david.newber...@wandcorp.com> wrote:

> Hi All,
>
>
>
> I’ve been looking at the Direct Approach for streaming Kafka integration (
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html)
> because it looks like a good fit for our use cases. My concern is the
> feature is experimental according to the documentation. Has anyone used
> this approach yet and if so what has your experience been with using it? If
> it helps we’d be looking to implement it using Scala. Secondly, in general
> what has people's experience been with using experimental features in Spark?
>
>
>
> Cheers,
>
>
>
> David Newberger
>
>
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: SparkSQL Hive orc snappy table

2015-12-30 Thread Chris Fregly
; at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1466)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> Caused by: java.util.concurrent.ExecutionException:
>> java.lang.IndexOutOfBoundsException: Index: 0
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1016)
>> ... 48 more
>> Caused by: java.lang.IndexOutOfBoundsException: Index: 0
>> at java.util.Collections$EmptyList.get(Collections.java:4454)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcProto$Type.getSubtypes(OrcProto.java:12240)
>> at
>> org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getColumnIndicesFromNames(ReaderImpl.java:651)
>> at
>> org.apache.hadoop.hive.ql.io.orc.ReaderImpl.getRawDataSizeOfColumns(ReaderImpl.java:634)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.populateAndCacheStripeDetails(OrcInputFormat.java:927)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:836)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat$SplitGenerator.call(OrcInputFormat.java:702)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I will be glad for any help on that matter.
>>
>> Regards
>> Dawid Wysakowicz
>>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Problem with WINDOW functions?

2015-12-29 Thread Chris Fregly
at a quick glance, it appears that you're calling collect() in there, which is 
bringing a huge amount of data down to the single Driver.  this is why, 
when you allocated more memory to the Driver, a different error emerges - most 
definitely related to stop-the-world GC causing the node to become 
unresponsive.

in general, collect() is bad and should only be used on small datasets for 
debugging or sanity-check purposes.  anything serious should be done within the 
executors running on worker nodes - not on the Driver node.

think of the Driver as simply a coordinator - coordinating and allocating 
tasks to workers with the help of the cluster resource manager (i.e. YARN, 
Spark Standalone, Mesos, etc).

very little memory should be allocated to the Driver.  just enough to 
coordinate and a little extra for Driver-side debugging, but that's it.  leave 
the rest up to the cluster nodes.

> On Dec 29, 2015, at 8:28 PM, vadimtk  wrote:
> 
> table
> 
> 
> 
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame Vs RDDs ... Which one to use When ?

2015-12-28 Thread Chris Fregly
here's a good article that sums it up, in my opinion:

https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/

basically, building apps with RDDs is like building apps with
primitive JVM bytecode.  haha.

@richard:  remember that even if you're currently writing RDDs in
Java/Scala, you're not gaining the code gen/rewrite performance benefits of
the Catalyst optimizer.

i agree with @daniel who suggested that you start with DataFrames and
revert to RDDs only when DataFrames don't give you what you need.

the only time i use RDDs directly these days is when i'm dealing with a
Spark library that has not yet moved to DataFrames - ie. GraphX - and it's
kind of annoying switching back and forth.

almost everything you need should be in the DataFrame API.

Datasets are similar to RDDs, but give you strong compile-time typing,
tabular structure, and Catalyst optimizations.

hopefully Datasets is the last API we see from Spark SQL...  i'm getting
tired of re-writing slides and book chapters!  :)
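
a minimal sketch of hopping between the two APIs in a spark-shell (the case
class and column names are just an illustration):

case class Person(name: String, age: Int)

val peopleRDD = sc.parallelize(Seq(Person("alice", 34), Person("bob", 45)))

import sqlContext.implicits._
val peopleDF = peopleRDD.toDF()               // RDD -> DataFrame (Catalyst takes over from here)
val adults   = peopleDF.filter($"age" >= 21)

// DataFrame -> RDD of Rows -> back to the custom type by hand
val adultsRDD = adults.rdd.map(r => Person(r.getAs[String]("name"), r.getAs[Int]("age")))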

On Mon, Dec 28, 2015 at 4:55 PM, Richard Eggert <richard.egg...@gmail.com>
wrote:

> One advantage of RDD's over DataFrames is that RDD's allow you to use your
> own data types, whereas DataFrames are backed by RDD's of Record objects,
> which are pretty flexible but don't give you much in the way of
> compile-time type checking. If you have an RDD of case class elements or
> JSON, then Spark SQL can automatically figure out how to convert it into an
> RDD of Record objects (and therefore a DataFrame), but there's no way to
> automatically go the other way (from DataFrame/Record back to custom types).
>
> In general, you can ultimately do more with RDDs than DataFrames, but
> DataFrames give you a lot of niceties (automatic query optimization, table
> joins, SQL-like syntax, etc.) for free, and can avoid some of the runtime
> overhead associated with writing RDD code in a non-JVM language (such as
> Python or R), since the query optimizer is effectively creating the
> required JVM code under the hood. There's little to no performance benefit
> if you're already writing Java or Scala code, however (and RDD-based code
> may actually perform better in some cases, if you're willing to carefully
> tune your code).
>
> On Mon, Dec 28, 2015 at 3:05 PM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> DataFrames are a higher level API for working with tabular data - RDDs
>> are used underneath. You can use either and easily convert between them in
>> your code as necessary.
>>
>> DataFrames provide a nice abstraction for many cases, so it may be easier
>> to code against them. Though if you're used to thinking in terms of
>> collections rather than tables, you may find RDDs more natural. Data frames
>> can also be faster, since Spark will do some optimizations under the hood -
>> if you are using PySpark, this will avoid the overhead. Data frames may
>> also perform better if you're reading structured data, such as a Hive table
>> or Parquet files.
>>
>> I recommend you prefer data frames, switching over to RDDs as necessary
>> (when you need to perform an operation not supported by data frames / Spark
>> SQL).
>>
>> HOWEVER (and this is a big one), Spark 1.6 will have yet another API -
>> datasets. The release of Spark 1.6 is currently being finalized and I would
>> expect it in the next few days. You will probably want to use the new API
>> once it's available.
>>
>>
>> On Sun, Dec 27, 2015 at 9:18 PM, Divya Gehlot <divya.htco...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am new bee to spark and a bit confused about RDDs and DataFames in
>>> Spark.
>>> Can somebody explain me with the use cases which one to use when ?
>>>
>>> Would really appreciate the clarification .
>>>
>>> Thanks,
>>> Divya
>>>
>>
>>
>
>
> --
> Rich
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Help: Driver OOM when shuffle large amount of data

2015-12-28 Thread Chris Fregly
which version of spark is this?

is there any chance that a single key - or set of keys - has a large number 
of values relative to the other keys (aka skew)?

if so, spark 1.5 *should* fix this issue with the new tungsten stuff, although 
I had some issues still with 1.5.1 in a similar situation.

I'm waiting to test with 1.6.0 before I start asking/creating jiras.
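
a quick way to check for skew before the big shuffle (a sketch - pairRdd stands
in for whatever keyed RDD you're shuffling):

// val pairRdd: RDD[(String, String)] = ...   // the assumed keyed data

val heaviestKeys = pairRdd
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)                 // cheap record count per key
  .sortBy(_._2, ascending = false)
  .take(20)

heaviestKeys.foreach(println)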

> On Dec 28, 2015, at 5:23 AM, Eugene Morozov  
> wrote:
> 
> Kendal, 
> 
> have you tried to reduce number of partitions?
> 
> --
> Be well!
> Jean Morozov
> 
>> On Mon, Dec 28, 2015 at 9:02 AM, kendal  wrote:
>> My driver is running OOM with my 4T data set... I don't collect any data to
>> driver. All what the program done is map - reduce - saveAsTextFile. But the
>> partitions to be shuffled is quite large - 20K+.
>> 
>> The symptom what I'm seeing the timeout when GetMapOutputStatuses from
>> Driver.
>> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Don't have map outputs
>> for shuffle 0, fetching them
>> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>> tracker endpoint =
>> AkkaRpcEndpointRef(Actor[akka.tcp://sparkDriver@10.115.58.55:52077/user/MapOutputTracker#-1937024516])
>> 15/12/24 02:06:21 WARN akka.AkkaRpcEndpointRef: Error sending message
>> [message = GetMapOutputStatuses(0)] in 1 attempts
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>> at
>> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
>> 
>> But the root cause is OOM:
>> 15/12/24 02:05:36 ERROR actor.ActorSystemImpl: Uncaught fatal error from
>> thread [sparkDriver-akka.remote.default-remote-dispatcher-24] shutting down
>> ActorSystem [sparkDriver]
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>> at
>> java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
>> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:131)
>> at
>> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
>> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
>> at
>> akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:718)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> 
>> I've already allocated 16G of memory for my driver - which is the hard upper
>> limit of my Yarn cluster. And I also applied Kryo serialization... Any idea how to
>> reduce the memory footprint?
>> And what confuses me is that, even though I have 20K+ partitions to shuffle, why do I
>> need so much memory?!
>> 
>> Thank you so much for any help!
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Help-Driver-OOM-when-shuffle-large-amount-of-data-tp25818.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: trouble understanding data frame memory usage "java.io.IOException: Unable to acquire memory"

2015-12-28 Thread Chris Fregly
the 200 number looks strangely similar to the following default number of
post-shuffle partitions which is often left untuned:

  spark.sql.shuffle.partitions

here's the property defined in the Spark source:

https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L232

note that Spark 1.6+ will make this config param obsolete in favor of
adaptive execution which uses the following as a low watermark for # of
post-shuffle partitions:

spark.sql.adaptive.minNumPostShufflePartitions

here's the property defined in the Spark source:

https://github.com/apache/spark/blob/834e71489bf560302f9d743dff669df1134e9b74/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L245
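
to rule this out quickly, try tuning it explicitly - a minimal sketch (the value 64 is just an example):

    // 200 is the untuned default for post-shuffle partitions
    sqlContext.setConf("spark.sql.shuffle.partitions", "64")

    // or on the command line:  --conf spark.sql.shuffle.partitions=64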

On Mon, Dec 28, 2015 at 5:41 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Unfortunately in 1.5 we didn't force operators to spill when ran out of
> memory so there is not a lot you can do.  It would be awesome if you could
> test with 1.6 and see if things are any better?
>
> On Mon, Dec 28, 2015 at 2:25 PM, Andy Davidson <
> a...@santacruzintegration.com> wrote:
>
>> I am using spark 1.5.1. I am running into some memory problems with a
>> java unit test. Yes, I could fix it by setting -Xmx (it's set to 1024M); however,
>> I want to better understand what is going on so I can write better
>> code in the future. The test runs on a Mac, master="Local[2]"
>>
>> I have a java unit test that starts by reading a 672K ascii file. My
>> output data file is 152K. It seems strange that such a small amount of
>> data would cause an out of memory exception. I am running a pretty standard
>> machine learning process
>>
>>
>>1. Load data
>>2. create a ML pipeline
>>3. transform the data
>>4. Train a model
>>5. Make predictions
>>6. Join the predictions back to my original data set
>>7. Coalesce(1), I only have a small amount of data and want to save
>>it in a single file
>>8. Save final results back to disk
>>
>>
>> Step 7: I am unable to call Coalesce() “java.io.IOException: Unable to
>> acquire memory”
>>
>> To try and figure out what is going I put log messages in to count the
>> number of partitions
>>
>> Turns out I have 20 input files, each one winds up in a separate
>> partition. Okay so after loading I call coalesce(1) and check to make sure
>> I only have a single partition.
>>
>> The total number of observations is 1998.
>>
>> After calling step 7 I count the number of partitions and discovered I
>> have 224 partitions! Surprising, given I called Coalesce(1) before I
>> did anything with the data. My data set should easily fit in memory. When I
>> save them to disk I get 202 files created with 162 of them being empty!
>>
>> In general I am not explicitly using cache.
>>
>> Some of the data frames get registered as tables. I find it easier to use
>> sql.
>>
>> Some of the data frames get converted back to RDDs. I find it easier to
>> create RDD this way
>>
>> I put calls to unpersist(true) in several places.
>>
>>    private void memoryCheck(String name) {
>>        Runtime rt = Runtime.getRuntime();
>>        // log total and free JVM heap at this point in the test
>>        logger.warn("name: {} \t\ttotalMemory: {} \tfreeMemory: {}",
>>                name,
>>                String.format("%,d", rt.totalMemory()),
>>                String.format("%,d", rt.freeMemory()));
>>    }
>>
>> Any idea how I can get a better understanding of what is going on? My
>> goal is to learn to write better spark code.
>>
>> Kind regards
>>
>> Andy
>>
>> Memory usages at various points in my unit test
>>
>> name: rawInput totalMemory:   447,741,952 freeMemory:   233,203,184
>>
>> name: naiveBayesModel totalMemory:   509,083,648 freeMemory:
>> 403,504,128
>>
>> name: lpRDD totalMemory:   509,083,648 freeMemory:   402,288,104
>>
>> name: results totalMemory:   509,083,648 freeMemory:   368,011,008
>>
>>
>>    DataFrame exploreDF = results.select(results.col("id"),
>>                                         results.col("label"),
>>                                         results.col("binomialLabel"),
>>                                         results.col("labelIndex"),
>>                                         results.col("prediction"),
>>    

Re: fishing for help!

2015-12-25 Thread Chris Fregly
note that with AWS, you can use Placement Groups
<http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/placement-groups.html>
and EC2 instances with Enhanced Networking
<http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/enhanced-networking.html>
to
lower network latency and increase network throughput within the same AZ
(data center).

On Tue, Dec 22, 2015 at 12:11 AM, Eran Witkon <eranwit...@gmail.com> wrote:

> I'll check it out.
>
> On Tue, 22 Dec 2015 at 00:30 Michal Klos <michal.klo...@gmail.com> wrote:
>
>> If you are running on Amazon, then it's always a crapshoot as well.
>>
>> M
>>
>> On Dec 21, 2015, at 4:41 PM, Josh Rosen <joshro...@databricks.com> wrote:
>>
>> @Eran, are Server 1 and Server 2 both part of the same cluster / do they
>> have similar positions in the network topology w.r.t the Spark executors?
>> If Server 1 had fast network access to the executors but Server 2 was
>> across a WAN then I'd expect the job to run slower from Server 2 due to
>> the extra network latency / reduced bandwidth. This is assuming that you're
>> running the driver in non-cluster deploy mode (so the driver process runs
>> on the machine which submitted the job).
>>
>> On Mon, Dec 21, 2015 at 1:30 PM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> look for differences: packages versions, cpu/network/memory diff etc etc
>>>
>>>
>>> On 21 December 2015 at 14:53, Eran Witkon <eranwit...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I know it is a wide question but can you think of reasons why a pyspark
>>>> job which runs from server 1 using user 1 will run faster than the same
>>>> job when running on server 2 with user 1
>>>> Eran
>>>>
>>>
>>>
>>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Memory allocation for Broadcast values

2015-12-25 Thread Chris Fregly
Note that starting with Spark 1.6, memory can be dynamically allocated by
the Spark execution engine based on workload heuristics.

You can still set a low watermark for the spark.storage.memoryFraction (RDD
cache), but the rest can be dynamic.

Here are some relevant slides from a recent presentation I did in Toronto:
http://www.slideshare.net/cfregly/toronto-spark-meetup-dec-14-2015 (slide
63+)

This adaptiveness will be a continual theme with Spark moving forward.

For example, Spark 1.6 includes adaptive query execution, as well,
including an adaptive, hybrid join that can broadcast just the hot, popular
keys using BroadcastHashJoin, but use the regular ShuffleHashJoin for
low-medium popular keys - within the same job - and this can change from stage to
stage.
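
for reference, a minimal config sketch under the 1.6 unified memory manager (values are placeholders - the driver needs enough heap to build the Map, and each executor holds one deserialized copy of the broadcast):

    # spark-defaults.conf sketch (Spark 1.6 property names)
    spark.driver.memory            8g
    spark.executor.memory          8g
    spark.memory.fraction          0.75
    spark.memory.storageFraction   0.3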


On Wed, Dec 23, 2015 at 2:12 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> If you are creating a huge map on the driver, then spark.driver.memory
> should be set to a higher value to hold your map. Since you are going to
> broadcast this map, your spark executors must have enough memory to hold
> this map as well which can be set using the spark.executor.memory, and
> spark.storage.memoryFraction configurations.
>
> Thanks
> Best Regards
>
> On Mon, Dec 21, 2015 at 5:50 AM, Pat Ferrel <p...@occamsmachete.com> wrote:
>
>> I have a large Map that is assembled in the driver and broadcast to each
>> node.
>>
>> My question is how best to allocate memory for this.  The Driver has to
>> have enough memory for the Maps, but only one copy is serialized to each
>> node. What type of memory should I size to match the Maps? Is the broadcast
>> Map taking a little from each executor, all from every executor, or is
>> there something other than driver and executor memory I can size?
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: error while defining custom schema in Spark 1.5.0

2015-12-25 Thread Chris Fregly
what are you trying to do, exactly?
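
in the meantime, one guess based on the error in your snippet: a `::`-chained list of StructFields needs a `Nil` terminator to become a Seq - a sketch using the same fields:

    import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

    val customSchema = StructType(
      StructField("year", IntegerType, true) ::
      StructField("make", StringType, true) ::
      StructField("model", StringType, true) ::
      StructField("comment", StringType, true) ::
      StructField("blank", StringType, true) :: Nil)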

On Tue, Dec 22, 2015 at 3:03 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Hi,
> I am new to Apache Spark, using the CDH 5.5 QuickStart VM with Spark
> 1.5.0.
> I am working on a custom schema and getting an error.
>
> import org.apache.spark.sql.hive.HiveContext
>>>
>>> scala> import org.apache.spark.sql.hive.orc._
>>> import org.apache.spark.sql.hive.orc._
>>>
>>> scala> import org.apache.spark.sql.types.{StructType, StructField,
>>> StringType, IntegerType};
>>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>>> IntegerType}
>>>
>>> scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> 15/12/21 23:41:53 INFO hive.HiveContext: Initializing execution hive,
>>> version 1.1.0
>>> 15/12/21 23:41:53 INFO client.ClientWrapper: Inspected Hadoop version:
>>> 2.6.0-cdh5.5.0
>>> 15/12/21 23:41:53 INFO client.ClientWrapper: Loaded
>>> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0
>>> hiveContext: org.apache.spark.sql.hive.HiveContext =
>>> org.apache.spark.sql.hive.HiveContext@214bd538
>>>
>>> scala> val customSchema = StructType(Seq(StructField("year",
>>> IntegerType, true),StructField("make", StringType,
>>> true),StructField("model", StringType, true),StructField("comment",
>>> StringType, true),StructField("blank", StringType, true)))
>>> customSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(year,IntegerType,true),
>>> StructField(make,StringType,true), StructField(model,StringType,true),
>>> StructField(comment,StringType,true), StructField(blank,StringType,true))
>>>
>>> scala> val customSchema = (new StructType).add("year", IntegerType,
>>> true).add("make", StringType, true).add("model", StringType,
>>> true).add("comment", StringType, true).add("blank", StringType, true)
>>> customSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(year,IntegerType,true),
>>> StructField(make,StringType,true), StructField(model,StringType,true),
>>> StructField(comment,StringType,true), StructField(blank,StringType,true))
>>>
>>> scala> val customSchema = StructType( StructField("year", IntegerType,
>>> true) :: StructField("make", StringType, true) :: StructField("model",
>>> StringType, true) :: StructField("comment", StringType, true) ::
>>> StructField("blank", StringType, true)::StructField("blank", StringType,
>>> true))
>>> :24: error: value :: is not a member of
>>> org.apache.spark.sql.types.StructField
>>>val customSchema = StructType( StructField("year", IntegerType,
>>> true) :: StructField("make", StringType, true) :: StructField("model",
>>> StringType, true) :: StructField("comment", StringType, true) ::
>>> StructField("blank", StringType, true)::StructField("blank", StringType,
>>> true))
>>>
>>
> Tried like like below also
>
> scala> val customSchema = StructType( StructField("year", IntegerType,
> true), StructField("make", StringType, true) ,StructField("model",
> StringType, true) , StructField("comment", StringType, true) ,
> StructField("blank", StringType, true),StructField("blank", StringType,
> true))
> :24: error: overloaded method value apply with alternatives:
>   (fields:
> Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
> 
>   (fields:
> java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
> 
>   (fields:
> Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
>  cannot be applied to (org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField,
> org.apache.spark.sql.types.StructField)
>val customSchema = StructType( StructField("year", IntegerType,
> true), StructField("make", StringType, true) ,StructField("model",
> StringType, true) , StructField("comment", StringType, true) ,
> StructField("blank", StringType, true),StructField("blank", StringType,
> true))
>   ^
>    Would really appreciate it if somebody could share an example which works with
> Spark 1.4 or Spark 1.5.0
>
> Thanks,
> Divya
>
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Tips for Spark's Random Forest slow performance

2015-12-25 Thread Chris Fregly
so it looks like you're increasing num trees by 5x and you're seeing roughly an
18x increase in runtime (440ms to 8s), correct?

did you analyze the Spark cluster resources to monitor the memory usage,
spillage, disk I/O, etc?

you may need more Workers.

On Tue, Dec 22, 2015 at 8:57 AM, Alexander Ratnikov <
ratnikov.alexan...@gmail.com> wrote:

> Hi All,
>
> It would be good to get some tips on tuning Apache Spark for Random
> Forest classification.
> Currently, we have a model that looks like:
>
> featureSubsetStrategy all
> impurity gini
> maxBins 32
> maxDepth 11
> numberOfClasses 2
> numberOfTrees 100
>
> We are running Spark 1.5.1 as a standalone cluster.
>
> 1 Master and 2 Worker nodes.
> The amount of RAM is 32GB on each node with 4 Cores.
> The classification takes 440ms.
>
> When we increase the number of trees to 500, it takes 8 sec already.
> We tried to reduce the depth but then error rate is higher. We have
> around 246 attributes.
>
> Probably we are doing something wrong. Any ideas how we could improve
> the performance ?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Tips-for-Spark-s-Random-Forest-slow-performance-tp25766.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: java.io.FileNotFoundException(Too many open files) in Spark streaming

2015-12-25 Thread Chris Fregly
and which version of Spark/Spark Streaming are you using?

are you explicitly setting the spark.streaming.concurrentJobs to something
larger than the default of 1?

if so, please try setting that back to 1 and see if the problem still
exists.

this is a dangerous parameter to modify from the default - which is why
it's not well-documented.
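
for reference, a minimal sketch of pinning it back to the default:

    import org.apache.spark.SparkConf

    // 1 means only one batch's jobs run at a time
    val conf = new SparkConf().set("spark.streaming.concurrentJobs", "1")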


On Wed, Dec 23, 2015 at 8:23 AM, Vijay Gharge <vijay.gha...@gmail.com>
wrote:

> Few indicators -
>
> 1) during execution time - check total number of open files using lsof
> command. Needs root permissions. If it is a cluster, I'm not sure how practical this is!
> 2) which exact line in the code is triggering this error ? Can you paste
> that snippet ?
>
>
> On Wednesday 23 December 2015, Priya Ch <learnings.chitt...@gmail.com>
> wrote:
>
>> ulimit -n 65000
>>
>> fs.file-max = 65000 ( in etc/sysctl.conf file)
>>
>> Thanks,
>> Padma Ch
>>
>> On Tue, Dec 22, 2015 at 6:47 PM, Yash Sharma <yash...@gmail.com> wrote:
>>
>>> Could you share the ulimit for your setup please ?
>>>
>>> - Thanks, via mobile,  excuse brevity.
>>> On Dec 22, 2015 6:39 PM, "Priya Ch" <learnings.chitt...@gmail.com>
>>> wrote:
>>>
>>>> Jakob,
>>>>
>>>>Increased the settings like fs.file-max in /etc/sysctl.conf and
>>>> also increased user limit in /etc/security/limits.conf. But still see
>>>> the same issue.
>>>>
>>>> On Fri, Dec 18, 2015 at 12:54 AM, Jakob Odersky <joder...@gmail.com>
>>>> wrote:
>>>>
>>>>> It might be a good idea to see how many files are open and try
>>>>> increasing the open file limit (this is done on an os level). In some
>>>>> application use-cases it is actually a legitimate need.
>>>>>
>>>>> If that doesn't help, make sure you close any unused files and streams
>>>>> in your code. It will also be easier to help diagnose the issue if you 
>>>>> send
>>>>> an error-reproducing snippet.
>>>>>
>>>>
>>>>
>>
>
> --
> Regards,
> Vijay Gharge
>
>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Fat jar can't find jdbc

2015-12-25 Thread Chris Fregly
JDBC Drivers need to be on the system classpath.

try passing --jars /path/to/local/mysql-connector.jar when you submit the
job.

this will also copy the jars to each of the worker nodes and should set you
straight.
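
something like this, for example (the class name is from your stack trace; the connector path/version and assembly jar name are placeholders):

    spark-submit \
      --class her.recommender \
      --jars /path/to/mysql-connector-java-5.1.38-bin.jar \
      my-assembly.jar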

On Tue, Dec 22, 2015 at 11:42 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> imho, if you succeeded to fetch something from your mysql with same jar in
> classpath, then Manifest is ok and you indeed should look at your spark sql
> - jdbc configs
>
> On 22 December 2015 at 12:21, David Yerrington <da...@yerrington.net>
> wrote:
>
>> Igor, I think it's available.  After I extract the jar file, I see a
>> directory with class files that look very relevant in "/com/mysql/jdbc".
>>
>> After reading this, I started to wonder if MySQL connector was really the
>> problem.  Perhaps it's something to do with SQLcontext?  I just wired a
>> test endpoint to run a very basic mysql query, outside of Spark, and it
>> worked just fine (yay!).  I copied and pasted this example to verify my
>> MySQL connector availability, and it worked just fine:
>> https://mkaz.github.io/2011/05/27/using-scala-with-jdbc-to-connect-to-mysql/
>>
>> As far as the Maven manifest goes, I'm really not sure.  I will research
>> it though.  Now I'm wondering if my mergeStrategy is to blame?  I'm going
>> to try there next.
>>
>> Thank you for the help!
>>
>> On Tue, Dec 22, 2015 at 1:18 AM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> David, can you verify that mysql connector classes indeed in your single
>>> jar?
>>> open it with zip tool available at your platform
>>>
>>> another options that might be a problem - if there is some dependency in
>>> MANIFEST(not sure though this is the case of mysql connector) then it might
>>> be broken after preparing single jar
>>> so you need to verify that it's ok(in maven usually it's possible to
>>> define merging policy for resources while creating single jar)
>>>
>>> On 22 December 2015 at 10:04, Vijay Kiran <m...@vijaykiran.com> wrote:
>>>
>>>> Can you paste your libraryDependencies from build.sbt ?
>>>>
>>>> ./Vijay
>>>>
>>>> > On 22 Dec 2015, at 06:12, David Yerrington <da...@yerrington.net>
>>>> wrote:
>>>> >
>>>> > Hi Everyone,
>>>> >
>>>> > I'm building a prototype that fundamentally grabs data from a MySQL
>>>> instance, crunches some numbers, and then moves it on down the pipeline.
>>>> I've been using SBT with assembly tool to build a single jar for 
>>>> deployment.
>>>> >
>>>> > I've gone through the paces of stomping out many dependency problems
>>>> and have come down to one last (hopefully) zinger.
>>>> >
>>>> > java.lang.ClassNotFoundException: Failed to load class for data
>>>> source: jdbc.
>>>> >
>>>> > at
>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
>>>> >
>>>> > at
>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
>>>> >
>>>> > at
>>>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>>>> >
>>>> > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203)
>>>> >
>>>> > at her.recommender.getDataframe(her.recommender.scala:45)
>>>> >
>>>> > at her.recommender.getRecommendations(her.recommender.scala:60)
>>>> >
>>>> >
>>>> > I'm assuming this has to do with mysql-connector because this is the
>>>> problem I run into when I'm working with spark-shell and I forget to
>>>> include my classpath with my mysql-connect jar file.
>>>> >
>>>> > I've tried:
>>>> >   • Using different versions of mysql-connector-java in my
>>>> build.sbt file
>>>> >   • Copying the connector jar to my_project/src/main/lib
>>>> >   • Copying the connector jar to my_project/lib <-- (this is
>>>> where I keep my build.sbt)
>>>> > Everything loads fine and works, except my call that does
>>>> "sqlContext.load("jdbc", myOptions)".  I know this is a total newbie
>>>> question but in my defense, I'm fairly new to Scala, and this is my first
>>>> go at deploying a fat jar with sbt-assembly.
>>>> >
>>>> > Thanks for any advice!
>>>> >
>>>> > --
>>>> > David Yerrington
>>>> > yerrington.net
>>>>
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>>
>> --
>> David Yerrington
>> yerrington.net
>>
>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Stuck with DataFrame df.select("select * from table");

2015-12-25 Thread Chris Fregly
I assume by "The same code perfectly works through Zeppelin 0.5.5" that
you're using the %sql interpreter with your regular SQL SELECT statement,
correct?

If so, the Zeppelin interpreter is converting the SQL statement that
follows

%sql

to

sqlContext.sql()

per the following code:

https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L125

https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L140-L141


Also, keep in mind that you can do something like this if you want to stay
in DataFrame land:

df.selectExpr("*").limit(5).show()
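
or, if you prefer SQL, register the DataFrame and go through the SQLContext - a quick sketch (table name is just an example):

    df.registerTempTable("mytable")
    sqlContext.sql("SELECT * FROM mytable LIMIT 5").show()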



On Fri, Dec 25, 2015 at 12:53 PM, Eugene Morozov <evgeny.a.moro...@gmail.com
> wrote:

> Ted, Igor,
>
> Oh my... thanks a lot to both of you!
> Igor was absolutely right, but I missed that I have to use sqlContext =(
>
> Everything's perfect.
> Thank you.
>
> --
> Be well!
> Jean Morozov
>
> On Fri, Dec 25, 2015 at 8:31 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> DataFrame uses different syntax from SQL query.
>> I searched unit tests but didn't find any in the form of df.select("select
>> ...")
>>
>> Looks like you should use sqlContext as other people suggested.
>>
>> On Fri, Dec 25, 2015 at 8:29 AM, Eugene Morozov <
>> evgeny.a.moro...@gmail.com> wrote:
>>
>>> Thanks for the comments, although the issue is not in limit() predicate.
>>> It's something with spark being unable to resolve the expression.
>>>
>>> I can do smth like this. It works as it suppose to:
>>>  df.select(df.col("*")).where(df.col("x1").equalTo(3.0)).show(5);
>>>
>>> But I think old fashioned sql style have to work also. I have
>>> df.registeredTempTable("tmptable") and then
>>>
>>> df.select("select * from tmptable where x1 = '3.0'").show();
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve 'select * from
>>> tmp where x1 = '1.0'' given input columns x1, x4, x5, x3, x2;
>>>
>>> at
>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>> at
>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)
>>> at
>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.sca
>>>
>>>
>>> From the first statement I conclude that my custom datasource is
>>> perfectly fine.
>>> Just wonder how to fix / workaround that.
>>> --
>>> Be well!
>>> Jean Morozov
>>>
>>> On Fri, Dec 25, 2015 at 6:13 PM, Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> sqlContext.sql("select * from table limit 5").show() (not sure if limit
>>>> 5 supported)
>>>>
>>>> or use Dmitriy's solution. select() defines your projection when you've
>>>> specified entire query
>>>>
>>>> On 25 December 2015 at 15:42, Василец Дмитрий <pronix.serv...@gmail.com
>>>> > wrote:
>>>>
>>>>> hello
>>>>> you can try to use df.limit(5).show()
>>>>> just trick :)
>>>>>
>>>>> On Fri, Dec 25, 2015 at 2:34 PM, Eugene Morozov <
>>>>> evgeny.a.moro...@gmail.com> wrote:
>>>>>
>>>>>> Hello, I'm basically stuck as I have no idea where to look;
>>>>>>
>>>>>> Following simple code, given that my Datasource is working gives me
>>>>>> an exception.
>>>>>>
>>>>>> DataFrame df = sqlc.load(filename, 
>>>>>> "com.epam.parso.spark.ds.DefaultSource");
>>>>>> df.cache();
>>>>>> df.printSchema();   <-- prints the schema perfectly fine!
>>>>>>
>>>>>> df.show();  <-- Works perfectly fine (shows table 
>>>>>> with 20 lines)!
>>>>>> df.registerTempTable("table");
>>>>>> df.select("select * from table limit 5").show(); <-- gives weird 
>>>>>> exception
>>>>>>
>>>>>> Exception is:
>>>>>>
>>>>>> AnalysisException: cannot resolve 'select * from table limit 5' given 
>>>>>> input columns VER, CREATED, SOC, SOCC, HLTC, HLGTC, STATUS
>>>>>>
>>>>>> I can do a collect on a dataframe, but cannot select any specific
>>>>>> columns either "select * from table" or "select VER, CREATED from table".
>>>>>>
>>>>>> I use spark 1.5.2.
>>>>>> The same code perfectly works through Zeppelin 0.5.5.
>>>>>>
>>>>>> Thanks.
>>>>>> --
>>>>>> Be well!
>>>>>> Jean Morozov
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Spark SQL 1.5.2 missing JDBC driver for PostgreSQL?

2015-12-25 Thread Chris Fregly
Configuring JDBC drivers with Spark is a bit tricky as the JDBC driver
needs to be on the Java System Classpath per this
<http://spark.apache.org/docs/latest/sql-programming-guide.html#troubleshooting>
troubleshooting
section in the Spark SQL programming guide.

Here
<https://github.com/fluxcapacitor/pipeline/blob/master/bin/start-hive-thriftserver.sh>
is an example hive-thrift-server start script from my Spark-based reference
pipeline project.  Here
<https://github.com/fluxcapacitor/pipeline/blob/master/bin/pipeline-spark-sql.sh>
is an example script that decorates the out-of-the-box spark-sql command to
use the MySQL JDBC driver.

These scripts explicitly set --jars to $SPARK_SUBMIT_JARS which is defined
here
<https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/.profile#L144>
and
here
<https://github.com/fluxcapacitor/pipeline/blob/master/config/bash/.profile#L87>
and
includes the path to the local MySQL JDBC driver.  This approach is
described here
<http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management>
in
the Spark docs that describe the advanced spark-submit options.

Any jar specified with --jars will be passed to each worker node in the
cluster - specifically in the work directory for each SparkContext for
isolation purposes.

Cleanup of these jars on the worker nodes is handled by YARN automatically,
and by Spark Standalone per the spark.worker.cleanup.appDataTtl config
param.

The Spark SQL programming guide says to use SPARK_CLASSPATH for this
purpose, but I couldn't get this to work for whatever reason, so i'm
sticking to the --jars approach used in my examples.
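
in Ben's case, the --jars version would look something like this (driver path taken from his earlier email):

    ./sbin/start-thriftserver.sh \
      --jars /usr/share/java/postgresql-9.3-1104.jdbc41.jar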

On Tue, Dec 22, 2015 at 9:51 PM, Benjamin Kim <bbuil...@gmail.com> wrote:

> Stephen,
>
> Let me confirm. I just need to propagate these settings I put in
> spark-defaults.conf to all the worker nodes? Do I need to do the same with
> the PostgreSQL driver jar file too? If so, is there a way to have it read
> from HDFS rather than copying out to the cluster manually.
>
> Thanks for your help,
> Ben
>
>
> On Tuesday, December 22, 2015, Stephen Boesch <java...@gmail.com> wrote:
>
>> HI Benjamin,  yes by adding to the thrift server then the create table
>> would work.  But querying is performed by the workers: so you need to add
>> to the classpath of all nodes for reads to work.
>>
>> 2015-12-22 18:35 GMT-08:00 Benjamin Kim <bbuil...@gmail.com>:
>>
>>> Hi Stephen,
>>>
>>> I forgot to mention that I added these lines below to the
>>> spark-default.conf on the node with Spark SQL Thrift JDBC/ODBC Server
>>> running on it. Then, I restarted it.
>>>
>>>
>>> spark.driver.extraClassPath=/usr/share/java/postgresql-9.3-1104.jdbc41.jar
>>>
>>> spark.executor.extraClassPath=/usr/share/java/postgresql-9.3-1104.jdbc41.jar
>>>
>>> I read in another thread that this would work. I was able to create the
>>> table and could see it in my SHOW TABLES list. But, when I try to query the
>>> table, I get the same error. It looks like I’m getting close.
>>>
>>> Are there any other things that I have to do that you can think of?
>>>
>>> Thanks,
>>> Ben
>>>
>>>
>>> On Dec 22, 2015, at 6:25 PM, Stephen Boesch <java...@gmail.com> wrote:
>>>
>>> The postgres jdbc driver needs to be added to the  classpath of your
>>> spark workers.  You can do a search for how to do that (multiple ways).
>>>
>>> 2015-12-22 17:22 GMT-08:00 b2k70 <bbuil...@gmail.com>:
>>>
>>>> I see in the Spark SQL documentation that a temporary table can be
>>>> created
>>>> directly onto a remote PostgreSQL table.
>>>>
>>>> CREATE TEMPORARY TABLE 
>>>> USING org.apache.spark.sql.jdbc
>>>> OPTIONS (
>>>> url "jdbc:postgresql:///",
>>>> dbtable "impressions"
>>>> );
>>>> When I run this against our PostgreSQL server, I get the following
>>>> error.
>>>>
>>>> Error: java.sql.SQLException: No suitable driver found for
>>>> jdbc:postgresql:///
>>>> (state=,code=0)
>>>>
>>>> Can someone help me understand why this is?
>>>>
>>>> Thanks, Ben
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-1-5-2-missing-JDBC-driver-for-PostgreSQL-tp25773.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>>> <http://nabble.com>.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>>
>>


-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Getting estimates and standard error using ml.LinearRegression

2015-12-25 Thread Chris Fregly
try

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.RegressionEvaluator
if you're using *spark.ml.LinearRegression* (which it appears you are)

or

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.evaluation.RegressionMetrics
if you're using *spark.mllib.LinearRegressionModel* (included for
completeness)
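
a minimal sketch against the spark.ml evaluator linked above ("label" and "prediction" are the default column names - swap in yours; `model` and `test` are from your snippet):

    import org.apache.spark.ml.evaluation.RegressionEvaluator

    val predictions = model.transform(test)

    val rmse = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")   // also supports "mse", "r2", "mae"
      .evaluate(predictions)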






On Mon, Dec 21, 2015 at 12:47 AM, Arunkumar Pillai <arunkumar1...@gmail.com>
wrote:

> Hi
>
> I'm using ml.LinearRegession  package
>
>
> How to get estimates and standard Error for the coefficient
>
> PFB the code snippet
>
> val lr = new LinearRegression()
> lr.setMaxIter(10)
>   .setRegParam(0.01)
>   .setFitIntercept(true)
> val model= lr.fit(test)
> val estimates = model.summary
>
>
> --
> Thanks and Regards
> Arun
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Stuck with DataFrame df.select("select * from table");

2015-12-25 Thread Chris Fregly
oh, and it's worth noting that - starting with Spark 1.6 - you'll be able
to just do the following:

SELECT * FROM json.`/path/to/json/file`

(note the back ticks)

instead of calling registerTempTable() for the sole purpose of using SQL.

https://issues.apache.org/jira/browse/SPARK-11197
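
wrapped in sqlContext.sql(), it would look something like this (the path is a placeholder):

    sqlContext.sql("SELECT * FROM json.`/path/to/json/file`").show()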

On Fri, Dec 25, 2015 at 2:17 PM, Chris Fregly <ch...@fregly.com> wrote:

> I assume by "The same code perfectly works through Zeppelin 0.5.5" that
> you're using the %sql interpreter with your regular SQL SELECT statement,
> correct?
>
> If so, the Zeppelin interpreter is converting the SQL statement that
> follows
>
> %sql
>
> to
>
> sqlContext.sql()
>
> per the following code:
>
>
> https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L125
>
>
> https://github.com/apache/incubator-zeppelin/blob/01f4884a3a971ece49d668a9783d6b705cf6dbb5/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java#L140-L141
>
>
> Also, keep in mind that you can do something like this if you want to stay
> in DataFrame land:
>
> df.selectExpr("*").limit(5).show()
>
>
>
> On Fri, Dec 25, 2015 at 12:53 PM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Ted, Igor,
>>
>> Oh my... thanks a lot to both of you!
>> Igor was absolutely right, but I missed that I have to use sqlContext =(
>>
>> Everything's perfect.
>> Thank you.
>>
>> --
>> Be well!
>> Jean Morozov
>>
>> On Fri, Dec 25, 2015 at 8:31 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> DataFrame uses different syntax from SQL query.
>>> I searched unit tests but didn't find any in the form of df.select("select
>>> ...")
>>>
>>> Looks like you should use sqlContext as other people suggested.
>>>
>>> On Fri, Dec 25, 2015 at 8:29 AM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
>>>> Thanks for the comments, although the issue is not in limit()
>>>> predicate.
>>>> It's something with spark being unable to resolve the expression.
>>>>
>>>> I can do smth like this. It works as it suppose to:
>>>>  df.select(df.col("*")).where(df.col("x1").equalTo(3.0)).show(5);
>>>>
>>>> But I think old fashioned sql style have to work also. I have
>>>> df.registeredTempTable("tmptable") and then
>>>>
>>>> df.select("select * from tmptable where x1 = '3.0'").show();
>>>>
>>>> org.apache.spark.sql.AnalysisException: cannot resolve 'select * from
>>>> tmp where x1 = '1.0'' given input columns x1, x4, x5, x3, x2;
>>>>
>>>> at
>>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>>>> at
>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:56)
>>>> at
>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.sca
>>>>
>>>>
>>>> From the first statement I conclude that my custom datasource is
>>>> perfectly fine.
>>>> Just wonder how to fix / workaround that.
>>>> --
>>>> Be well!
>>>> Jean Morozov
>>>>
>>>> On Fri, Dec 25, 2015 at 6:13 PM, Igor Berman <igor.ber...@gmail.com>
>>>> wrote:
>>>>
>>>>> sqlContext.sql("select * from table limit 5").show() (not sure if
>>>>> limit 5 supported)
>>>>>
>>>>> or use Dmitriy's solution. select() defines your projection when
>>>>> you've specified entire query
>>>>>
>>>>> On 25 December 2015 at 15:42, Василец Дмитрий <
>>>>> pronix.serv...@gmail.com> wrote:
>>>>>
>>>>>> hello
>>>>>> you can try to use df.limit(5).show()
>>>>>> just trick :)
>>>>>>
>>>>>> On Fri, Dec 25, 2015 at 2:34 PM, Eugene Morozov <
>>>>>> evgeny.a.moro...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello, I'm basically stuck as I have no idea where to look;
>>>>>>>
>>>>>>> Following simple code, given that my Datasource is working gives me
>>>>>>> an exception.
>>>>>>>
>&

Re: Tips for Spark's Random Forest slow performance

2015-12-25 Thread Chris Fregly
ah, so with that much serialization happening, you might actually need
*fewer* workers!  :)

in the next couple releases of Spark ML, we should see better
scoring/predicting functionality using a single node for exactly this
reason.

to get there, we need model.save/load support (PMML?), optimized
single-node linear algebra support, and a few other goodies.

useNodeIdCache only affects training.

btw, are you checkpointing per this
<https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala#L151>?
 (code snippet from DecisionTreeParams copy/pasted below for convenience)

  /**
   * Specifies how often to checkpoint the cached node IDs.
   * E.g. 10 means that the cache will get checkpointed every 10 iterations.
   * This is only used if cacheNodeIds is true and if the checkpoint directory is set in
   * [[org.apache.spark.SparkContext]].
   * Must be >= 1.
   * (default = 10)
   * @group expertSetParam
   */
  def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value)
i'm not actually sure how this will affect training performance with the
new ml.RandomForest impl, but i'm curious to hear what you find.
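
for reference, a rough sketch of wiring that up on the new ml API (the checkpoint dir is a placeholder; parameter values match the model you described earlier):

    // the checkpoint dir must be set on the SparkContext for checkpointInterval to kick in
    sc.setCheckpointDir("hdfs:///tmp/rf-checkpoints")

    val rf = new org.apache.spark.ml.classification.RandomForestClassifier()
      .setNumTrees(100)
      .setMaxDepth(11)
      .setCacheNodeIds(true)        // the useNodeIdCache equivalent - training only, as noted
      .setCheckpointInterval(10)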


On Fri, Dec 25, 2015 at 6:03 PM, Alexander Ratnikov <
ratnikov.alexan...@gmail.com> wrote:

> Definitely the biggest difference is the maxDepth of the trees. With
> values smaller or equal to 5 the time goes into milliseconds.
> The amount of trees affects the performance but not that much.
> I tried to profile the app and I see decent time spent in serialization.
> I'm wondering if Spark isn't somehow caching model on workers during
> classification ?
>
> useNodeIdCache is ON but docs aren't clear if Spark is using it only
> on training.
> Also, I must say we didn't have this problem in the old mllib API so
> it might be something in the new ml that I'm missing.
> I will dig deeper into the problem after holidays.
>
> 2015-12-25 16:26 GMT+01:00 Chris Fregly <ch...@fregly.com>:
> > so it looks like you're increasing num trees by 5x and you're seeing roughly an
> > 18x increase in runtime (440ms to 8s), correct?
> >
> > did you analyze the Spark cluster resources to monitor the memory usage,
> > spillage, disk I/O, etc?
> >
> > you may need more Workers.
> >
> > On Tue, Dec 22, 2015 at 8:57 AM, Alexander Ratnikov
> > <ratnikov.alexan...@gmail.com> wrote:
> >>
> >> Hi All,
> >>
> >> It would be good to get some tips on tuning Apache Spark for Random
> >> Forest classification.
> >> Currently, we have a model that looks like:
> >>
> >> featureSubsetStrategy all
> >> impurity gini
> >> maxBins 32
> >> maxDepth 11
> >> numberOfClasses 2
> >> numberOfTrees 100
> >>
> >> We are running Spark 1.5.1 as a standalone cluster.
> >>
> >> 1 Master and 2 Worker nodes.
> >> The amount of RAM is 32GB on each node with 4 Cores.
> >> The classification takes 440ms.
> >>
> >> When we increase the number of trees to 500, it takes 8 sec already.
> >> We tried to reduce the depth but then error rate is higher. We have
> >> around 246 attributes.
> >>
> >> Probably we are doing something wrong. Any ideas how we could improve
> >> the performance ?
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/Tips-for-Spark-s-Random-Forest-slow-performance-tp25766.html
> >> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> >
> >
> > --
> >
> > Chris Fregly
> > Principal Data Solutions Engineer
> > IBM Spark Technology Center, San Francisco, CA
> > http://spark.tc | http://advancedspark.com
>



-- 

*Chris Fregly*
Principal Data Solutions Engineer
IBM Spark Technology Center, San Francisco, CA
http://spark.tc | http://advancedspark.com


Re: Kafka - streaming from multiple topics

2015-12-20 Thread Chris Fregly
separating out your code into separate streaming jobs - especially when there 
are no dependencies between the jobs - is almost always the best route.  it's 
easier to combine atoms (fusion) than to split them (fission).

I recommend splitting out jobs along batch window, stream window, and 
state-tracking characteristics.

for example, imagine 3 separate jobs for the following:

1) light storing of raw data into Cassandra (500ms batch interval)
2) medium aggregations/window roll ups (2000ms batch interval)
3) heavy training of an ML model (1ms batch interval). 

and a reminder that you can control (isolate or combine) the spark resources used 
by these separate, single-purpose streaming jobs using scheduler pools, just 
like your batch spark jobs.
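
a rough sketch of the scheduler pool hookup (the pool name is a placeholder; the pools themselves are defined via your fair scheduler allocation file):

    // run this job's work in its own fair-scheduler pool
    sc.setLocalProperty("spark.scheduler.pool", "streaming-ingest")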

@cody:  curious about neelesh's question, as well.  does the Kafka Direct 
Stream API treat each Kafka Topic Partition separately in terms of parallel 
retrieval?

more context:  within a Kafka Topic partition, Kafka guarantees order, but not 
total ordering across partitions.  this is normal and expected.

so I assume the Kafka Direct Streaming connector can retrieve (and 
recover/retry) from separate partitions in parallel and still maintain the 
ordering guarantees offered by Kafka.

if this is true, then I'd suggest @neelesh create more partitions within the 
Kafka Topic to improve parallelism - same as any distributed, partitioned data 
processing engine including spark.

if this is not true, is there a technical limitation to prevent this 
parallelism within the connector?  

> On Dec 19, 2015, at 5:51 PM, Neelesh  wrote:
> 
> A related issue -  When I put multiple topics in a single stream, the 
> processing delay is as bad as the slowest task in the number of tasks 
> created. Even though the topics are unrelated to each other, RDD at time "t1" 
> has to wait for the RDD at "t0" to be fully executed,  even if most cores are 
> idling, and  just one task is still running and the rest of them have 
> completed. Effectively, a lightly loaded topic gets the worst deal because of 
> a heavily loaded topic
> 
> Is my understanding correct? 
> 
> 
> 
>> On Thu, Dec 17, 2015 at 9:53 AM, Cody Koeninger  wrote:
>> You could stick them all in a single stream, and do mapPartitions, then 
>> switch on the topic for that partition.  It's probably cleaner to do 
>> separate jobs, just depends on how you want to organize your code.
>> 
>>> On Thu, Dec 17, 2015 at 11:11 AM, Jean-Pierre OCALAN  
>>> wrote:
>>> Hi Cody,
>>> 
>>> First of all thanks for the note about spark.streaming.concurrentJobs. I 
>>> guess this is why it's not mentioned in the actual spark streaming doc.
>>> Since those 3 topics contain completely different data on which I need to 
>>> apply different kind of transformations, I am not sure joining them would 
>>> be really efficient, unless you know something that I don't.
>>> 
>>> As I really don't need any interaction between those streams, I think I 
>>> might end up running 3 different streaming apps instead of one.
>>> 
>>> Thanks again!
>>> 
 On Thu, Dec 17, 2015 at 11:43 AM, Cody Koeninger  
 wrote:
 Using spark.streaming.concurrentJobs for this probably isn't a good idea, 
 as it allows the next batch to start processing before current one is 
 finished, which may have unintended consequences.
 
 Why can't you use a single stream with all the topics you care about, or 
 multiple streams if you're e.g. joining them?
 
 
 
> On Wed, Dec 16, 2015 at 3:00 PM, jpocalan  wrote:
> Nevermind, I found the answer to my questions.
> The following spark configuration property will allow you to process
> multiple KafkaDirectStream in parallel:
> --conf spark.streaming.concurrentJobs=
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-from-multiple-topics-tp8678p25723.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>>> 
>>> -- 
>>> jean-pierre ocalan
>>> jpoca...@gmail.com
> 


Re: spark 1.5.2 memory leak? reading JSON

2015-12-20 Thread Chris Fregly
hey Eran, I run into this all the time with Json.

the problem is likely that your JSON is "too pretty" and extends beyond a 
single line, which trips up the JSON reader (it expects one JSON object per line).

my solution is usually to de-pretty the Json - either manually or through an 
ETL step - by stripping all white space before pointing my DataFrame/JSON 
reader at the file.

this tool is handy for one-off scenerios:  http://jsonviewer.stack.hu

for streaming use cases, you'll want to have a light de-pretty ETL step either 
within the Spark Streaming job after ingestion - or upstream using something 
like a Flume interceptor, NiFi Processor (I love NiFi), or Kafka transformation 
assuming those exist by now.

a similar problem exists for XML, btw.  there are lots of wonky workarounds for 
this that use mapPartitions and all kinds of craziness.  the best option, in my 
opinion, is to just ETL/flatten the data to make the DataFrame reader happy.
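
a rough sketch of the flattening step in Spark itself (assumes one JSON document per input file; json4s is used here just for illustration):

    import org.json4s.jackson.JsonMethods.{parse, compact, render}

    // re-serialize each document onto a single line, then hand the RDD[String] to the JSON reader
    val oneLinePerDoc = sc.wholeTextFiles("/path/to/pretty/json")
      .map { case (_, content) => compact(render(parse(content))) }

    val df = sqlContext.read.json(oneLinePerDoc)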

> On Dec 19, 2015, at 4:55 PM, Eran Witkon  wrote:
> 
> Hi,
> I tried the following code in spark-shell on spark1.5.2:
> 
> val df = 
> sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")
> df.count()
> 
> 15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size = 
> 67108864 bytes, TID = 3
> 15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
> java.lang.RuntimeException: Failed to parse a value for data type 
> StructType() (current token: VALUE_STRING).
>   at scala.sys.package$.error(package.scala:27)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251)
>   at 
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
> 
> Am I am doing something wrong?
> Eran


Re: Hive error when starting up spark-shell in 1.5.2

2015-12-20 Thread Chris Fregly
hopping on a plane, but check the hive-site.xml that's in your spark/conf 
directory (or should be, anyway).  I believe you can change the root path thru 
this mechanism.

if not, this should at least give you more info to google on.

let me know as this comes up a fair amount.
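
for reference, the property to look at is most likely hive.exec.scratchdir - a sketch for spark/conf/hive-site.xml (the value is a placeholder; pick somewhere you have write permissions):

    <property>
      <name>hive.exec.scratchdir</name>
      <value>C:/tmp/hive</value>
    </property>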

> On Dec 19, 2015, at 4:58 PM, Marco Mistroni  wrote:
> 
> HI all
>  posting again this as i was experiencing this error also under 1.5.1
> I am running spark 1.5.2 on a Windows 10 laptop (upgraded from Windows 8)
> When I launch spark-shell I am getting this exception, presumably because I 
> have no 
> admin rights to the /tmp directory on my laptop (Windows 8-10 seems very 
> restrictive)
> 
> java.lang.RuntimeException: java.lang.RuntimeException: The root scratch dir: 
> /tmp/hive on HDFS should be writable. Current permissions are: -
> at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
> at 
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
> at 
> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
> at 
> org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
> at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:167)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
> at 
> org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
> at $iwC$$iwC.(:9)
> at $iwC.(:18)
> at (:20)
> at .(:24)
> at .()
> at .(:7)
> at .()
> at $print()
> ..
> Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive on 
> HDFS should be writable. Current permissions are: -
> at 
> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:612)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
> at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
> ... 56 more
> 
> :10: error: not found: value sqlContext
>import sqlContext.implicits._
>   ^
> :10: error: not found: value sqlContext
>import sqlContext.sql
> 
> I was wondering how I can configure Hive to point to a different 
> directory where I have more permissions.
> 
> kr
>  marco
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to do map join in Spark SQL

2015-12-20 Thread Chris Fregly
this type of broadcast should be handled by Spark SQL/DataFrames automatically.

this is the primary cost-based, physical-plan query optimization that the Spark 
SQL Catalyst optimizer supports.

in Spark 1.5 and before, you can trigger this optimization by properly setting 
the spark.sql.autoBroadcastJoinThreshold to a value that is *above* the size of 
your smaller table when fully bloated in JVM memory (not the serialized size of 
the data on disk - very common mistake).

in Spark 1.6+, there are heuristics to make this decision dynamically - and 
even allow hybrid execution where certain keys - within the same Spark job - 
will be broadcast and others won't, depending on their relative "hotness" for 
that particular job.

common theme of Spark 1.6 and beyond will be adaptive physical plan execution, 
adaptive memory allocation to RDD Cache vs Spark Execution Engine, adaptive 
cluster resource allocation, etc.

the goal being to minimize manual configuration and enable many diff types of 
workloads to run efficiently on the same Spark cluster.
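
for the 1.5 case, a rough sketch of both knobs, using hypothetical DataFrame names based on the question below (bigDF holds the ORC data with the duration column, rangesDF is the small 8-row ranges table):

    import org.apache.spark.sql.functions.broadcast

    // raise the threshold (in bytes) above the small table's in-memory size...
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100 * 1024 * 1024).toString)

    // ...or force the broadcast explicitly with the hint (available since 1.5)
    val joined = bigDF.join(broadcast(rangesDF),
      bigDF("duration") >= rangesDF("min") && bigDF("duration") < rangesDF("max"))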

> On Dec 19, 2015, at 12:10 PM, Alexander Pivovarov  
> wrote:
> 
> I collected small DF to array of tuple3
> Then I registered UDF with function which is doing lookup in the array
> Then I just run select which uses the UDF.
> 
>> On Dec 18, 2015 1:06 AM, "Akhil Das"  wrote:
>> You can broadcast your json data and then do a map side join. This article 
>> is a good start http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
>> 
>> Thanks
>> Best Regards
>> 
>>> On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov  
>>> wrote:
>>> I have a big folder of ORC files. The files have a duration field (e.g. 
>>> 3,12,26, etc)
>>> Also I have small json file  (just 8 rows) with ranges definition (min, max 
>>> , name)
>>> 0, 10, A
>>> 10, 20, B
>>> 20, 30, C 
>>> etc
>>> 
>>> Because I cannot do an equi-join between duration and range min/max, I need to do 
>>> a cross join and apply a WHERE condition to take records which belong to the 
>>> range.
>>> A cross join is an expensive operation, so I think it's better if this 
>>> particular join is done using a map join.
>>> 
>>> How to do Map join in Spark Sql?


Re: Pyspark SQL Join Failure

2015-12-20 Thread Chris Fregly
how does Spark SQL/DataFrame know that train_users_2.csv has a field named 
"id" or anything else domain specific?  is there a header?  if so, does 
sc.textFile() know about this header?

I'd suggest using the Databricks spark-csv package for reading csv data.  there 
is an option in there to specify whether or not a header exists and, if so, use 
this as the column names.

one caveat:  I've had some issues relying on this specific feature of the 
spark-csv package, so I almost always call .toDF("id", "firstname", "lastname", 
...) on the created DataFrame to explicitly name the columns exactly what I 
want them to be.

I've also had some uppercase/lowercase and "_" and "-" issues with some of 
these connectors including spark-csv, spark-cassandra, and a few others, so I 
try to stick to simple lower-case column names without special chars just to be 
safe.

note:  these issues are connector specific, so not all should be subjected to 
this type of pessimism.  just pointing these out for completeness.
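
a sketch of the spark-csv read in Scala (the Python API is analogous; assumes the Databricks spark-csv package is on the classpath):

    val trainDF = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")        // use the first line as column names
      .option("inferSchema", "true")   // optional: infer column types instead of all-strings
      .load("../train_users_2.csv")

    // and/or rename columns explicitly afterwards:  trainDF.toDF("id", "firstname", ...)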

> On Dec 19, 2015, at 5:30 PM, Weiwei Zhang  wrote:
> 
> Hi all, 
> 
> I got this error when I tried to use the 'join' function to left outer join 
> two data frames in pyspark 1.4.1. 
> Please kindly point out the places where I made mistakes. Thank you. 
> 
> Traceback (most recent call last):
>   File "/Users/wz/PycharmProjects/PysparkTraining/Airbnb/src/driver.py", line 
> 46, in 
> trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, 
> 'left_outer')
>   File 
> "/Users/wz/Downloads/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py",
>  line 701, in __getattr__
> "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
> AttributeError: 'DataFrame' object has no attribute 'id' - It does have a 
> column called "id"
> 15/12/19 14:15:00 INFO SparkContext: Invoking stop() from shutdown hook
> 
> Here is the code:
> from pyspark import SparkContext
> from pyspark import SparkConf
> from pyspark.sql import SQLContext
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.classification import LogisticRegressionWithSGD
> from pyspark.sql.functions import *
> conf = SparkConf().set("spark.executor.memory", "4g")
> sc = SparkContext(conf= conf)
> sqlCtx = SQLContext(sc)
> 
> train = sc.textFile("../train_users_2.csv").map(lambda line: line.split(","))
> print train.first()
> trainDF = sqlCtx.createDataFrame(train)
> 
> test = sc.textFile("../test_users.csv").map(lambda line: line.split(","))
> testDF = sqlCtx.createDataFrame(test)
> 
> session = sc.textFile("../sessions.csv").map(lambda line: line.split(","))
> sessionDF = sqlCtx.createDataFrame(session)
> 
> # join train with session (Error)
> trainSessionDF = trainDF.join(sessionDF, trainDF.id == sessionDF.user_id, 
> 'left_outer')
> 
> Best Regards, 
> WZ


Re: Multiple Kinesis Streams in a single Streaming job

2015-05-14 Thread Chris Fregly
have you tried to union the 2 streams per the KinesisWordCountASL example
https://github.com/apache/spark/blob/branch-1.3/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L120
where
2 streams (against the same Kinesis stream in this case) are created and
union'd?

it should work the same way - including union() of streams from totally
different source types (kafka, kinesis, flume).



On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com wrote:

 What is the error you are seeing?

 TD

 On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com
 wrote:

 Hi,

 Is it possible to set up streams from multiple Kinesis streams and process
 them in a single job?  From what I have read, this should be possible,
 however, the Kinesis layer errors out whenever I try to receive from more
 than a single Kinesis Stream.

 Here is the code.  Currently, I am focused on just getting receivers setup
 and working for the two Kinesis Streams, as such, this code just attempts
 to
 print out the contents of both streams:

 implicit val formats = Serialization.formats(NoTypeHints)

 val conf = new SparkConf().setMaster("local[*]").setAppName("test")
 val ssc = new StreamingContext(conf, Seconds(1))

 val rawStream = KinesisUtils.createStream(ssc, "erich-test",
 "kinesis.us-east-1.amazonaws.com", Duration(1000),
 InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
 rawStream.map(msg => new String(msg)).print()

 val loaderStream = KinesisUtils.createStream(
   ssc,
   "dev-loader",
   "kinesis.us-east-1.amazonaws.com",
   Duration(1000),
   InitialPositionInStream.TRIM_HORIZON,
   StorageLevel.MEMORY_ONLY)

 val loader = loaderStream.map(msg => new String(msg)).print()

 ssc.start()

 Thanks,
 -Erich



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Multiple Kinesis Streams in a single Streaming job

2015-05-14 Thread Chris Fregly
another option (not really recommended, but worth mentioning) would be to put 
the DynamoDB checkpoint table in a region separate from the other stream - and even 
separate from the region of the stream itself.

this isn't available right now, but will be in Spark 1.4.

 On May 14, 2015, at 6:47 PM, Erich Ess er...@simplerelevance.com wrote:
 
 Hi Tathagata,
 
 I think that's exactly what's happening.
 
 The error message is: 
 com.amazonaws.services.kinesis.model.InvalidArgumentException: 
 StartingSequenceNumber 
 49550673839151225431779125105915140284622031848663416866 used in 
 GetShardIterator on shard shardId-0002 in stream erich-test under 
 account xxx is invalid because it did not come from this stream.
 
 I looked at the DynamoDB table and each job has a single table and that table 
 does not contain any stream identification information, only shard 
 checkpointing data.  I think the error is that when it tries to read from 
 stream B, it's using checkpointing data for stream A and errors out.  So it 
 appears, at first glance, that currently you can't read from multiple Kinesis 
 streams in a single job.  I haven't tried this, but it might be possible for 
 this to work if I force each stream to have different shard IDs so there is 
 no ambiguity in the DynamoDB table; however, that's clearly not a feasible 
 production solution.
 
 Thanks,
 -Erich
 
 On Thu, May 14, 2015 at 8:34 PM, Tathagata Das t...@databricks.com wrote:
 A possible problem may be that the kinesis stream in 1.3 uses the 
 SparkContext app name, as the Kinesis Application Name, that is used by the 
 Kinesis Client Library to save checkpoints in DynamoDB. Since both kinesis 
 DStreams are using the same Kinesis application name (as they are in the same 
 StreamingContext / SparkContext / Spark app name), KCL may be doing weird 
 overwriting of the checkpoint information of both Kinesis streams into the same 
 DynamoDB table. Either way, this is going to be fixed in Spark 1.4. 
 
 On Thu, May 14, 2015 at 4:10 PM, Chris Fregly ch...@fregly.com wrote:
 have you tried to union the 2 streams per the KinesisWordCountASL example 
 where 2 streams (against the same Kinesis stream in this case) are created 
 and union'd?
 
 it should work the same way - including union() of streams from totally 
 different source types (kafka, kinesis, flume).
 
 
 
 On Thu, May 14, 2015 at 2:07 PM, Tathagata Das t...@databricks.com wrote:
 What is the error you are seeing?
 
 TD
 
 On Thu, May 14, 2015 at 9:00 AM, Erich Ess er...@simplerelevance.com 
 wrote:
 Hi,
 
 Is it possible to setup streams from multiple Kinesis streams and process
 them in a single job?  From what I have read, this should be possible,
 however, the Kinesis layer errors out whenever I try to receive from more
 than a single Kinesis Stream.
 
 Here is the code.  Currently, I am focused on just getting receivers setup
 and working for the two Kinesis Streams, as such, this code just attempts 
 to
 print out the contents of both streams:
 
 implicit val formats = Serialization.formats(NoTypeHints)

 val conf = new SparkConf().setMaster("local[*]").setAppName("test")
 val ssc = new StreamingContext(conf, Seconds(1))

 val rawStream = KinesisUtils.createStream(ssc, "erich-test",
 "kinesis.us-east-1.amazonaws.com", Duration(1000),
 InitialPositionInStream.TRIM_HORIZON, StorageLevel.MEMORY_ONLY)
 rawStream.map(msg => new String(msg)).print()

 val loaderStream = KinesisUtils.createStream(
   ssc,
   "dev-loader",
   "kinesis.us-east-1.amazonaws.com",
   Duration(1000),
   InitialPositionInStream.TRIM_HORIZON,
   StorageLevel.MEMORY_ONLY)

 val loader = loaderStream.map(msg => new String(msg)).print()

 ssc.start()
 
 Thanks,
 -Erich
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 -- 
 Erich Ess | CTO
 c. 310-703-6058
 @SimpleRelevance | 130 E Randolph, Ste 1650 | Chicago, IL 60601
 Machine Learning For Marketers
 
 Named a top startup to watch in Crain's — View the Article.
 
 SimpleRelevance.com | Facebook | Twitter | Blog


Re: Spark + Kinesis + Stream Name + Cache?

2015-05-08 Thread Chris Fregly
hey mike-

as you pointed out here from my docs, changing the stream name is sometimes 
problematic due to the way the Kinesis Client Library manages leases and 
checkpoints, etc in DynamoDB.

I noticed this directly while developing the Kinesis connector which is why I 
highlighted the issue here.

is wiping out the DynamoDB table a suitable workaround for now?  usually in 
production, you wouldn't be changing stream names often, so hopefully that's ok 
for dev.
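for reference, a minimal sketch of that cleanup using the AWS Java SDK (the table name shown is hypothetical - it matches the Kinesis application / Spark app name):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest

val dynamo = new AmazonDynamoDBClient()                 // picks up the default AWS credentials chain
dynamo.setEndpoint("dynamodb.us-east-1.amazonaws.com")  // the KCL lease table lives in us-east-1
dynamo.deleteTable(new DeleteTableRequest().withTableName("MyKinesisAppName"))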

otherwise, can you share the relevant spark streaming logs that are generated 
when you do this?

I saw a lot of "lease not owned by this Kinesis Client" type of errors, from 
what I remember.

lemme know!

-Chris 

 On May 8, 2015, at 4:36 PM, Mike Trienis mike.trie...@orcsol.com wrote:
 
 [Kinesis stream name]: The Kinesis stream that this streaming application 
 receives from
 The application name used in the streaming context becomes the Kinesis 
 application name
 The application name must be unique for a given account and region.
 The Kinesis backend automatically associates the application name to the 
 Kinesis stream using a DynamoDB table (always in the us-east-1 region) 
 created during Kinesis Client Library initialization.
 Changing the application name or stream name can lead to Kinesis errors in 
 some cases. If you see errors, you may need to manually delete the DynamoDB 
 table.
 
 On Fri, May 8, 2015 at 2:06 PM, Mike Trienis mike.trie...@orcsol.com wrote:
 Hi All,
 
 I am submitting the assembled fat jar file by the command:
 
 bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class 
 com.xxx.Consumer -0.1-SNAPSHOT.jar 
 
 It reads the data file from kinesis using the stream name defined in a 
 configuration file. It turns out that it reads the data perfectly fine for 
 one stream name (i.e. the default), however, if I switch the stream name and 
 re-submit the jar, it no longer reads the data. From CloudWatch, I can see 
 that there is data put into the stream and spark is actually sending get 
 requests as well. However, it doesn't seem to be outputting the data. 
 
 Has anyone else encountered a similar issue? Does spark cache the stream 
 name somewhere? I also have checkpointing enabled as well.
 
 Thanks, Mike. 
 


Re: Spark Streaming S3 Performance Implications

2015-03-21 Thread Chris Fregly
hey mike!
you'll definitely want to increase your parallelism by adding more shards to 
the stream - as well as spinning up 1 receiver per shard and unioning all the 
shards per the KinesisWordCount example that is included with the kinesis 
streaming package. 
you'll need more cores (cluster) or threads (local) to support this - equalling 
at least the number of shards/receivers + 1.
also, it looks like you're writing to S3 per RDD.  you'll want to broaden that 
out to write DStream batches - or expand  even further and write window batches 
(where the window interval is a multiple of the batch interval).
this goes for any spark streaming implementation - not just Kinesis.
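as a rough sketch of the windowed variant (assumes a `dataStream: DStream[String]`, the usual streaming imports, and a hypothetical bucket/path):

// write once per 60-second window instead of once per 1-second batch
dataStream
  .window(Seconds(60), Seconds(60))
  .foreachRDD((rdd, time) =>
    if (rdd.count() > 0) rdd.saveAsTextFile(s"s3n://my-bucket/${time.milliseconds}"))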
lemme know if that works for you.
thanks!
-Chris 
_
From: Mike Trienis mike.trie...@orcsol.com
Sent: Wednesday, March 18, 2015 2:45 PM
Subject: Spark Streaming S3 Performance Implications
To:  user@spark.apache.org


Hi All,

I am pushing data from Kinesis stream to S3 using Spark Streaming and
noticed that during testing (i.e. master=local[2]) the batches (1 second
intervals) were falling behind the incoming data stream at about 5-10 events /
second. It seems that the rdd.saveAsTextFile("s3n://...") is taking a few
seconds to complete.

val saveFunc = (rdd: RDD[String], time: Time) => {
  val count = rdd.count()
  if (count > 0) {
    val s3BucketInterval = time.milliseconds.toString
    rdd.saveAsTextFile("s3n://...")
  }
}

dataStream.foreachRDD(saveFunc)

Should I expect the same behaviour in a deployed cluster? Or does
rdd.saveAsTextFile("s3n://...") distribute the push work to each worker node?

"Write the elements of the dataset as a text file (or set of text
files) in a given directory in the local filesystem, HDFS or any other
Hadoop-supported file system. Spark will call toString on each element to
convert it to a line of text in the file."

Thanks, Mike.

Re: Connection pool in workers

2015-03-01 Thread Chris Fregly
hey AKM!

this is a very common problem.  the streaming programming guide addresses
this issue here, actually:
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

the tl;dr is this:
1) you want to use foreachPartition() to operate on a whole partition
versus foreach() on a single record
2) you want to get/release the ConnectionPool within each worker
3) make sure you initialize the ConnectionPool first - or do it lazily upon
getting the first connection.

here's the sample code referenced in the link above with some additional
comments for clarity:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition
    // of records

    // ConnectionPool is a static, lazily initialized singleton pool of
    // connections that runs within the Worker JVM

    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()

    // perform the application logic here - parse and write to mongodb
    // using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}

hope that helps!

-chris




On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman 
ashrafuzzaman...@gmail.com wrote:

 Sorry guys may bad,
 Here is a high level code sample,

 val unionStreams = ssc.union(kinesisStreams)
 unionStreams.foreachRDD(rdd => {
   rdd.foreach(tweet => {
     val strTweet = new String(tweet, "UTF-8")
     val interaction = InteractionParser.parser(strTweet)
     interactionDAL.insert(interaction)
   })
 })

 Here I have to close the connection for interactionDAL, otherwise the JVM
 gives me an error that the connection is open. I tried with a sticky connection
 as well, with keep_alive true. So my guess was that at the point of
 “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and
 sent to workers, and the workers un-marshal and execute the process, which is
 why the connection is always opened for each RDD. I might be completely
 wrong. I would love to know what is going on underneath.
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Pushing data from AWS Kinesis - Spark Streaming - AWS Redshift

2015-03-01 Thread Chris Fregly
Hey Mike-

Great to see you're using the AWS stack to its fullest!

I've already created the Kinesis-Spark Streaming connector with examples,
documentation, test, and everything.  You'll need to build Spark from
source with the -Pkinesis-asl profile, otherwise they won't be included in
the build.  This is due to the Amazon Software License (ASL).

Here's the link to the Kinesis-Spark Streaming integration guide:
http://spark.apache.org/docs/1.2.0/streaming-kinesis-integration.html.

Here's a link to the source:
https://github.com/apache/spark/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark

Here's the original jira as well as some follow-up jiras that I'm working
on now:

   - https://issues.apache.org/jira/browse/SPARK-1981
   - https://issues.apache.org/jira/browse/SPARK-5959
   - https://issues.apache.org/jira/browse/SPARK-5960

I met a lot of folks at the Strata San Jose conference a couple weeks ago
who are using this Kinesis connector with good results, so you should be OK
there.

Regarding Redshift, the current Redshift connector only pulls data from
Redshift - it does not write to Redshift.

And actually, it only reads files that have been UNLOADed from Redshift
into S3, so it's not pulling from Redshift directly.

This is confusing, I know.  Here's the jira for this work:
https://issues.apache.org/jira/browse/SPARK-3205

For now, you'd have to use the AWS Java SDK to write to Redshift directly
from within your Spark Streaming worker as batches of data come in from the
Kinesis Stream.

This is roughly how your Spark Streaming app would look:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition
    // of records

    // RedshiftConnectionPool is a static, lazily initialized singleton
    // pool of connections that runs within the Worker JVM

    // retrieve a connection from the pool
    val connection = RedshiftConnectionPool.getConnection()

    // perform the application logic here - parse and write to Redshift
    // using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return to the pool for future reuse
    RedshiftConnectionPool.returnConnection(connection)
  }
}

Note that you would need to write the RedshiftConnectionPool singleton class
yourself using the AWS Java SDK as I mentioned.

There is a relatively new Spark SQL DataSources API that supports reading
and writing to these data sources.

An example Avro implementation is here:
http://spark-packages.org/package/databricks/spark-avro, although I think
that's a read-only impl, but you get the idea.

I've created 2 jiras for creating libraries for DynamoDB and Redshift that
implement this DataSources API.

Here are the jiras:

   - https://issues.apache.org/jira/browse/SPARK-6101 (DynamoDB)
   - https://issues.apache.org/jira/browse/SPARK-6102 (Redshift)

Perhaps you can take a stab at SPARK-6102?  I should be done with the
DynamoDB impl in the next few weeks - scheduled for Spark 1.4.0.

Hope that helps!  :)

-Chris


On Sun, Mar 1, 2015 at 11:06 AM, Mike Trienis mike.trie...@orcsol.com
wrote:

 Hi All,

 I am looking at integrating a data stream from AWS Kinesis to AWS Redshift
 and since I am already ingesting the data through Spark Streaming, it seems
 convenient to also push that data to AWS Redshift at the same time.

 I have taken a look at the AWS kinesis connector although I am not sure it
 was designed to integrate with Apache Spark. It seems more like a
 standalone approach:

- https://github.com/awslabs/amazon-kinesis-connectors

 There is also a Spark redshift integration library, however, it looks like
 it was intended for pulling data rather than pushing data to AWS Redshift:

- https://github.com/databricks/spark-redshift

 I finally took a look at a general Scala library that integrates with AWS
 Redshift:

- http://scalikejdbc.org/

 Does anyone have any experience pushing data from Spark Streaming to AWS
 Redshift? Does it make sense conceptually, or does it make more sense to
 push data from AWS Kinesis to AWS Redshift VIA another standalone approach
 such as the AWS Kinesis connectors.

 Thanks, Mike.




Re: Out of memory with Spark Streaming

2014-10-30 Thread Chris Fregly
curious about why you're only seeing 50 records max per batch.

how many receivers are you running?  what is the rate that you're putting
data onto the stream?

per the default AWS kinesis configuration, the producer can do 1000 PUTs
per second with max 50k bytes per PUT and max 1mb per second per shard.

on the consumer side, you can only do 5 GETs per second and 2mb per second
per shard.

my hunch is that the 5 GETs per second is what's limiting your consumption
rate.

can you verify that these numbers match what you're seeing?  if so, you may
want to increase your shards and therefore the number of kinesis receivers.

otherwise, this may require some further investigation on my part.  i wanna
stay on top of this if it's an issue.

thanks for posting this, aniket!

-chris

On Fri, Sep 12, 2014 at 5:34 AM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

 Hi all

 Sorry but this was totally my mistake. In my persistence logic, I was
 creating an async http client instance in RDD foreach but was never closing it,
 leading to memory leaks.

 Apologies for wasting everyone's time.

 Thanks,
 Aniket

 On 12 September 2014 02:20, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Which version of spark are you running?

 If you are running the latest one, then could you try running not a window
 but a simple event count on every 2 second batch, and see if you are still
 running out of memory?

 TD


 On Thu, Sep 11, 2014 at 10:34 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I did change it to be 1 gb. It still ran out of memory but a little
 later.

 The streaming job isn't handling a lot of data. In every 2 seconds, it
 doesn't get more than 50 records. Each record size is not more than 500
 bytes.
  On Sep 11, 2014 10:54 PM, Bharat Venkat bvenkat.sp...@gmail.com
 wrote:

 You could set spark.executor.memory to something bigger than the
 default (512mb)


 On Thu, Sep 11, 2014 at 8:31 AM, Aniket Bhatnagar 
 aniket.bhatna...@gmail.com wrote:

 I am running a simple Spark Streaming program that pulls in data from
 Kinesis at a batch interval of 10 seconds, windows it for 10 seconds, maps
 data and persists to a store.

 The program is running in local mode right now and runs out of memory
 after a while. I am yet to investigate heap dumps but I think Spark isn't
 releasing memory after processing is complete. I have even tried changing
 storage level to disk only.

 Help!

 Thanks,
 Aniket







Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?

2014-10-29 Thread Chris Fregly
jira created with comments/references to this discussion:
https://issues.apache.org/jira/browse/SPARK-4144



On Tue, Aug 19, 2014 at 4:47 PM, Xiangrui Meng men...@gmail.com wrote:

 No. Please create one but it won't be able to catch the v1.1 train.
 -Xiangrui

 On Tue, Aug 19, 2014 at 4:22 PM, Chris Fregly ch...@fregly.com wrote:
  this would be awesome.  did a jira get created for this?  I searched, but
  didn't find one.
 
  thanks!
 
  -chris
 
 
  On Tue, Jul 8, 2014 at 1:30 PM, Rahul Bhojwani 
 rahulbhojwani2...@gmail.com
  wrote:
 
  Thanks a lot Xiangrui. This will help.
 
 
  On Wed, Jul 9, 2014 at 1:34 AM, Xiangrui Meng men...@gmail.com wrote:
 
  Hi Rahul,
 
  We plan to add online model updates with Spark Streaming, perhaps in
  v1.1, starting with linear methods. Please open a JIRA for Naive
  Bayes. For Naive Bayes, we need to update the priors and conditional
  probabilities, which means we should also remember the number of
  observations for the updates.
 
  Best,
  Xiangrui
 
  On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani
  rahulbhojwani2...@gmail.com wrote:
   Hi,
  
   I am using the MLlib Naive Bayes for a text classification problem. I
   have
   very little training data. And then the data will be coming
   continuously and I need to classify it as either A or B. I am
 training
   the
   MLlib Naive Bayes model using the training data but next time when
 data
   comes, I want to predict its class and then incorporate that also in
   the
   model for next time prediction of new data(I think that is obvious).
  
   So I am not able to figure out what is the way to do that using MLlib
   Naive
   Bayes. Is it that I have to train the model on the whole data every
   time new
   data comes in??
  
   Thanks in Advance!
   --
   Rahul K Bhojwani
   3rd Year B.Tech
   Computer Science and Engineering
   National Institute of Technology, Karnataka
 
 
 
 
  --
  Rahul K Bhojwani
  3rd Year B.Tech
  Computer Science and Engineering
  National Institute of Technology, Karnataka
 
 



Re: Shared variable in Spark Streaming

2014-09-05 Thread Chris Fregly
good question, soumitra.  it's a bit confusing.

to break TD's code down a bit:

dstream.count() is a transformation operation (returns a new DStream),
executes lazily, runs in the cluster on the underlying RDDs that come
through in that batch, and returns a new DStream with a single element
representing the count of the underlying RDDs in each batch.

dstream.foreachRDD() is an output/action operation (returns something other
than a DStream - nothing in this case), triggers the lazy execution above,
returns the results to the driver, and increments the globalCount locally
in the driver.

per your specific question, RDD.count() is different in that it's an
output/action operation that materializes the RDD and collects the count of
elements in the RDD locally in the driver.  confusing, indeed.

accumulators are updated in parallel on the worker nodes across the cluster and
are read locally in the driver.
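to make the two approaches concrete, a minimal sketch (assumes an existing SparkContext `sc` and a `dstream`):

// (1) TD's approach: a driver-local variable updated from the single-element count DStream
var globalCount = 0L
dstream.count().foreachRDD(rdd => globalCount += rdd.first())

// (2) an accumulator: incremented in parallel on the workers, read on the driver via .value
val eventCount = sc.accumulator(0L, "events")
dstream.foreachRDD(rdd => rdd.foreach(_ => eventCount += 1L))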




On Fri, Aug 8, 2014 at 7:36 AM, Soumitra Kumar kumar.soumi...@gmail.com
wrote:

 I want to keep track of the events processed in a batch.

 How come 'globalCount' work for DStream? I think similar construct won't
 work for RDD, that's why there is accumulator.


 On Fri, Aug 8, 2014 at 12:52 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Do you mean that you want a continuously updated count as more
 events/records are received in the DStream (remember, DStream is a
 continuous stream of data)? Assuming that is what you want, you can use a
 global counter

 var globalCount = 0L

 dstream.count().foreachRDD(rdd => { globalCount += rdd.first() })

 This globalCount variable will reside in the driver and will keep being
 updated after every batch.

 TD


 On Thu, Aug 7, 2014 at 10:16 PM, Soumitra Kumar kumar.soumi...@gmail.com
  wrote:

 Hello,

 I want to count the number of elements in the DStream, like RDD.count()
 . Since there is no such method in DStream, I thought of using
 DStream.count and use the accumulator.

 How do I do DStream.count() to count the number of elements in a DStream?

 How do I create a shared variable in Spark Streaming?

 -Soumitra.






Re: spark RDD join Error

2014-09-04 Thread Chris Fregly
specifically, you're picking up the following implicit:

import org.apache.spark.SparkContext.rddToPairRDDFunctions

(in case you're a wildcard-phobe like me)


On Thu, Sep 4, 2014 at 5:15 PM, Veeranagouda Mukkanagoudar 
veera...@gmail.com wrote:

 Thanks a lot, that fixed the issue :)


 On Thu, Sep 4, 2014 at 4:51 PM, Zhan Zhang zzh...@hortonworks.com wrote:

 Try this:
 Import org.apache.spark.SparkContext._

 Thanks.

 Zhan Zhang


 On Sep 4, 2014, at 4:36 PM, Veeranagouda Mukkanagoudar 
 veera...@gmail.com wrote:

 I am planning to use RDD join operation, to test out i was trying to
 compile some test code, but am getting following compilation error

 *value join is not a member of org.apache.spark.rdd.RDD[(String, Int)]*
 *[error] rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }*

 Code:

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD

 def joinTest(rddA: RDD[(String, Int)], rddB: RDD[(String, Int)]):
 RDD[(String, Int)] = {
   rddA.join(rddB).map { case (k, (a, b)) => (k, a+b) }
 }

 Any help would be great .

 Veera



 CONFIDENTIALITY NOTICE
 NOTICE: This message is intended for the use of the individual or entity
 to which it is addressed and may contain information that is confidential,
 privileged and exempt from disclosure under applicable law. If the reader
 of this message is not the intended recipient, you are hereby notified that
 any printing, copying, dissemination, distribution, disclosure or
 forwarding of this communication is strictly prohibited. If you have
 received this communication in error, please contact the sender immediately
 and delete it from your system. Thank You.





Re: saveAsSequenceFile for DStream

2014-08-30 Thread Chris Fregly
couple things to add here:

1) you can import the
org.apache.spark.streaming.dstream.PairDStreamFunctions implicit which adds
a whole ton of functionality to DStream itself.  this lets you work at the
DStream level versus digging into the underlying RDDs.

2) you can use ssc.fileStream(directory) to create an input stream made up
of files in a given directory.  new files will be added to the stream as
they appear in that directory.  note:  files must be immutable.
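a minimal sketch of both points (Spark 1.x APIs; assumes an existing StreamingContext `ssc` and a pair DStream `wordCounts: DStream[(String, Int)]`):

import org.apache.spark.SparkContext._                 // pair-RDD implicits (pre-1.3 style)
import org.apache.spark.streaming.StreamingContext._   // (1) PairDStreamFunctions implicits

// write each batch as a sequence file with the batch time in the path
wordCounts.foreachRDD((rdd, time) =>
  rdd.saveAsSequenceFile(s"tachyon://localhost:19998/files/WordCounts-${time.milliseconds}"))

// (2) watch a directory for new (immutable) files and treat them as a stream
val newFiles = ssc.textFileStream("tachyon://localhost:19998/files/")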


On Tue, Jul 22, 2014 at 8:39 AM, Barnaby Falls bfa...@outlook.com wrote:

 Thanks Sean! I got that working last night similar to how you solved it.
 Any ideas about how to monitor that same folder in another script by
 creating a stream? I can use sc.sequenceFile() to read in the RDD, but how
 do I get the name of the file that got added since there is no
 sequenceFileStream() method? Thanks again for your help.

  On Jul 22, 2014, at 1:57, Sean Owen so...@cloudera.com wrote:
 
  What about simply:
 
  dstream.foreachRDD(_.saveAsSequenceFile(...))
 
  ?
 
  On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote:
  First of all, I do not know Scala, but learning.
 
  I'm doing a proof of concept by streaming content from a socket,
 counting
  the words and write it to a Tachyon disk. A different script will read
 the
  file stream and print out the results.
 
  val lines = ssc.socketTextStream(args(0), args(1).toInt,
  StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
  ssc.start()
  ssc.awaitTermination()
 
  I already did a proof of concept to write and read sequence files but
 there
  doesn't seem to be a saveAsSequenceFiles() method in DStream. What is
 the
  best way to write out an RDD to a stream so that the timestamps are in
 the
  filenames and so there is minimal overhead in reading the data back in
 as
  objects, see below.
 
  My simple successful proof was the following:
  val rdd = sc.parallelize(Array(("a", 2), ("b", 3), ("c", 1)))
  rdd.saveAsSequenceFile("tachyon://.../123.sf2")
  val rdd2 = sc.sequenceFile[String, Int]("tachyon://.../123.sf2")
 
  How can I do something similar with streaming?
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: data locality

2014-08-30 Thread Chris Fregly
you can view the Locality Level of each task within a stage by using the
Spark Web UI under the Stages tab.

levels are as follows (in order of decreasing desirability):
1) PROCESS_LOCAL - data was found directly in the executor JVM
2) NODE_LOCAL - data was found on the same node as the executor JVM
3) RACK_LOCAL - data was found in the same rack
4) ANY - outside the rack

also, the Aggregated Metrics by Executor section of the Stage detail view
shows how much data is being shuffled across the network (Shuffle
Read/Write).  0 is where you wanna be with that metric.

-chris


On Fri, Jul 25, 2014 at 4:13 AM, Tsai Li Ming mailingl...@ltsai.com wrote:

 Hi,

 In the standalone mode, how can we check data locality is working as
 expected when tasks are assigned?

 Thanks!


 On 23 Jul, 2014, at 12:49 am, Sandy Ryza sandy.r...@cloudera.com wrote:

 On standalone there is still special handling for assigning tasks within
 executors.  There just isn't special handling for where to place executors,
 because standalone generally places an executor on every node.


 On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang hw...@qilinsoft.com wrote:

   Sandy,



 I just tried the standalone cluster and didn't have a chance to try Yarn
 yet.

 So if I understand correctly, there is **no** special handling of task
 assignment according to the HDFS block's location when Spark is running as
 a **standalone** cluster.

 Please correct me if I'm wrong. Thank you for your patience!


  --

 *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
 *Sent:* July 22, 2014 9:47

 *To:* user@spark.apache.org
 *Subject:* Re: data locality



 This currently only works for YARN.  The standalone default is to place
 an executor on every node for every job.



 The total number of executors is specified by the user.



 -Sandy



 On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote:

 Sandy,



 Do you mean the “preferred location” is working for standalone cluster
 also? Because I check the code of SparkContext and see comments as below:



   // This is used only by YARN for now, but should be relevant to other
   // cluster types (Mesos, etc) too. This is typically generated from
   // InputFormatInfo.computePreferredLocations. It contains a map from
   // hostname to a list of input format splits on the host.
   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()



 BTW, even with the preferred hosts, how does Spark decide how many total
 executors to use for this application?



 Thanks again!


  --

 *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
 *Sent:* Friday, July 18, 2014 3:44 PM
 *To:* user@spark.apache.org
 *Subject:* Re: data locality



 Hi Haopu,



 Spark will ask HDFS for file block locations and try to assign tasks
 based on these.



 There is a snag.  Spark schedules its tasks inside of executor
 processes that stick around for the lifetime of a Spark application.  Spark
 requests executors before it runs any jobs, i.e. before it has any
 information about where the input data for the jobs is located.  If the
 executors occupy significantly fewer nodes than exist in the cluster, it
 can be difficult for Spark to achieve data locality.  The workaround for
 this is an API that allows passing in a set of preferred locations when
 instantiating a Spark context.  This API is currently broken in Spark 1.0,
 and will likely be changed to something a little simpler in a future
 release.



 val locData = InputFormatInfo.computePreferredLocations(
   Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

 val sc = new SparkContext(conf, locData)



 -Sandy





 On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I have a standalone spark cluster and a HDFS cluster which share some of
 nodes.



 When reading HDFS file, how does spark assign tasks to nodes? Will it ask
 HDFS the location for each file block in order to get a right worker node?



 How about a spark cluster on Yarn?



 Thank you very much!












Re: Low Level Kafka Consumer for Spark

2014-08-28 Thread Chris Fregly
@bharat-

overall, i've noticed a lot of confusion about how Spark Streaming scales -
as well as how it handles failover and checkpointing, but we can discuss
that separately.

there's actually 2 dimensions to scaling here:  receiving and processing.

*Receiving*
receiving can be scaled out by submitting new DStreams/Receivers to the
cluster as i've done in the Kinesis example.  in fact, i purposely chose to
submit multiple receivers in my Kinesis example because i feel it should be
the norm and not the exception - particularly for partitioned and
checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
only way to scale.

a side note here is that each receiver running in the cluster will
immediately replicate to 1 other node for fault-tolerance of that specific
receiver.  this is where the confusion lies.  this 2-node replication is
mainly for failover in case the receiver dies while data is in flight.
 there's still a chance for data loss as there's no write ahead log on the
hot path, but this is being addressed.

this is mentioned in the docs here:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
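a minimal sketch of what scaling out the receiving side looks like (Kinesis flavor; the shard count, stream name, and endpoint are hypothetical, and the usual kinesis-asl imports are assumed):

val numShards = 2
val kinesisStreams = (0 until numShards).map { _ =>
  KinesisUtils.createStream(ssc, "myStream", "kinesis.us-east-1.amazonaws.com",
    Duration(2000), InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}
val unionedStream = ssc.union(kinesisStreams)   // one DStream to run the processing logic against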

*Processing*
once data is received, tasks are scheduled across the Spark cluster just
like any other non-streaming task where you can specify the number of
partitions for reduces, etc.  this is the part of scaling that is sometimes
overlooked - probably because it works just like regular Spark, but it is
worth highlighting.

Here's a blurb in the docs:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

the other thing that's confusing with Spark Streaming is that in Scala, you
need to explicitly

import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

in order to pick up the implicits that allow DStream.reduceByKey and such
(versus DStream.transform(rddBatch => rddBatch.reduceByKey(...)))

in other words, DStreams appear to be relatively featureless until you
discover this implicit.  otherwise, you need to operate on the underlying
RDD's explicitly which is not ideal.

the Kinesis example referenced earlier in the thread uses the DStream
implicits.


side note to all of this - i've recently convinced my publisher for my
upcoming book, Spark In Action, to let me jump ahead and write the Spark
Streaming chapter ahead of other more well-understood libraries.  early
release is in a month or so.  sign up  @ http://sparkinaction.com if you
wanna get notified.

shameless plug that i wouldn't otherwise do, but i really think it will
help clear a lot of confusion in this area as i hear these questions asked
a lot in my talks and such.  and i think a clear, crisp story on scaling
and fault-tolerance will help Spark Streaming's adoption.

hope that helps!

-chris




On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather than rely on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we should be
 able
 to access it once more and not have to pull it back again from Kafka or
 any other stream that is being affected by this issue. If for example
 there
 is a big amount of batches to be recomputed I would rather have them done
 distributed than overloading the batch interval with huge amount of Kafka
 messages.

 I do not yet have enough know-how on where the issue is, or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Kinesis receiver spark streaming partition

2014-08-28 Thread Chris Fregly
great question, wei.  this is very important to understand from a
performance perspective.  and this extends beyond kinesis - it's for any
streaming source that supports shards/partitions.

i need to do a little research into the internals to confirm my theory.

lemme get back to you!

-chris


On Tue, Aug 26, 2014 at 11:37 AM, Wei Liu wei@stellarloyalty.com
wrote:

 We are exploring using Kinesis and spark streaming together. I took at a
 look at the kinesis receiver code in 1.1.0. I have a question regarding
 kinesis partition  spark streaming partition. It seems to be pretty
 difficult to align these partitions.

 Kinesis partitions a stream of data into shards, if we follow the example,
 we will have multiple kinesis receivers reading from the same stream in
 spark streaming. It seems like kinesis workers will coordinate among
 themselves and assign shards to themselves dynamically. For a particular
 shard, it can be consumed by different kinesis workers (thus different
 spark workers) dynamically (not at the same time). Blocks are generated
 based on time intervals, RDDs are created based on blocks. RDDs are
 partitioned based on blocks. At the end, the data for a given shard will be
 spread into multiple blocks (possibly located on different spark worker
 nodes).

 We will probably need to group these data again for a given shard and
 shuffle data around to achieve the same partition we had in Kinesis.

 Is there a better way to achieve this to avoid data reshuffling?

 Thanks,
 Wei



Re: Parsing Json object definition spanning multiple lines

2014-08-26 Thread Chris Fregly
i've seen this done using mapPartitions() where each partition represents a
single, multi-line json file.  you can rip through each partition (json
file) and parse the json doc as a whole.

this assumes you use sc.textFile(path/*.json) or equivalent to load in
multiple files at once.  each json file will be a partition.

not sure if this satisfies your use case, but might be a good starting
point.
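a minimal sketch of that approach (json4s flavor, assuming each file is a single small JSON document with a top-level "name" field):

import org.json4s._
import org.json4s.jackson.JsonMethods._

val names = sc.textFile("path/*.json")
  .mapPartitions { lines =>
    implicit val formats = DefaultFormats
    val doc = lines.mkString("\n")                      // re-assemble the multi-line document
    if (doc.trim.isEmpty) Iterator.empty
    else Iterator((parse(doc) \ "name").extract[String])
  }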

-chris


On Mon, Jul 14, 2014 at 2:55 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I have a json file where the definition of each object spans multiple
 lines.
 An example of one object definition appears below.

 {
   "name": "16287e9cdf",
   "width": 500,
   "height": 325,
   "width": 1024,
   "height": 665,
   "obj": [
     {
       "x": 395.08,
       "y": 82.09,
       "w": 185.48677,
       "h": 185.48677,
       "min": 50,
       "max": 59,
       "attr1": 2,
       "attr2": 68,
       "attr3": 8
     },
     {
       "x": 519.1,
       "y": 225.8,
       "w": 170,
       "h": 171,
       "min": 20,
       "max": 29,
       "attr1": 7,
       "attr2": 93,
       "attr3": 10
     }
   ]
 }

 I used the following Spark code to parse the file. However, the parsing is
 failing because I think it expects one Json object definition per line. I
 can try to preprocess the input file  to remove the new lines, but I would
 like to know if it is possible to parse a Json object definition that spans
 multiple lines, directly in Spark.

 val inp = sc.textFile(args(0))
 val res = inp.map(line => { parse(line) })
   .map(json => {
     implicit lazy val formats = org.json4s.DefaultFormats
     val image = (json \ "name").extract[String]
   })


 Thanks for  your help.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Parsing-Json-object-definition-spanning-multiple-lines-tp9659.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark-Streaming collect/take functionality.

2014-08-26 Thread Chris Fregly
good suggestion, td.

and i believe the optimization that jon.burns is referring to - from the
big data mini course - is a step earlier:  the sorting mechanism that
produces sortedCounts.

you can use mapPartitions() to get a top k locally on each partition, then
shuffle only (k * # of partitions) elements to the driver for sorting -
versus shuffling the whole dataset from all partitions.  network IO saving
technique.
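a minimal sketch of that technique (assumes `counts: RDD[(String, Int)]`):

val k = 10
val globalTopK = counts
  .mapPartitions(iter => iter.toSeq.sortBy(-_._2).take(k).iterator)  // local top k per partition
  .collect()                                                         // only k * numPartitions records move
  .sortBy(-_._2)
  .take(k)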


On Tue, Jul 15, 2014 at 9:41 AM, jon.burns jon.bu...@uleth.ca wrote:

 It works perfect, thanks!. I feel like I should have figured that out, I'll
 chalk it up to inexperience with Scala. Thanks again.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-collect-take-functionality-tp9670p9772.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Low Level Kafka Consumer for Spark

2014-08-26 Thread Chris Fregly
great work, Dibyendu.  looks like this would be a popular contribution.

expanding on bharat's question a bit:

what happens if you submit multiple receivers to the cluster by creating
and unioning multiple DStreams as in the kinesis example here:

https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L123

for more context, the kinesis implementation above uses the Kinesis Client
Library (KCL) to automatically assign - and load balance - stream shards
among all KCL threads from all receivers (potentially coming and going as
nodes die) on all executors/nodes using DynamoDB as the association data
store.

ZooKeeper would be used for your Kafka consumers, of course.  and ZooKeeper
watches to handle the ephemeral nodes.  and I see you're using Curator,
which makes things easier.

as bharat suggested, running multiple receivers/dstreams may be desirable
from a scalability and fault tolerance standpoint.  is this type of load
balancing possible among your different Kafka consumers running in
different ephemeral JVMs?

and isn't it fun proposing a popular piece of code?  the question
floodgates have opened!  haha. :)

-chris



On Tue, Aug 26, 2014 at 7:29 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Bharat,

 Thanks for your email. If the Kafka Reader worker process dies, it will
 be replaced by a different machine, and it will start consuming from the
 offset where it left over ( for each partition). Same case can happen even
 if I tried to have individual Receiver for every partition.

 Regards,
 Dibyendu


 On Tue, Aug 26, 2014 at 5:43 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:

 I like this consumer for what it promises - better control over offset and
 recovery from failures.  If I understand this right, it still uses a single
 worker process to read from Kafka (one thread per partition) - is there a
 way to specify multiple worker processes (on different machines) to read
 from Kafka?  Maybe one worker process for each partition?

 If there is no such option, what happens when the single machine hosting
 the
 Kafka Reader worker process dies and is replaced by a different machine
 (like in cloud)?

 Thanks,
 Bharat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark on Yarn: Connecting to Existing Instance

2014-08-21 Thread Chris Fregly
perhaps the author is referring to Spark Streaming applications?  they're
examples of long-running applications.

the application/domain-level protocol still needs to be implemented
yourself, as sandy pointed out.


On Wed, Jul 9, 2014 at 11:03 AM, John Omernik j...@omernik.com wrote:

 So how do I do the long-lived server continually satisfying requests in
 the Cloudera application? I am very confused by that at this point.


 On Wed, Jul 9, 2014 at 12:49 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Spark doesn't currently offer you anything special to do this.  I.e. if
 you want to write a Spark application that fires off jobs on behalf of
 remote processes, you would need to implement the communication between
 those remote processes and your Spark application code yourself.


 On Wed, Jul 9, 2014 at 10:41 AM, John Omernik j...@omernik.com wrote:

 Thank you for the link.  In that link the following is written:

 For those familiar with the Spark API, an application corresponds to an
 instance of the SparkContext class. An application can be used for a
 single batch job, an interactive session with multiple jobs spaced apart,
 or a long-lived server continually satisfying requests

 So, if I wanted to use a long-lived server continually satisfying
 requests and then start a shell that connected to that context, how would
 I do that in Yarn? That's the problem I am having right now, I just want
 there to be that long lived service that I can utilize.

 Thanks!


 On Wed, Jul 9, 2014 at 11:14 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 To add to Ron's answer, this post explains what it means to run Spark
 against a YARN cluster, the difference between yarn-client and yarn-cluster
 mode, and the reason spark-shell only works in yarn-client mode.

 http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/

 -Sandy


 On Wed, Jul 9, 2014 at 9:09 AM, Ron Gonzalez zlgonza...@yahoo.com
 wrote:

 The idea behind YARN is that you can run different application types
 like MapReduce, Storm and Spark.

 I would recommend that you build your spark jobs in the main method
 without specifying how you deploy it. Then you can use spark-submit to 
 tell
 Spark how you would want to deploy to it using yarn-cluster as the master.
 The key point here is that once you have YARN setup, the spark client
 connects to it using the $HADOOP_CONF_DIR that contains the resource
 manager address. In particular, this needs to be accessible from the
 classpath of the submitter since it implicitly uses this when it
 instantiates a YarnConfiguration instance. If you want more details, read
 org.apache.spark.deploy.yarn.Client.scala.

 You should be able to download a standalone YARN cluster from any of
 the Hadoop providers like Cloudera or Hortonworks. Once you have that, the
 spark programming guide describes what I mention above in sufficient 
 detail
 for you to proceed.

 Thanks,
 Ron

 Sent from my iPad

  On Jul 9, 2014, at 8:31 AM, John Omernik j...@omernik.com wrote:
 
  I am trying to get my head around using Spark on Yarn from a
 perspective of a cluster. I can start a Spark Shell no issues in Yarn.
 Works easily.  This is done in yarn-client mode and it all works well.
 
  In multiple examples, I see instances where people have setup Spark
 Clusters in Stand Alone mode, and then in the examples they connect to
 this cluster in Stand Alone mode. This is done often times using the
 spark:// string for connection.  Cool. s
  But what I don't understand is how do I setup a Yarn instance that I
 can connect to? I.e. I tried running Spark Shell in yarn-cluster mode 
 and
 it gave me an error, telling me to use yarn-client.  I see information on
 using spark-class or spark-submit.  But what I'd really like is a instance
 I can connect a spark-shell too, and have the instance stay up. I'd like 
 to
 be able run other things on that instance etc. Is that possible with Yarn?
 I know there may be long running job challenges with Yarn, but I am just
 testing, I am just curious if I am looking at something completely bonkers
 here, or just missing something simple.
 
  Thanks!
 
 








Re: Spark Streaming - What does Spark Streaming checkpoint?

2014-08-21 Thread Chris Fregly
The StreamingContext can be recreated from a checkpoint file, indeed.

check out the following Spark Streaming source files for details:
 StreamingContext, Checkpoint, DStream, DStreamCheckpoint, and DStreamGraph.
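a minimal sketch of the driver-recovery pattern (the checkpoint directory is hypothetical; assumes an existing SparkConf `conf`):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint(checkpointDir)
  // ... define the DStream graph here ...
  ssc
}

// rebuilds the StreamingContext from the checkpoint if one exists, otherwise creates it fresh
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)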




On Wed, Jul 9, 2014 at 6:11 PM, Yan Fang yanfang...@gmail.com wrote:

 Hi guys,

 I am a little confused by the checkpointing in Spark Streaming. It
 checkpoints the intermediate data for the stateful operations for sure.
 Does it also checkpoint the information of the StreamingContext? Because it
 seems we can recreate the SC from the checkpoint in a driver node failure
 scenario. When I looked at the checkpoint directory, I did not find much of a
 clue. Any help? Thank you very much.

 Best,

 Fang, Yan
 yanfang...@gmail.com
 +1 (206) 849-4108



Re: Task's Scheduler Delay in web ui

2014-08-19 Thread Chris Fregly
Scheduling Delay is the time required to assign a task to an available
resource.

if you're seeing large scheduler delays, this likely means that other
jobs/tasks are using up all of the resources.

here's some more info on how to setup Fair Scheduling versus the default
FIFO Scheduler:  https://spark.apache.org/docs/latest/job-scheduling.html
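a minimal sketch of flipping on the fair scheduler (the allocation file path is hypothetical and optional):

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")                                   // default is FIFO
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // pool weights / minShares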

of course, increasing the cluster size would help assuming resources are
being allocated fairly.

also, delays can vary depending on the cluster resource manager that you're
using (spark standalone, yarn, mesos).

-chris


On Tue, Jul 8, 2014 at 4:14 AM, haopu hw...@qilinsoft.com wrote:

 What's the meaning of a Task's Scheduler Delay in the web ui?
 And what could cause that delay? Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Task-s-Scheduler-Delay-in-web-ui-tp9019.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?

2014-08-19 Thread Chris Fregly
this would be awesome.  did a jira get created for this?  I searched, but
didn't find one.

thanks!

-chris


On Tue, Jul 8, 2014 at 1:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com
wrote:

 Thanks a lot Xiangrui. This will help.


 On Wed, Jul 9, 2014 at 1:34 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Rahul,

 We plan to add online model updates with Spark Streaming, perhaps in
 v1.1, starting with linear methods. Please open a JIRA for Naive
 Bayes. For Naive Bayes, we need to update the priors and conditional
 probabilities, which means we should also remember the number of
 observations for the updates.

 Best,
 Xiangrui

 On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani
 rahulbhojwani2...@gmail.com wrote:
  Hi,
 
  I am using the MLlib Naive Bayes for a text classification problem. I
 have
  very little training data. And then the data will be coming
  continuously and I need to classify it as either A or B. I am training
 the
  MLlib Naive Bayes model using the training data but next time when data
  comes, I want to predict its class and then incorporate that also in the
  model for next time prediction of new data(I think that is obvious).
 
  So I am not able to figure out what is the way to do that using MLlib
 Naive
  Bayes. Is it that I have to train the model on the whole data every
 time new
  data comes in??
 
  Thanks in Advance!
  --
  Rahul K Bhojwani
  3rd Year B.Tech
  Computer Science and Engineering
  National Institute of Technology, Karnataka




 --
 Rahul K Bhojwani
 3rd Year B.Tech
 Computer Science and Engineering
 National Institute of Technology, Karnataka



Re: slower worker node in the cluster

2014-08-19 Thread Chris Fregly
perhaps creating Fair Scheduler Pools might help?  there's no way to pin
certain nodes to a pool, but you can specify minShares (cpus).  not sure
if that would help, but worth looking into.
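a minimal sketch of routing jobs to a named pool (the pool name is hypothetical; weights/minShares are defined in fairscheduler.xml):

sc.setLocalProperty("spark.scheduler.pool", "fastPool")  // jobs submitted from this thread use this pool
// ... run the stage's jobs ...
sc.setLocalProperty("spark.scheduler.pool", null)        // revert to the default pool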


On Tue, Jul 8, 2014 at 7:37 PM, haopu hw...@qilinsoft.com wrote:

 In a standalone cluster, is there a way to specify the stage to be running
 on a
 faster worker?

 That stage is reading an HDFS file and then doing some filter operations.  The
 tasks are assigned to the slower worker also, but the slower worker is delayed
 in launching them because it's running some tasks from other stages.

 So I think it may be better to assign stage to a worker. Any suggestions?

 And will the cluster on Yarn help?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/slower-worker-node-in-the-cluster-tp9125.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark Streaming source from Amazon Kinesis

2014-07-22 Thread Chris Fregly
i took this over from parviz.

i recently submitted a new PR for Kinesis Spark Streaming support:
https://github.com/apache/spark/pull/1434

others have tested it with good success, so give it a whirl!

waiting for it to be reviewed/merged.  please put any feedback into the PR
directly.

thanks!

-chris


On Mon, Apr 21, 2014 at 2:39 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 No worries, looking forward to it!

 Matei

 On Apr 21, 2014, at 1:59 PM, Parviz Deyhim pdey...@gmail.com wrote:

 sorry Matei. Will definitely start working on making the changes soon :)


 On Mon, Apr 21, 2014 at 1:10 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 There was a patch posted a few weeks ago (
 https://github.com/apache/spark/pull/223), but it needs a few changes in
 packaging because it uses a license that isn’t fully compatible with
 Apache. I’d like to get this merged when the changes are made though — it
 would be a good input source to support.

 Matei


 On Apr 21, 2014, at 1:00 PM, Nicholas Chammas nicholas.cham...@gmail.com
 wrote:

 I'm looking to start experimenting with Spark Streaming, and I'd like to
 use Amazon Kinesis https://aws.amazon.com/kinesis/ as my data source.
 Looking at the list of supported Spark Streaming sources
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking,
 I don't see any mention of Kinesis.

 Is it possible to use Spark Streaming with Amazon Kinesis? If not, are
 there plans to add such support in the future?

 Nick


 --
 View this message in context: Spark Streaming source from Amazon Kinesis
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-source-from-Amazon-Kinesis-tp4550.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com
 http://nabble.com/.







Re: multiple passes in mapPartitions

2014-07-01 Thread Chris Fregly
also, multiple calls to mapPartitions() will be pipelined by the spark
execution engine into a single stage, so the overhead is minimal.
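a minimal sketch to illustrate (both mapPartitions calls below execute in the same stage, with no shuffle in between):

val out = sc.parallelize(1 to 1000, 4)
  .mapPartitions(iter => iter.map(_ * 2))           // pass 1
  .mapPartitions(iter => iter.filter(_ % 3 == 0))   // pass 2 - pipelined with pass 1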


On Fri, Jun 13, 2014 at 9:28 PM, zhen z...@latrobe.edu.au wrote:

 Thank you for your suggestion. We will try it out and see how it performs.
 We
 think the single call to mapPartitions will be faster but we could be
 wrong.
 It would be nice to have a clone method on the iterator.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/multiple-passes-in-mapPartitions-tp7555p7616.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Fw: How Spark Choose Worker Nodes for respective HDFS block

2014-07-01 Thread Chris Fregly
yes, spark attempts to achieve data locality (PROCESS_LOCAL or NODE_LOCAL)
where possible just like MapReduce.  it's a best practice to co-locate your
Spark Workers on the same nodes as your HDFS Data Nodes for just this
reason.

this is achieved through the RDD.preferredLocations() interface method:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

on a related note, you can configure spark.locality.wait as the number of
millis to wait before falling back to a less-local data node (RACK_LOCAL):
  http://spark.apache.org/docs/latest/configuration.html
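a minimal sketch of that knob (values are illustrative only):

val conf = new SparkConf()
  .set("spark.locality.wait", "3000")        // millis to wait before downgrading locality
  .set("spark.locality.wait.node", "3000")   // can also be tuned per level (.process / .node / .rack)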

-chris


On Fri, Jun 13, 2014 at 11:06 PM, anishs...@yahoo.co.in 
anishs...@yahoo.co.in wrote:

 Hi All

 Is there any communication between Spark MASTER node and Hadoop NameNode
 while distributing work to WORKER nodes, like we have in MapReduce.

 Please suggest

 TIA

 --
 Anish Sneh
 Experience is the best teacher.
 http://in.linkedin.com/in/anishsneh


  --
 * From: * anishs...@yahoo.co.in anishs...@yahoo.co.in;
 * To: * u...@spark.incubator.apache.org u...@spark.incubator.apache.org;

 * Subject: * How Spark Choose Worker Nodes for respective HDFS block
 * Sent: * Fri, Jun 13, 2014 9:17:50 PM

   Hi All

 I am new to Spark, working on a 3 node test cluster. I am trying to explore
 Spark's scope in analytics; my Spark code interacts with HDFS mostly.

 I am confused about how Spark chooses on which node it will distribute
 its work.

 We assume that it can be an alternative to Hadoop MapReduce. In
 MapReduce we know that internally the framework will distribute code (or logic)
 to the nearest TaskTracker, which is co-located with the DataNode or in the same
 rack or probably nearest to the data blocks.

 My confusion is: when I give an HDFS path inside a Spark program, how does it
 choose which node is nearest (if it does)?

 If it does not, then how will it work when I have TBs of data where high
 network latency will be involved?

 My apologies for asking basic question, please suggest.

 TIA
 --
 Anish Sneh
 Experience is the best teacher.
 http://www.anishsneh.com



Re: Selecting first ten values in a RDD/partition

2014-06-29 Thread Chris Fregly
as brian g alluded to earlier, you can use DStream.mapPartitions() to
return the partition-local top 10 for each partition.  once you collect
the results from all the partitions, you can do a global top-10 merge sort
across all partitions.

this leads to a much smaller dataset being shuffled back to the driver
to calculate the global top 10.
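
for illustration, a rough sketch of that pattern over a DStream of
(hashtag, count) pairs - the variable names are placeholders, and the counts
are assumed to come from something like reduceByKeyAndWindow():

  // counts: DStream[(String, Long)] of (hashtag, count)
  // 1) partition-local top 10: each partition emits at most 10 candidates
  val localTop10 = counts.mapPartitions { iter =>
    iter.toList.sortBy(-_._2).take(10).iterator
  }

  // 2) global merge: only the small candidate set ever reaches the driver
  localTop10.foreachRDD { rdd =>
    val top10 = rdd.collect().sortBy(-_._2).take(10)
    top10.foreach(println)
  }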


On Fri, May 30, 2014 at 5:05 AM, nilmish nilmish@gmail.com wrote:

 My primary goal: to get the top 10 hashtags for every 5-minute interval.

 I want to do this efficiently. I have already done this by using
 reduceByKeyAndWindow() and then sorting all hashtags in the 5-minute
 interval, taking only the top 10 elements. But this is very slow.

 So now I am thinking of retaining only the top 10 hashtags in each RDD,
 because only these could appear in the final answer.

 I am stuck at: how to retain only the top 10 hashtags in each RDD of my
 DStream? Basically I need to transform my DStream so that each RDD contains
 only the top 10 hashtags, keeping the number of hashtags per 5-minute
 interval low.

 If there is some more efficient way of doing this then please let me know
 that also.

 Thanks,
 Nilesh



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Selecting-first-ten-values-in-a-RDD-partition-tp6517p6577.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Reconnect to an application/RDD

2014-06-29 Thread Chris Fregly
Tachyon is another option - this is the off-heap StorageLevel specified
when persisting RDDs:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.storage.StorageLevel

or just use HDFS.  this requires subsequent Applications/SparkContexts to
reload the data from disk, of course.
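
for illustration, a rough sketch of both options, assuming rdd is an
RDD[String] that has already been computed and sc is the SparkContext (the
HDFS path is a placeholder):

  import org.apache.spark.storage.StorageLevel

  // option 1: off-heap storage level (backed by Tachyon in the Spark 1.x era)
  rdd.persist(StorageLevel.OFF_HEAP)

  // option 2: write to HDFS, then reload from a brand-new application/SparkContext
  rdd.saveAsObjectFile("hdfs:///shared/my-rdd")
  val reloaded = sc.objectFile[String]("hdfs:///shared/my-rdd")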


On Tue, Jun 3, 2014 at 6:58 AM, Gerard Maas gerard.m...@gmail.com wrote:

 I don't think that's supported by default, as when the standalone context
 closes, the related RDDs will be GC'ed.

 You should explore Spark Job Server, which allows you to cache RDDs by name
 and reuse them within a context.

 https://github.com/ooyala/spark-jobserver

 -kr, Gerard.


 On Tue, Jun 3, 2014 at 3:45 PM, Oleg Proudnikov oleg.proudni...@gmail.com
  wrote:

 HI All,

 Is it possible to run a standalone app that would compute and
 persist/cache an RDD and then run other standalone apps that would gain
 access to that RDD?

 --
 Thank you,
 Oleg





Re: spark streaming question

2014-05-04 Thread Chris Fregly
great questions, weide.  in addition, i'd also like to hear more about how
to horizontally scale a spark-streaming cluster.

i've gone through the samples (standalone mode) and read the documentation,
but it's still not clear to me how to scale this puppy out under high load.
 i assume i add more receivers (kinesis, flume, etc), but physically how
does this work?

@TD:  can you comment?

thanks!

-chris


On Sun, May 4, 2014 at 2:10 PM, Weide Zhang weo...@gmail.com wrote:

 Hi ,

 It might be a very general question to ask here, but I'm curious to know
 why Spark Streaming can achieve better throughput than Storm, as claimed in
 the Spark Streaming paper. Does it depend on certain use cases and/or data
 sources? What drives better performance in the Spark Streaming case, or, put
 another way, what makes Storm not as performant as Spark Streaming?

 Also, in order to guarantee exactly-once semantics when node failure
 happens, Spark makes replicas of RDDs and checkpoints so that data can be
 recomputed on the fly, while in Trident's case a transactional object is
 used to persist the state and result. It's not obvious to me which approach
 is more costly and why. Can anyone provide some experience here?

 Thanks a lot,

 Weide



Re: Multiple Streams with Spark Streaming

2014-05-03 Thread Chris Fregly
if you want to use true Spark Streaming (not the same as Hadoop
Streaming/Piping, as Mayur pointed out), you can use the DStream.union()
method as described in the following docs:

http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html
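
for illustration, a minimal sketch of combining several receiver streams with
union() so that more receivers (and cores) raise ingest throughput - the host
names and port are placeholders, and the master is assumed to come from
spark-submit:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val sparkConf = new SparkConf().setAppName("multi-receiver")
  val ssc = new StreamingContext(sparkConf, Seconds(5))

  // one receiver per input source
  val streams = (1 to 3).map(i => ssc.socketTextStream("collector" + i, 9999))

  // merge them into a single DStream for downstream processing
  val unioned = ssc.union(streams)
  unioned.count().print()

  ssc.start()
  ssc.awaitTermination()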

our friend, diana carroll, from cloudera recently posted a nice little
utility for sending files to a Spark Streaming Receiver to simulate a
streaming scenario from disk.

here's the link to her post:
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tc3431.html

-chris

On Thu, May 1, 2014 at 3:09 AM, Mayur Rustagi mayur.rust...@gmail.comwrote:

 File as a stream?
 I think you are confusing Spark Streaming with a buffered reader. Spark
 Streaming is meant to process batches of data (files, packets, messages) as
 they come in, in fact utilizing the time of packet reception as a way to create
 windows etc.

 In your case you are better off reading the file, partitioning it and
 operating on each column individually, if that makes more sense to you.



 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Thu, May 1, 2014 at 3:24 PM, Laeeq Ahmed laeeqsp...@yahoo.com wrote:

 Hi all,

 Is it possible to read and process multiple streams with Spark? I have an
 EEG (brain waves) CSV file with 23 columns. Each column is one stream (wave),
 and each column has one million values.

 I know one way to do it is to take the transpose of the file and then give it
 to Spark, so each mapper will get one or more waves out of the 23 waves, but
 then it becomes a non-streaming problem, and I want to read the file as a
 stream. Please correct me if I am wrong.

 I have to apply simple operations (mean and SD) on each window of a wave.

 Regards,
 Laeeq






Re: Reading multiple S3 objects, transforming, writing back one

2014-05-03 Thread Chris Fregly
not sure if this directly addresses your issue, peter, but it's worth
mentioning a handy AWS EMR utility called s3distcp that can copy the many
HDFS part files - in parallel - into a single, concatenated S3 file once all
the partitions are written.  kinda cool.

here's some info:

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/UsingEMR_s3distcp.html


s3distcp is an extension of the familiar hadoop distcp, of course.


On Thu, May 1, 2014 at 11:41 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 The fastest way to save to S3 should be to leave the RDD with many
 partitions, because all partitions will be written out in parallel.

 Then, once the various parts are in S3, somehow concatenate the files
 together into one file.

 If this can be done within S3 (I don't know if this is possible), then you
 get the best of both worlds: a highly parallelized write to S3, and a
 single cleanly named output file.


 On Thu, May 1, 2014 at 12:52 PM, Peter thenephili...@yahoo.com wrote:

 Thank you Patrick.

 I took a quick stab at it:

 val s3Client = new AmazonS3Client(...)   // credentials/config elided in the original
 val copyObjectResult = s3Client.copyObject(upload, outputPrefix + "/part-0",
   "rolled-up-logs", "2014-04-28.csv")
 val objectListing = s3Client.listObjects(upload, outputPrefix)
 // requires: import scala.collection.JavaConverters._
 s3Client.deleteObjects(new DeleteObjectsRequest(upload).withKeys(
   objectListing.getObjectSummaries.asScala.map(s => new KeyVersion(s.getKey)).asJava))

  Using a 3GB object I achieved about 33MB/s between buckets in the same
 AZ.

 This is a workable solution for the short term but not ideal for the
 longer term as data size increases. I understand it's a limitation of the
 Hadoop API but ultimately it must be possible to dump a RDD to a single S3
 object :)

   On Wednesday, April 30, 2014 7:01 PM, Patrick Wendell 
 pwend...@gmail.com wrote:
  This is a consequence of the way the Hadoop files API works. However,
 you can (fairly easily) add code to just rename the file because it
 will always produce the same filename.

 (heavy use of pseudo code)

 dir = "/some/dir"
 rdd.coalesce(1).saveAsTextFile(dir)
 f = new File(dir + "/part-0")
 f.moveTo("somewhere else")
 dir.remove()

 It might be cool to add a utility called `saveAsSingleFile` or
 something that does this for you. In fact probably we should have
 called saveAsTextfile saveAsTextFiles to make it more clear...
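
 for illustration, a rough sketch of what such a saveAsSingleFile helper
 might look like, using Hadoop's FileUtil.copyMerge (available through Hadoop
 2.x) to concatenate the part files; error handling is omitted and the method
 name is just the hypothetical one suggested above:

   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
   import org.apache.spark.rdd.RDD

   def saveAsSingleFile(rdd: RDD[String], tmpDir: String, outFile: String): Unit = {
     rdd.saveAsTextFile(tmpDir)                  // many part-* files, written in parallel
     val conf = new Configuration()
     val fs = FileSystem.get(new java.net.URI(tmpDir), conf)
     // concatenate the part files into one output file, deleting the temp dir
     FileUtil.copyMerge(fs, new Path(tmpDir), fs, new Path(outFile), true, conf, null)
   }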

 On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote:
  Thanks Nicholas, this is a bit of a shame - not very practical for log
 roll-up, for example, when every output needs to be in its own directory.
  On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
  nicholas.cham...@gmail.com wrote:
  Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
  coalesce(1), you move everything in the RDD to a single partition, which
  then gives you 1 output file.
  It will still be called part-0 or something like that because that's
  defined by the Hadoop API that Spark uses for reading to/writing from
 S3. I
  don't know of a way to change that.
 
 
  On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:
 
  Ah, looks like RDD.coalesce(1) solves one part of the problem.
  On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com
  wrote:
  Hi
 
  Playing around with Spark & S3, I'm opening multiple objects (CSV files)
  with:

 val hfile = sc.textFile("s3n://bucket/2014-04-28/")

  so hfile is an RDD representing the 10 objects that were underneath
 2014-04-28. After I've sorted and otherwise transformed the content, I'm
 trying to write it back to a single object:

 sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")
 
  unfortunately this results in a folder named concatted.csv with 10
 objects
  underneath, part-0 .. part-00010, corresponding to the 10 original
  objects loaded.
 
  How can I achieve the desired behaviour of putting a single object named
  concatted.csv ?
 
  I've tried 0.9.1 and 1.0.0-RC3.
 
  Thanks!
  Peter
 
 
 
 
 
 
 






Re: help me

2014-05-03 Thread Chris Fregly
as Mayur indicated, it's odd that you are seeing better performance from a
less-local configuration.  however, the non-deterministic behavior that you
describe is likely caused by GC pauses in your JVM process.

take note of the *spark.locality.wait* configuration parameter described
here: http://spark.apache.org/docs/latest/configuration.html

this is the amount of time the Spark execution engine waits before
launching a new task at a less-data-local level (i.e. process -> node ->
rack).  by default, this is 3 seconds.

if there is excessive GC occurring on the original process-local JVM, it is
possible that another node-local JVM process could actually load the data
from HDFS (on the same node) and complete the processing before the
original process's GC finishes.

you could bump up the *spark.locality.wait* default (not recommended) or
increase your number of nodes/partitions to increase parallelism and reduce
hotspots.

also, keep an eye on your GC characteristics.  perhaps you need to increase
your Eden size to reduce the amount of movement through the GC generations
and reduce major compactions.  (the usual GC tuning fun.)
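
for illustration, a rough sketch of the knobs mentioned above - the values
are purely illustrative, and on older Spark releases the executor JVM flags
go through SPARK_JAVA_OPTS rather than a conf property:

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("locality-and-gc")
    .set("spark.locality.wait", "3000")   // ms before falling back to a less-local level
    .set("spark.executor.extraJavaOptions",
      "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xmn2g")  // GC logging + larger Eden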

curious if others have experienced this behavior, as well?

-chris


On Fri, May 2, 2014 at 6:07 AM, Mayur Rustagi mayur.rust...@gmail.comwrote:

 Spark would be much faster on process_local instead of node_local.
 Node_local references data on the node's local disk, while process_local
 references data already in memory in the same executor JVM.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Tue, Apr 22, 2014 at 4:45 PM, Joe L selme...@yahoo.com wrote:

 I got the following performance - is it normal for Spark to be like this?
 Sometimes Spark switches from process_local into node_local mode and it
 becomes 10x faster. I am very confused.

 scala> val a = sc.textFile("/user/exobrain/batselem/LUBM1000")
 scala> a.count()

 Long = 137805557
 took 130.809661618 s




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/help-me-tp4598.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: function state lost when next RDD is processed

2014-04-13 Thread Chris Fregly
or how about the updateStateByKey() operation?

https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html

the StatefulNetworkWordCount example demonstrates how to keep state across RDDs.
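
for illustration, a minimal sketch of a running sum kept across batches with
updateStateByKey() - it assumes a DStream[(String, Long)] named pairs built
upstream inside an existing StreamingContext ssc, and checkpointing has to be
enabled for stateful operations (the checkpoint path is a placeholder):

  import org.apache.spark.streaming.StreamingContext._  // pair-DStream ops (needed on older Spark)

  // checkpointing is required for stateful transformations
  ssc.checkpoint("hdfs:///checkpoints/running-sum")

  val updateSum = (newValues: Seq[Long], state: Option[Long]) =>
    Some(newValues.sum + state.getOrElse(0L))

  // running sum per key, carried across RDDs/batches
  val runningSum = pairs.updateStateByKey[Long](updateSum)
  runningSum.print()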

 On Mar 28, 2014, at 8:44 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:
 
 Are you referring to Spark Streaming?
 
  Can you save the sum as an RDD and keep joining the two RDDs together?
 
 Regards
 Mayur
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 
 On Fri, Mar 28, 2014 at 10:47 AM, Adrian Mocanu amoc...@verticalscope.com 
 wrote:
 Thanks!
 
  
 
 Ya that’s what I’m doing so far, but I wanted to see if it’s possible to 
 keep the tuples inside Spark for fault tolerance purposes.
 
  
 
 -A
 
 From: Mark Hamstra [mailto:m...@clearstorydata.com] 
 Sent: March-28-14 10:45 AM
 To: user@spark.apache.org
 Subject: Re: function state lost when next RDD is processed
 
  
 
 As long as the amount of state being passed is relatively small, it's 
 probably easiest to send it back to the driver and to introduce it into RDD 
 transformations as the zero value of a fold.
 
  
 
 On Fri, Mar 28, 2014 at 7:12 AM, Adrian Mocanu amoc...@verticalscope.com 
 wrote:
 
 I’d like to resurrect this thread since I don’t have an answer yet.
 
  
 
 From: Adrian Mocanu [mailto:amoc...@verticalscope.com] 
 Sent: March-27-14 10:04 AM
 To: u...@spark.incubator.apache.org
 Subject: function state lost when next RDD is processed
 
  
 
 Is there a way to pass a custom function to spark to run it on the entire 
 stream? For example, say I have a function which sums up values in each RDD 
 and then across RDDs.
 
  
 
 I’ve tried with map, transform, reduce. They all apply my sum function on 1 
 RDD. When the next RDD comes the function starts from 0 so the sum of the 
 previous RDD is lost.
 
  
 
 Does Spark support a way of passing a custom function so that its state is 
 preserved across RDDs and not only within RDD?
 
  
 
 Thanks
 
 -Adrian
 
 


Re: network wordcount example

2014-03-31 Thread Chris Fregly
@eric-

i saw this exact issue recently while working on the KinesisWordCount.

are you passing local[2] to your example as the MASTER arg, versus just
local or local[1]?

you need at least 2.  it's documented as n > 1 in the scala source docs -
which is easy to mistake for n = 1.

i just ran the NetworkWordCount sample and confirmed that local[1] does not
work, but local[2] does work.

give that a whirl.
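
for illustration, a minimal sketch of the same word count wired up with
local[2] - one core for the socket receiver and at least one left for
processing (host/port and batch interval are placeholders):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._  // pair-DStream ops (needed on older Spark)

  val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
  val ssc  = new StreamingContext(conf, Seconds(1))

  val lines  = ssc.socketTextStream("localhost", 9999)
  val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  counts.print()

  ssc.start()
  ssc.awaitTermination()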

-chris




On Mon, Mar 31, 2014 at 10:41 AM, Diana Carroll dcarr...@cloudera.comwrote:

 Not sure what data you are sending in.  You could try calling
 lines.print() instead which should just output everything that comes in
 on the stream.  Just to test that your socket is receiving what you think
 you are sending.


 On Mon, Mar 31, 2014 at 12:18 PM, eric perler ericper...@hotmail.comwrote:

 Hello

 i just started working with spark today... and i am trying to run the
 wordcount network example

 i created a socket server and client.. and i am sending data to the
 server in an infinite loop

 when i run the spark class.. i see this output in the console...

 ---
 Time: 1396281891000 ms
 ---

 14/03/31 11:04:51 INFO SparkContext: Job finished: take at
 DStream.scala:586, took 0.056794606 s
 14/03/31 11:04:51 INFO JobScheduler: Finished job streaming job
 1396281891000 ms.0 from job set of time 1396281891000 ms
 14/03/31 11:04:51 INFO JobScheduler: Total delay: 0.101 s for time
 1396281891000 ms (execution: 0.058 s)
 14/03/31 11:04:51 INFO TaskSchedulerImpl: Remove TaskSet 3.0 from pool

 but i don't see any output from the wordcount operation when i make this
 call...

 wordCounts.print();

 any help is greatly appreciated

 thanks in advance