Re: Create a Column expression from a String

2016-11-19 Thread Luciano Resende
Are you looking for UDFs?

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-udfs.html
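Not from this thread, but for completeness: a minimal sketch, assuming Spark 1.5+/2.x, of org.apache.spark.sql.functions.expr, which parses a SQL expression string into a Column and so also covers the runtime-configured case (the zip5 alias below is illustrative):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.expr

// expr() parses a SQL expression string into a Column at runtime
def parseColumnExpression(colExpr: String): Column = expr(colExpr)

// usage (note the SQL form drops the Scala symbol quote from the original post):
// df.select(parseColumnExpression("substring(zipCode, 0, 5)").as("zip5"))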

On Sun, Nov 20, 2016 at 3:12 AM Stuart White 
wrote:

> I'd like to allow for runtime-configured Column expressions in my
> Spark SQL application.  For example, if my application needs a 5-digit
> zip code, but the file I'm processing contains a 9-digit zip code, I'd
> like to be able to configure my application with the expression
> "substring('zipCode, 0, 5)" to use for the zip code.
>
> So, I think I'm looking for something like this:
>
> def parseColumnExpression(colExpr: String) : Column
>
> I see that SparkSession's sql() method exists to take a string and
> parse it into a DataFrame.  But that's not quite what I want.
>
> Does a mechanism exist that would allow me to take a string
> representation of a column expression and parse it into an actual
> column expression (something that could be used in a .select() call,
> for example)?
>
> Thanks!
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Sent from my Mobile device


Create a Column expression from a String

2016-11-19 Thread Stuart White
I'd like to allow for runtime-configured Column expressions in my
Spark SQL application.  For example, if my application needs a 5-digit
zip code, but the file I'm processing contains a 9-digit zip code, I'd
like to be able to configure my application with the expression
"substring('zipCode, 0, 5)" to use for the zip code.

So, I think I'm looking for something like this:

def parseColumnExpression(colExpr: String) : Column

I see that SparkSession's sql() method exists to take a string and
parse it into a DataFrame.  But that's not quite what I want.

Does a mechanism exist that would allow me to take a string
representation of a column expression and parse it into an actual
column expression (something that could be used in a .select() call,
for example)?

Thanks!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: using StreamingKMeans

2016-11-19 Thread Debasish Ghosh
I share both of the concerns that you have expressed. And as I mentioned in my
earlier mail, offline (batch) training is an option if I get a dataset
without outliers. In that case I can train and have a model. I find the
model parameters, which will be the mean distance to the centroid. Note that in
training I will have only 1 cluster, as it's only normal data (no outliers).

I can now pass these parameters to the prediction phase, which can work on
streaming data. In the prediction phase I just compute the distance to the
centroid for each point and flag the violating ones as outliers.

This looks like a perfectly valid option if I get a dataset with no
outliers to train on.
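
A minimal sketch of that two-phase approach, assuming Spark MLlib's batch KMeans; the RDD of outlier-free training vectors and the distance threshold are supplied by the caller:

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// training phase: batch k-means on outlier-free data, with a single cluster
def trainNormalModel(training: RDD[Vector]): KMeansModel =
  KMeans.train(training, 1, 20)  // k = 1, maxIterations = 20 (illustrative)

// model "parameter": mean distance of the training points to the centroid
def meanDistance(model: KMeansModel, training: RDD[Vector]): Double =
  training.map(v => math.sqrt(Vectors.sqdist(v, model.clusterCenters(0)))).mean()

// prediction phase: flag points whose distance exceeds the chosen threshold
def isOutlier(model: KMeansModel, threshold: Double)(point: Vector): Boolean =
  math.sqrt(Vectors.sqdist(point, model.clusterCenters(0))) > threshold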

Now my question is: what, then, is the use case in which we should use
StreamingKMeans ? In the above scenario we use batch KMeans in the training
phase and just compute the distance in the prediction phase. And how do
we address the scenario where we have only one stream of data available ?

regards.

On Sun, 20 Nov 2016 at 6:07 AM, ayan guha  wrote:

> Here are 2 concerns I would have with the design (This discussion is
> mostly to validate my own understanding)
>
> 1. If you have outliers "before" running k-means, won't your centroids
> get skewed? In other words, outliers by themselves may bias the cluster
> evaluation, won't they?
> 2. Typically micro batches are small, like 3 seconds in your case. In this
> window you may not have enough data to run any statistically significant
> operation, can you?
>
> My approach would have been: run k-means on data without outliers (in
> batch mode). Determine the model, i.e. the centroids in the case of k-means. Then load
> the model in your streaming app and just apply an "outlier detection"
> function, which takes the form of
>
> def detectOutlier(model,data):
>   /// your code, like mean distance etc
>   return T or F
>
> In response to your point about an "alternate set of data", I would assume you
> would accumulate the data you are receiving from the stream over a few weeks
> or months before running offline training.
>
> Am I missing something?
>
> On Sun, Nov 20, 2016 at 10:29 AM, Debasish Ghosh  > wrote:
>
> Looking for alternative suggestions for the case where we have one continuous
> stream of data. Offline training and online prediction can be an option if
> we can have an alternate set of data to train on. But if it's one single
> stream, you don't have separate sets for training or cross validation.
>
> So whatever data you get in each micro batch, you train on it and get the
> cluster centroids from the model. Then apply some heuristic, like mean
> distance from the centroid, to detect outliers. So for every micro batch you get
> the outliers based on the model, and you can control the forgetfulness of the
> model through the decay factor that you specify for StreamingKMeans.
>
> Suggestions ?
>
> regards.
>
> On Sun, 20 Nov 2016 at 3:51 AM, ayan guha  wrote:
>
> Curious why do you want to train your models every 3 secs?
> On 20 Nov 2016 06:25, "Debasish Ghosh"  wrote:
>
> Thanks a lot for the response.
>
> Regarding the sampling part - yeah that's what I need to do if there's no
> way of titrating the number of clusters online.
>
> I am using something like
>
> dstream.foreachRDD { rdd =>
>   if (rdd.count() > 0) { //.. logic
>   }
> }
>
> Feels a little odd but if that's the idiom then I will stick to it.
>
> regards.
>
>
>
> On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger 
> wrote:
>
> So I haven't played around with streaming k means at all, but given
> that no one responded to your message a couple of days ago, I'll say
> what I can.
>
> 1. Can you not sample out some % of the stream for training?
> 2. Can you run multiple streams at the same time with different values
> for k and compare their performance?
> 3. foreachRDD is fine in general, can't speak to the specifics.
> 4. If you haven't done any transformations yet on a direct stream,
> foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
> is very cheap, it's done on the driver only because the beginning and
> ending offsets are known.  So you should be able to skip empty
> batches.
>
>
>
> On Sat, Nov 19, 2016 at 10:46 AM, debasishg 
> wrote:
> > Hello -
> >
> > I am trying to implement an outlier detection application on streaming
> data.
> > I am a newbie to Spark and hence would like some advice on the confusions
> > that I have ..
> >
> > I am thinking of using StreamingKMeans - is this a good choice ? I have
> one
> > stream of data and I need an online algorithm. But here are some
> questions
> > that immediately come to my mind ..
> >
> > 1. I cannot do separate training, cross validation etc. Is this a good
> idea
> > to do training and prediction online ?
> >
> > 2. The data will be read from the stream coming from Kafka in
> microbatches
> > of (say) 3 seconds. I get a DStream on which I train and get the
> 

Re: HPC with Spark? Simultaneous, parallel one to one mapping of partition to vcore

2016-11-19 Thread Stephen Boesch
While "apparently" saturating the N available workers using your proposed N
partitions - the "actual" distribution of workers to tasks is controlled by
the scheduler.  If my past experience were of service - you can *not *trust
the default Fair Scheduler to ensure the round-robin scheduling of the
tasks: you may well end up with tasks being queued.

My suggestion is to try it out on the resource manager and scheduler being
used for your deployment. You may need to swap out their default scheduler
for a true round-robin one.

2016-11-19 16:44 GMT-08:00 Adam Smith :

> Dear community,
>
> I have an RDD with N rows and N partitions. I want to ensure that the
> partitions all run at the same time, by setting the number of vcores
> (spark-yarn) to N. The partitions need to talk to each other with some
> socket-based sync, which is why I need them to run more or less
> simultaneously.
>
> Let's assume no node will die. Will my setup guarantee that all partitions
> are computed in parallel?
>
> I know this is somewhat hackish. Is there a better way of doing this?
>
> My goal is to replicate message passing (like OpenMPI) with Spark, where I
> have very specific and final communication requirements. So there is no need for
> the full comm and sync functionality, just what I already have: sync and talk.
>
> Thanks!
> Adam
>
>


HPC with Spark? Simultaneous, parallel one to one mapping of partition to vcore

2016-11-19 Thread Adam Smith
Dear community,

I have an RDD with N rows and N partitions. I want to ensure that the
partitions all run at the same time, by setting the number of vcores
(spark-yarn) to N. The partitions need to talk to each other with some
socket-based sync, which is why I need them to run more or less
simultaneously.
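
A rough sketch of that setup, assuming spark-on-YARN; n and the dummy data are illustrative, and (as noted in the reply above) requesting n task slots does not by itself guarantee simultaneous scheduling:

import org.apache.spark.{SparkConf, SparkContext}

val n = 16  // illustrative; must fit within the cluster's resources
val conf = new SparkConf()
  .setAppName("one-partition-per-vcore")
  .set("spark.executor.instances", n.toString)  // n executors...
  .set("spark.executor.cores", "1")             // ...with one core each
  .set("spark.task.cpus", "1")                  // one vcore per task
val sc = new SparkContext(conf)

// n rows in n partitions, so each partition maps onto one task slot
val rdd = sc.parallelize(1 to n, numSlices = n)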

Let's assume no node will die. Will my setup guarantee that all partitions
are computed in parallel?

I know this is somewhat hackish. Is there a better way of doing this?

My goal is to replicate message passing (like OpenMPI) with Spark, where I
have very specific and final communication requirements. So there is no need for
the full comm and sync functionality, just what I already have: sync and talk.

Thanks!
Adam


Re: using StreamingKMeans

2016-11-19 Thread ayan guha
Here are 2 concerns I would have with the design (This discussion is mostly
to validate my own understanding)

1. If you have outliers "before" running k-means, won't your centroids get
skewed? In other words, outliers by themselves may bias the cluster
evaluation, won't they?
2. Typically micro batches are small, like 3 seconds in your case. In this
window you may not have enough data to run any statistically significant
operation, can you?

My approach would have been: run k-means on data without outliers (in batch
mode). Determine the model, i.e. the centroids in the case of k-means. Then load the
model in your streaming app and just apply an "outlier detection" function,
which takes the form of

def detectOutlier(model,data):
  /// your code, like mean distance etc
  return T or F

In response to your point about an "alternate set of data", I would assume you
would accumulate the data you are receiving from the stream over a few weeks
or months before running offline training.

Am I missing something?

On Sun, Nov 20, 2016 at 10:29 AM, Debasish Ghosh 
wrote:

> Looking for alternative suggestions for the case where we have one continuous
> stream of data. Offline training and online prediction can be an option if
> we can have an alternate set of data to train on. But if it's one single
> stream, you don't have separate sets for training or cross validation.
>
> So whatever data you get in each micro batch, you train on it and get the
> cluster centroids from the model. Then apply some heuristic, like mean
> distance from the centroid, to detect outliers. So for every micro batch you get
> the outliers based on the model, and you can control the forgetfulness of the
> model through the decay factor that you specify for StreamingKMeans.
>
> Suggestions ?
>
> regards.
>
> On Sun, 20 Nov 2016 at 3:51 AM, ayan guha  wrote:
>
>> Curious why do you want to train your models every 3 secs?
>> On 20 Nov 2016 06:25, "Debasish Ghosh"  wrote:
>>
>> Thanks a lot for the response.
>>
>> Regarding the sampling part - yeah that's what I need to do if there's no
>> way of titrating the number of clusters online.
>>
>> I am using something like
>>
>> dstream.foreachRDD { rdd =>
>>   if (rdd.count() > 0) { //.. logic
>>   }
>> }
>>
>> Feels a little odd but if that's the idiom then I will stick to it.
>>
>> regards.
>>
>>
>>
>> On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger 
>> wrote:
>>
>> So I haven't played around with streaming k means at all, but given
>> that no one responded to your message a couple of days ago, I'll say
>> what I can.
>>
>> 1. Can you not sample out some % of the stream for training?
>> 2. Can you run multiple streams at the same time with different values
>> for k and compare their performance?
>> 3. foreachRDD is fine in general, can't speak to the specifics.
>> 4. If you haven't done any transformations yet on a direct stream,
>> foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
>> is very cheap, it's done on the driver only because the beginning and
>> ending offsets are known.  So you should be able to skip empty
>> batches.
>>
>>
>>
>> On Sat, Nov 19, 2016 at 10:46 AM, debasishg 
>> wrote:
>> > Hello -
>> >
>> > I am trying to implement an outlier detection application on streaming
>> data.
>> > I am a newbie to Spark and hence would like some advice on the
>> confusions
>> > that I have ..
>> >
>> > I am thinking of using StreamingKMeans - is this a good choice ? I have
>> one
>> > stream of data and I need an online algorithm. But here are some
>> questions
>> > that immediately come to my mind ..
>> >
>> > 1. I cannot do separate training, cross validation etc. Is this a good
>> idea
>> > to do training and prediction online ?
>> >
>> > 2. The data will be read from the stream coming from Kafka in
>> microbatches
>> > of (say) 3 seconds. I get a DStream on which I train and get the
>> clusters.
>> > How can I decide on the number of clusters ? Using StreamingKMeans is
>> there
>> > any way I can iterate on microbatches with different values of k to
>> find the
>> > optimal one ?
>> >
>> > 3. Even if I fix k, after training on every microbatch I get a DStream.
>> How
>> > can I compute things like clustering score on the DStream ?
>> > StreamingKMeansModel has a computeCost function but it takes an RDD. I
>> can
>> > use dstream.foreachRDD { // process RDD for the micro batch here } - is
>> this
>> > the idiomatic way ?
>> >
>> > 4. If I use dstream.foreachRDD { .. } and use functions like new
>> > StandardScaler().fit(rdd) to do feature normalization, then it works
>> when I
>> > have data in the stream. But when the microbatch is empty (say I don't
>> have
>> > data for some time), the fit method throws exception as it gets an empty
>> > collection. Things start working ok when data starts coming back to the
>> > stream. But is this the way to go ?
>> >
>> > any suggestion will be welcome ..
>> >

Re: using StreamingKMeans

2016-11-19 Thread Debasish Ghosh
Looking for alternative suggestions for the case where we have one continuous
stream of data. Offline training and online prediction can be an option if
we can have an alternate set of data to train on. But if it's one single
stream, you don't have separate sets for training or cross validation.

So whatever data you get in each micro batch, you train on it and get the
cluster centroids from the model. Then apply some heuristic, like mean
distance from the centroid, to detect outliers. So for every micro batch you get
the outliers based on the model, and you can control the forgetfulness of the
model through the decay factor that you specify for StreamingKMeans.
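
A rough sketch of that, assuming a DStream[Vector] named points and an externally chosen distance threshold; k, the feature dimension and the decay factor are illustrative values:

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.streaming.dstream.DStream

def detectOutliers(points: DStream[Vector], threshold: Double): Unit = {
  val skm = new StreamingKMeans()
    .setK(1)                    // normal data only, so a single cluster
    .setDecayFactor(0.9)        // forgetfulness of older micro batches
    .setRandomCenters(10, 0.0)  // 10-dimensional features, zero initial weight

  skm.trainOn(points)           // update the model on every micro batch

  points.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val model = skm.latestModel()  // driver-side snapshot for this batch
      val outliers = rdd.filter { v =>
        math.sqrt(Vectors.sqdist(v, model.clusterCenters(model.predict(v)))) > threshold
      }
      // ... report or store the outliers of this micro batch
    }
  }
}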

Suggestions ?

regards.

On Sun, 20 Nov 2016 at 3:51 AM, ayan guha  wrote:

> Curious why do you want to train your models every 3 secs?
> On 20 Nov 2016 06:25, "Debasish Ghosh"  wrote:
>
> Thanks a lot for the response.
>
> Regarding the sampling part - yeah that's what I need to do if there's no
> way of titrating the number of clusters online.
>
> I am using something like
>
> dstream.foreachRDD { rdd =>
>   if (rdd.count() > 0) { //.. logic
>   }
> }
>
> Feels a little odd but if that's the idiom then I will stick to it.
>
> regards.
>
>
>
> On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger 
> wrote:
>
> So I haven't played around with streaming k means at all, but given
> that no one responded to your message a couple of days ago, I'll say
> what I can.
>
> 1. Can you not sample out some % of the stream for training?
> 2. Can you run multiple streams at the same time with different values
> for k and compare their performance?
> 3. foreachRDD is fine in general, can't speak to the specifics.
> 4. If you haven't done any transformations yet on a direct stream,
> foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
> is very cheap, it's done on the driver only because the beginning and
> ending offsets are known.  So you should be able to skip empty
> batches.
>
>
>
> On Sat, Nov 19, 2016 at 10:46 AM, debasishg 
> wrote:
> > Hello -
> >
> > I am trying to implement an outlier detection application on streaming
> data.
> > I am a newbie to Spark and hence would like some advice on the confusions
> > that I have ..
> >
> > I am thinking of using StreamingKMeans - is this a good choice ? I have
> one
> > stream of data and I need an online algorithm. But here are some
> questions
> > that immediately come to my mind ..
> >
> > 1. I cannot do separate training, cross validation etc. Is this a good
> idea
> > to do training and prediction online ?
> >
> > 2. The data will be read from the stream coming from Kafka in
> microbatches
> > of (say) 3 seconds. I get a DStream on which I train and get the
> clusters.
> > How can I decide on the number of clusters ? Using StreamingKMeans is
> there
> > any way I can iterate on microbatches with different values of k to find
> the
> > optimal one ?
> >
> > 3. Even if I fix k, after training on every microbatch I get a DStream.
> How
> > can I compute things like clustering score on the DStream ?
> > StreamingKMeansModel has a computeCost function but it takes an RDD. I
> can
> > use dstream.foreachRDD { // process RDD for the micro batch here } - is
> this
> > the idiomatic way ?
> >
> > 4. If I use dstream.foreachRDD { .. } and use functions like new
> > StandardScaler().fit(rdd) to do feature normalization, then it works
> when I
> > have data in the stream. But when the microbatch is empty (say I don't
> have
> > data for some time), the fit method throws exception as it gets an empty
> > collection. Things start working ok when data starts coming back to the
> > stream. But is this the way to go ?
> >
> > any suggestion will be welcome ..
> >
> > regards.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
> --
Sent from my iPhone


Re: Run spark with hadoop snapshot

2016-11-19 Thread Luke Miner
Thanks! Should I do it from the spark build environment?

On Sat, Nov 19, 2016 at 4:48 AM, Steve Loughran 
wrote:

> I'd recommend you build a full spark release with the new hadoop version;
> you should have built that hadoop version locally earlier the same day (so that
> ivy/maven picks up the snapshot)
>
>
> dev/make-distribution.sh -Pyarn,hadoop-2.7,hive -Dhadoop.version=2.9.0-
> SNAPSHOT;
>
>
>
> > On 18 Nov 2016, at 19:31, lminer  wrote:
> >
> > I'm trying to figure out how to run spark with a snapshot of Hadoop 2.8
> that
> > I built myself. I'm unclear on the configuration needed to get spark to
> work
> > with the snapshot.
> >
> > I'm running spark on mesos. Per the spark documentation, I run
> spark-submit
> > as follows using the `spark-2.0.2-bin-without-hadoop`, but spark doesn't
> > appear to be finding hadoop 2.8.
> >
> >export SPARK_DIST_CLASSPATH=$(/path/to/hadoop2.8/bin/hadoop
> classpath)
> >spark-submit --verbose --master mesos://$MASTER_HOST/mesos
> >
> > I get the error:
> >
> >Exception in thread "main" java.lang.NoClassDefFoundError:
> > org/apache/hadoop/fs/FSDataInputStream
> >at
> > org.apache.spark.deploy.SparkSubmitArguments.handle(
> SparkSubmitArguments.scala:403)
> >at
> > org.apache.spark.launcher.SparkSubmitOptionParser.parse(
> SparkSubmitOptionParser.java:163)
> >at
> > org.apache.spark.deploy.SparkSubmitArguments.(
> SparkSubmitArguments.scala:98)
> >at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
> >at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> > Caused by: java.lang.ClassNotFoundException:
> > org.apache.hadoop.fs.FSDataInputStream
> >at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> >at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> >at java.security.AccessController.doPrivileged(Native Method)
> >at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> >at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> >at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> >at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> >... 5 more
> >
> > Any ideas on the proper configuration?
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Run-spark-with-hadoop-snapshot-tp28105.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
>


Re: using StreamingKMeans

2016-11-19 Thread ayan guha
Curious why do you want to train your models every 3 secs?
On 20 Nov 2016 06:25, "Debasish Ghosh"  wrote:

> Thanks a lot for the response.
>
> Regarding the sampling part - yeah that's what I need to do if there's no
> way of titrating the number of clusters online.
>
> I am using something like
>
> dstream.foreachRDD { rdd =>
>   if (rdd.count() > 0) { //.. logic
>   }
> }
>
> Feels a little odd but if that's the idiom then I will stick to it.
>
> regards.
>
>
>
> On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger 
> wrote:
>
>> So I haven't played around with streaming k means at all, but given
>> that no one responded to your message a couple of days ago, I'll say
>> what I can.
>>
>> 1. Can you not sample out some % of the stream for training?
>> 2. Can you run multiple streams at the same time with different values
>> for k and compare their performance?
>> 3. foreachRDD is fine in general, can't speak to the specifics.
>> 4. If you haven't done any transformations yet on a direct stream,
>> foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
>> is very cheap, it's done on the driver only because the beginning and
>> ending offsets are known.  So you should be able to skip empty
>> batches.
>>
>>
>>
>> On Sat, Nov 19, 2016 at 10:46 AM, debasishg 
>> wrote:
>> > Hello -
>> >
>> > I am trying to implement an outlier detection application on streaming
>> data.
>> > I am a newbie to Spark and hence would like some advice on the
>> confusions
>> > that I have ..
>> >
>> > I am thinking of using StreamingKMeans - is this a good choice ? I have
>> one
>> > stream of data and I need an online algorithm. But here are some
>> questions
>> > that immediately come to my mind ..
>> >
>> > 1. I cannot do separate training, cross validation etc. Is this a good
>> idea
>> > to do training and prediction online ?
>> >
>> > 2. The data will be read from the stream coming from Kafka in
>> microbatches
>> > of (say) 3 seconds. I get a DStream on which I train and get the
>> clusters.
>> > How can I decide on the number of clusters ? Using StreamingKMeans is
>> there
>> > any way I can iterate on microbatches with different values of k to
>> find the
>> > optimal one ?
>> >
>> > 3. Even if I fix k, after training on every microbatch I get a DStream.
>> How
>> > can I compute things like clustering score on the DStream ?
>> > StreamingKMeansModel has a computeCost function but it takes an RDD. I
>> can
>> > use dstream.foreachRDD { // process RDD for the micro batch here } - is
>> this
>> > the idiomatic way ?
>> >
>> > 4. If I use dstream.foreachRDD { .. } and use functions like new
>> > StandardScaler().fit(rdd) to do feature normalization, then it works
>> when I
>> > have data in the stream. But when the microbatch is empty (say I don't
>> have
>> > data for some time), the fit method throws exception as it gets an empty
>> > collection. Things start working ok when data starts coming back to the
>> > stream. But is this the way to go ?
>> >
>> > any suggestion will be welcome ..
>> >
>> > regards.
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Re: Logistic Regression Match Error

2016-11-19 Thread Meeraj Kunnumpurath
Thank you, it was the escape character, option("escape", "\"")
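
For reference, a minimal sketch of the corrected read, assuming Spark 2.x's built-in CSV source, the SparkSession named spark and the data/amazon_baby.csv path from the earlier code; setting the escape character to the quote character tells the parser that a doubled "" inside a quoted field is a literal quote:

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("quote", "\"")   // the default quote character
  .option("escape", "\"")  // treat "" inside a quoted field as a literal "
  .csv("data/amazon_baby.csv")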

Regards

On Sat, Nov 19, 2016 at 11:10 PM, Meeraj Kunnumpurath <
mee...@servicesymphony.com> wrote:

> I tried .option("quote", "\""), which I believe is the default; still the
> same error. This is the offending record.
>
> Primo 4-In-1 Soft Seat Toilet Trainer and Step Stool White with Pastel
> Blue Seat,"I chose this potty for my son because of the good reviews. I do
> not like it. I'm honestly baffled by all the great reviews now that I have
> this thing in front of me.1)It is made of cheap material, feels flimsy, the
> grips on the bottom of the thing do nothing to keep it in place when the
> child sits on it.2)It comes apart into 5 or 6 different pieces and all my
> son likes to do is take it apart. I did not want a potty that would turn
> into a toy, and this has just become like a puzzle for him, with all the
> different pieces.3)It is a little big for him. He is young still but he's a
> big boy for his age. I looked at one of the pictures posted and he looks
> about the same size as the curly haired kid reading the book, but the potty
> in that picture is NOT this potty! This one is a little bigger and he can't
> get quite touch his feet on the ground, which is important.4)And one final
> thing, maybe most importantly, the ""soft"" seat is not so soft. Doesn't
> seem very comfortable to me. It's just plastic on top of plastic... and
> after my son sits on it for just a few minutes his butt has horrible red
> marks all over it! Definitely not comfortable.So, overall, i'm not
> impressed at all.I gave it 2 stars because... it gets the job done I
> suppose, and for a child a little bit older than my son it might fit a
> little better. Also I really liked the idea that it was 4-in-1.Overall
> though, I do not suggest getting this potty. Look elseware!It's probably
> best to actually go to a store and look at them first hand, and not order
> online. That's what I should have done in the first place.",2
>
> On Sat, Nov 19, 2016 at 10:59 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
>> Digging through, it looks like an issue with reading the CSV. Some of the data
>> have embedded commas in them, and these fields are rightly quoted. However, the
>> CSV reader seems to be getting into a pickle when the records contain both quoted
>> and unquoted data. Fields are only quoted when there are commas within the
>> fields; otherwise they are unquoted.
>>
>> Regards
>> Meeraj
>>
>> On Sat, Nov 19, 2016 at 10:10 PM, Meeraj Kunnumpurath <
>> mee...@servicesymphony.com> wrote:
>>
>>> Hello,
>>>
>>> I have the following code that trains a mapping of review text to
>>> ratings. I use a tokenizer to get all the words from the review, and use a
>>> count vectorizer to get all the words. However, when I train the classifier
>>> I get a match error. Any pointers will be very helpful.
>>>
>>> The code is below,
>>>
>>> val spark = SparkSession.builder().appName("Logistic 
>>> Regression").master("local").getOrCreate()
>>> import spark.implicits._
>>>
>>> val df = spark.read.option("header", "true").option("inferSchema", 
>>> "true").csv("data/amazon_baby.csv")
>>> val tk = new Tokenizer().setInputCol("review").setOutputCol("words")
>>> val cv = new CountVectorizer().setInputCol("words").setOutputCol("features")
>>>
>>> val isGood = udf((x: Int) => if (x >= 4) 1 else 0)
>>>
>>> val words = tk.transform(df.withColumn("label", isGood('rating)))
>>> val Array(training, test) = 
>>> cv.fit(words).transform(words).randomSplit(Array(0.8, 0.2), 1)
>>>
>>> val classifier = new LogisticRegression()
>>>
>>> training.show(10)
>>>
>>> val simpleModel = classifier.fit(training)
>>> simpleModel.evaluate(test).predictions.select("words", "label", 
>>> "prediction", "probability").show(10)
>>>
>>>
>>> And the error I get is below.
>>>
>>> 16/11/19 22:06:45 ERROR Executor: Exception in task 0.0 in stage 8.0
>>> (TID 9)
>>> scala.MatchError: [null,1.0,(257358,[0,1,2,3,4,5
>>> ,6,7,8,9,10,13,15,16,20,25,27,29,34,37,40,42,45,48,49,52,58,
>>> 68,71,76,77,86,89,93,98,99,100,108,109,116,122,124,129,169,2
>>> 08,219,221,235,249,255,260,353,355,371,431,442,641,711,972,
>>> 1065,1411,1663,1776,1925,2596,2957,3355,3828,4860,6288,7294,
>>> 8951,9758,12203,18319,21779,48525,72732,75420,146476,
>>> 192184],[3.0,8.0,1.0,1.0,4.0,2.0,7.0,4.0,2.0,1.0,1.0,2.0,1.0
>>> ,4.0,3.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0
>>> ,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0
>>> ,1.0,2.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>>> ,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>>> ,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])] (of class
>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>>> at org.apache.spark.ml.classification.LogisticRegression$$anonf
>>> un$6.apply(LogisticRegression.scala:266)
>>> at org.apache.spark.ml.classification.LogisticRegression$$anonf
>>> un$6.apply(LogisticRegression.scala:266)
>>> at 

Re: using StreamingKMeans

2016-11-19 Thread Debasish Ghosh
Thanks a lot for the response.

Regarding the sampling part - yeah that's what I need to do if there's no
way of titrating the number of clusters online.

I am using something like

dstream.foreachRDD { rdd =>
  if (rdd.count() > 0) { //.. logic
  }
}

Feels a little odd but if that's the idiom then I will stick to it.

regards.



On Sat, Nov 19, 2016 at 10:52 PM, Cody Koeninger  wrote:

> So I haven't played around with streaming k means at all, but given
> that no one responded to your message a couple of days ago, I'll say
> what I can.
>
> 1. Can you not sample out some % of the stream for training?
> 2. Can you run multiple streams at the same time with different values
> for k and compare their performance?
> 3. foreachRDD is fine in general, can't speak to the specifics.
> 4. If you haven't done any transformations yet on a direct stream,
> foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
> is very cheap, it's done on the driver only because the beginning and
> ending offsets are known.  So you should be able to skip empty
> batches.
>
>
>
> On Sat, Nov 19, 2016 at 10:46 AM, debasishg 
> wrote:
> > Hello -
> >
> > I am trying to implement an outlier detection application on streaming
> data.
> > I am a newbie to Spark and hence would like some advice on the confusions
> > that I have ..
> >
> > I am thinking of using StreamingKMeans - is this a good choice ? I have
> one
> > stream of data and I need an online algorithm. But here are some
> questions
> > that immediately come to my mind ..
> >
> > 1. I cannot do separate training, cross validation etc. Is this a good
> idea
> > to do training and prediction online ?
> >
> > 2. The data will be read from the stream coming from Kafka in
> microbatches
> > of (say) 3 seconds. I get a DStream on which I train and get the
> clusters.
> > How can I decide on the number of clusters ? Using StreamingKMeans is
> there
> > any way I can iterate on microbatches with different values of k to find
> the
> > optimal one ?
> >
> > 3. Even if I fix k, after training on every microbatch I get a DStream.
> How
> > can I compute things like clustering score on the DStream ?
> > StreamingKMeansModel has a computeCost function but it takes an RDD. I
> can
> > use dstream.foreachRDD { // process RDD for the micro batch here } - is
> this
> > the idiomatic way ?
> >
> > 4. If I use dstream.foreachRDD { .. } and use functions like new
> > StandardScaler().fit(rdd) to do feature normalization, then it works
> when I
> > have data in the stream. But when the microbatch is empty (say I don't
> have
> > data for some time), the fit method throws exception as it gets an empty
> > collection. Things start working ok when data starts coming back to the
> > stream. But is this the way to go ?
> >
> > any suggestion will be welcome ..
> >
> > regards.
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: Logistic Regression Match Error

2016-11-19 Thread Meeraj Kunnumpurath
I tried .option("quote", "\""), which I believe is the default; still the
same error. This is the offending record.

Primo 4-In-1 Soft Seat Toilet Trainer and Step Stool White with Pastel Blue
Seat,"I chose this potty for my son because of the good reviews. I do not
like it. I'm honestly baffled by all the great reviews now that I have this
thing in front of me.1)It is made of cheap material, feels flimsy, the
grips on the bottom of the thing do nothing to keep it in place when the
child sits on it.2)It comes apart into 5 or 6 different pieces and all my
son likes to do is take it apart. I did not want a potty that would turn
into a toy, and this has just become like a puzzle for him, with all the
different pieces.3)It is a little big for him. He is young still but he's a
big boy for his age. I looked at one of the pictures posted and he looks
about the same size as the curly haired kid reading the book, but the potty
in that picture is NOT this potty! This one is a little bigger and he can't
get quite touch his feet on the ground, which is important.4)And one final
thing, maybe most importantly, the ""soft"" seat is not so soft. Doesn't
seem very comfortable to me. It's just plastic on top of plastic... and
after my son sits on it for just a few minutes his butt has horrible red
marks all over it! Definitely not comfortable.So, overall, i'm not
impressed at all.I gave it 2 stars because... it gets the job done I
suppose, and for a child a little bit older than my son it might fit a
little better. Also I really liked the idea that it was 4-in-1.Overall
though, I do not suggest getting this potty. Look elseware!It's probably
best to actually go to a store and look at them first hand, and not order
online. That's what I should have done in the first place.",2

On Sat, Nov 19, 2016 at 10:59 PM, Meeraj Kunnumpurath <
mee...@servicesymphony.com> wrote:

> Digging through, it looks like an issue with reading the CSV. Some of the data
> have embedded commas in them, and these fields are rightly quoted. However, the
> CSV reader seems to be getting into a pickle when the records contain both quoted
> and unquoted data. Fields are only quoted when there are commas within the
> fields; otherwise they are unquoted.
>
> Regards
> Meeraj
>
> On Sat, Nov 19, 2016 at 10:10 PM, Meeraj Kunnumpurath <
> mee...@servicesymphony.com> wrote:
>
>> Hello,
>>
>> I have the following code that trains a mapping of review text to
>> ratings. I use a tokenizer to get all the words from the review, and use a
>> count vectorizer to get all the words. However, when I train the classifier
>> I get a match error. Any pointers will be very helpful.
>>
>> The code is below,
>>
>> val spark = SparkSession.builder().appName("Logistic 
>> Regression").master("local").getOrCreate()
>> import spark.implicits._
>>
>> val df = spark.read.option("header", "true").option("inferSchema", 
>> "true").csv("data/amazon_baby.csv")
>> val tk = new Tokenizer().setInputCol("review").setOutputCol("words")
>> val cv = new CountVectorizer().setInputCol("words").setOutputCol("features")
>>
>> val isGood = udf((x: Int) => if (x >= 4) 1 else 0)
>>
>> val words = tk.transform(df.withColumn("label", isGood('rating)))
>> val Array(training, test) = 
>> cv.fit(words).transform(words).randomSplit(Array(0.8, 0.2), 1)
>>
>> val classifier = new LogisticRegression()
>>
>> training.show(10)
>>
>> val simpleModel = classifier.fit(training)
>> simpleModel.evaluate(test).predictions.select("words", "label", 
>> "prediction", "probability").show(10)
>>
>>
>> And the error I get is below.
>>
>> 16/11/19 22:06:45 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID
>> 9)
>> scala.MatchError: [null,1.0,(257358,[0,1,2,3,4,5
>> ,6,7,8,9,10,13,15,16,20,25,27,29,34,37,40,42,45,48,49,52,58,
>> 68,71,76,77,86,89,93,98,99,100,108,109,116,122,124,129,169,
>> 208,219,221,235,249,255,260,353,355,371,431,442,641,711,
>> 972,1065,1411,1663,1776,1925,2596,2957,3355,3828,4860,6288,
>> 7294,8951,9758,12203,18319,21779,48525,72732,75420,146476
>> ,192184],[3.0,8.0,1.0,1.0,4.0,2.0,7.0,4.0,2.0,1.0,1.0,2.0,1.
>> 0,4.0,3.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,2.0,2.0,1.0,1.
>> 0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.
>> 0,1.0,2.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.
>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.
>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])] (of class
>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
>> at org.apache.spark.ml.classification.LogisticRegression$$
>> anonfun$6.apply(LogisticRegression.scala:266)
>> at org.apache.spark.ml.classification.LogisticRegression$$
>> anonfun$6.apply(LogisticRegression.scala:266)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsVal
>> ues(MemoryStore.scala:214)
>> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator
>> $1.apply(BlockManager.scala:919)
>> at 

Re: Logistic Regression Match Error

2016-11-19 Thread Meeraj Kunnumpurath
Digging through, it looks like an issue with reading the CSV. Some of the data
have embedded commas in them, and these fields are rightly quoted. However, the
CSV reader seems to be getting into a pickle when the records contain both quoted
and unquoted data. Fields are only quoted when there are commas within the
fields; otherwise they are unquoted.

Regards
Meeraj

On Sat, Nov 19, 2016 at 10:10 PM, Meeraj Kunnumpurath <
mee...@servicesymphony.com> wrote:

> Hello,
>
> I have the following code that trains a mapping of review text to ratings.
> I use a tokenizer to get all the words from the review, and use a count
> vectorizer to get all the words. However, when I train the classifier I get
> a match error. Any pointers will be very helpful.
>
> The code is below,
>
> val spark = SparkSession.builder().appName("Logistic 
> Regression").master("local").getOrCreate()
> import spark.implicits._
>
> val df = spark.read.option("header", "true").option("inferSchema", 
> "true").csv("data/amazon_baby.csv")
> val tk = new Tokenizer().setInputCol("review").setOutputCol("words")
> val cv = new CountVectorizer().setInputCol("words").setOutputCol("features")
>
> val isGood = udf((x: Int) => if (x >= 4) 1 else 0)
>
> val words = tk.transform(df.withColumn("label", isGood('rating)))
> val Array(training, test) = 
> cv.fit(words).transform(words).randomSplit(Array(0.8, 0.2), 1)
>
> val classifier = new LogisticRegression()
>
> training.show(10)
>
> val simpleModel = classifier.fit(training)
> simpleModel.evaluate(test).predictions.select("words", "label", "prediction", 
> "probability").show(10)
>
>
> And the error I get is below.
>
> 16/11/19 22:06:45 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID
> 9)
> scala.MatchError: [null,1.0,(257358,[0,1,2,3,4,
> 5,6,7,8,9,10,13,15,16,20,25,27,29,34,37,40,42,45,48,49,52,
> 58,68,71,76,77,86,89,93,98,99,100,108,109,116,122,124,129,
> 169,208,219,221,235,249,255,260,353,355,371,431,442,641,
> 711,972,1065,1411,1663,1776,1925,2596,2957,3355,3828,4860,
> 6288,7294,8951,9758,12203,18319,21779,48525,72732,75420,
> 146476,192184],[3.0,8.0,1.0,1.0,4.0,2.0,7.0,4.0,2.0,1.0,1.0,
> 2.0,1.0,4.0,3.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,2.0,2.0,
> 1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,
> 1.0,1.0,1.0,2.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,
> 1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,
> 1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])] (of class
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
> at org.apache.spark.ml.classification.LogisticRegression$$anonfun$6.
> apply(LogisticRegression.scala:266)
> at org.apache.spark.ml.classification.LogisticRegression$$anonfun$6.
> apply(LogisticRegression.scala:266)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(
> MemoryStore.scala:214)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(
> BlockManager.scala:919)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(
> BlockManager.scala:910)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
> at org.apache.spark.storage.BlockManager.doPutIterator(
> BlockManager.scala:910)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(
> BlockManager.scala:668)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
>
> Many thanks
> --
> *Meeraj Kunnumpurath*
>
>
> *Director and Executive PrincipalService Symphony Ltd00 44 7702 693597*
>
> *00 971 50 409 0169mee...@servicesymphony.com *
>



-- 
*Meeraj Kunnumpurath*


*Director and Executive PrincipalService Symphony Ltd00 44 7702 693597*

*00 971 50 409 0169mee...@servicesymphony.com *


Logistic Regression Match Error

2016-11-19 Thread Meeraj Kunnumpurath
Hello,

I have the following code that trains a mapping of review text to ratings.
I use a tokenizer to get all the words from the review, and use a count
vectorizer to get all the words. However, when I train the classifier I get
a match error. Any pointers will be very helpful.

The code is below,

val spark = SparkSession.builder().appName("Logistic
Regression").master("local").getOrCreate()
import spark.implicits._

val df = spark.read.option("header", "true").option("inferSchema",
"true").csv("data/amazon_baby.csv")
val tk = new Tokenizer().setInputCol("review").setOutputCol("words")
val cv = new CountVectorizer().setInputCol("words").setOutputCol("features")

val isGood = udf((x: Int) => if (x >= 4) 1 else 0)

val words = tk.transform(df.withColumn("label", isGood('rating)))
val Array(training, test) =
cv.fit(words).transform(words).randomSplit(Array(0.8, 0.2), 1)

val classifier = new LogisticRegression()

training.show(10)

val simpleModel = classifier.fit(training)
simpleModel.evaluate(test).predictions.select("words", "label",
"prediction", "probability").show(10)


And the error I get is below.

16/11/19 22:06:45 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 9)
scala.MatchError:
[null,1.0,(257358,[0,1,2,3,4,5,6,7,8,9,10,13,15,16,20,25,27,29,34,37,40,42,45,48,49,52,58,68,71,76,77,86,89,93,98,99,100,108,109,116,122,124,129,169,208,219,221,235,249,255,260,353,355,371,431,442,641,711,972,1065,1411,1663,1776,1925,2596,2957,3355,3828,4860,6288,7294,8951,9758,12203,18319,21779,48525,72732,75420,146476,192184],[3.0,8.0,1.0,1.0,4.0,2.0,7.0,4.0,2.0,1.0,1.0,2.0,1.0,4.0,3.0,1.0,1.0,1.0,1.0,1.0,5.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])]
(of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at
org.apache.spark.ml.classification.LogisticRegression$$anonfun$6.apply(LogisticRegression.scala:266)
at
org.apache.spark.ml.classification.LogisticRegression$$anonfun$6.apply(LogisticRegression.scala:266)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
at
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
at
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)

Many thanks
-- 
*Meeraj Kunnumpurath*


*Director and Executive PrincipalService Symphony Ltd00 44 7702 693597*

*00 971 50 409 0169mee...@servicesymphony.com *


Re: Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Hster Geguri
Hi Cody,

Our test producer has been vetted for producing evenly into each
partition.  We use kafka-manager to track this.

$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '
> 10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> $ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '
> 10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021


We have spent two weeks trying different configurations and stripping
everything down.  The only thing we have not tried is a different cloud
provider; we are using GCE. Since previous versions work properly, as does
the "latest" offset setting, we did not think the problem was in the
infrastructure layer.

Thanks,
Heji


On Sat, Nov 19, 2016 at 9:27 AM, Cody Koeninger  wrote:

> This is running locally on my mac, but it's still a standalone spark
> master with multiple separate executor jvms (i.e. using --master not
> --local[2]), so it should be the same code paths.  I can't speak to
> yarn one way or the other, but you said you tried it with the
> standalone scheduler.
>
> At the very least, you should run ./bin/kafka-run-class.sh
> kafka.tools.GetOffsetShell  with -1 and -2 and compare those results
> to what you're seeing from spark.  The results you posted from spark
> didn't show any incoming messages at all.
>
> On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
>  wrote:
> > Hi Cody,
> >
> > Thank you for testing this on a Saturday morning!  I failed to mention
> that
> > when our data engineer runs our drivers(even complex ones) locally on his
> > Mac, the drivers work fine. However when we launch it into the cluster (4
> > machines either for a YARN cluster or spark standalone) we get this
> issue.
> >
> > Heji
> >
> >
> > On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger 
> wrote:
> >>
> >> I ran your example using the versions of kafka and spark you are
> >> using, against a standalone cluster.  This is what I observed:
> >>
> >> (in kafka working directory)
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> >> simple_logtest:2:0
> >> simple_logtest:4:0
> >> simple_logtest:1:0
> >> simple_logtest:3:0
> >> simple_logtest:0:0
> >>
> >> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> >> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> >> simple_logtest:2:31
> >> simple_logtest:4:31
> >> simple_logtest:1:31
> >> simple_logtest:3:31
> >> simple_logtest:0:31
> >>
> >> So in other words, there are 5 partitions, they all have messages in
> them
> >>
> >> (in spark working directory)
> >>
> >> bash-3.2$ ./bin/spark-submit --master
> >> spark://Codys-MacBook-Pro.local:7077 --class
> >> example.SimpleKafkaLoggingDriver
> >>
> >> /private/var/tmp/kafka-bug-report/target/scala-2.11/
> kafka-example-assembly-2.0.0.jar
> >> localhost:9092 simple_logtest mygroup earliest
> >>
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >>
> >> simple_logtest 3 offsets: 0 to 31
> >> simple_logtest 0 offsets: 0 to 31
> >> simple_logtest 1 offsets: 0 to 31
> >> simple_logtest 2 offsets: 0 to 31
> >> simple_logtest 4 offsets: 0 to 31
> >>
> >> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> >> 1479574025000 ms.0 from job set of time 1479574025000 ms
> >> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> >> 1479574025000 ms (execution: 0.005 s)
> >> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> >> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> >> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 147957403
> ms
> >> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> >> 147957403 ms.0 from job set of time 147957403 ms
> >>
> >> simple_logtest 3 offsets: 31 to 31
> >> simple_logtest 0 offsets: 31 to 31
> >> simple_logtest 1 offsets: 31 to 31
> >> simple_logtest 2 offsets: 31 to 31
> >> simple_logtest 4 offsets: 31 to 31
> >>
> >> So in other words, spark is indeed seeing offsets for each partition.
> >>
> >>
> >> The results you posted look to me like there aren't any messages going
> >> into the other partitions, which looks like a misbehaving producer.
> >>
> >> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
> >>  wrote:
> >> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we
> have
> >> > been
> >> > struggling with this show stopper problem.
> >> >
> >> > When we run our drivers with auto.offset.reset=latest ingesting from a
> >> > single kafka topic with 10 partitions, the driver reads correctly from
> >> > 

Re: Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
This is running locally on my mac, but it's still a standalone spark
master with multiple separate executor jvms (i.e. using --master not
--local[2]), so it should be the same code paths.  I can't speak to
yarn one way or the other, but you said you tried it with the
standalone scheduler.

At the very least, you should run ./bin/kafka-run-class.sh
kafka.tools.GetOffsetShell  with -1 and -2 and compare those results
to what you're seeing from spark.  The results you posted from spark
didn't show any incoming messages at all.
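
For anyone reproducing the comparison, a rough sketch of a driver that logs the per-partition offset ranges with auto.offset.reset=earliest, assuming the spark-streaming-kafka-0-10 integration and the broker/topic/group names already used in this thread:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "mygroup",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean))

val ssc = new StreamingContext(new SparkConf().setAppName("offset-check"), Seconds(5))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("simple_logtest"), kafkaParams))

// log what the driver actually sees for each partition in every batch
stream.foreachRDD { rdd =>
  rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
    println(s"${r.topic} ${r.partition} offsets: ${r.fromOffset} to ${r.untilOffset}")
  }
}
ssc.start()
ssc.awaitTermination()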

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
 wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning!  I failed to mention that
> when our data engineer runs our drivers(even complex ones) locally on his
> Mac, the drivers work fine. However when we launch it into the cluster (4
> machines either for a YARN cluster or spark standalone) we get this issue.
>
> Heji
>
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger  wrote:
>>
>> I ran your example using the versions of kafka and spark you are
>> using, against a standalone cluster.  This is what I observed:
>>
>> (in kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, they all have messages in them
>>
>> (in spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master
>> spark://Codys-MacBook-Pro.local:7077 --class
>> example.SimpleKafkaLoggingDriver
>>
>> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
>> localhost:9092 simple_logtest mygroup earliest
>>
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
>> 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 147957403 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
>> 147957403 ms.0 from job set of time 147957403 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, spark is indeed seeing offsets for each partition.
>>
>>
>> The results you posted look to me like there aren't any messages going
>> into the other partitions, which looks like a misbehaving producer.
>>
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>>  wrote:
>> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
>> > been
>> > struggling with this show stopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest ingesting from a
>> > single kafka topic with 10 partitions, the driver reads correctly from
>> > all
>> > 10 partitions.
>> >
>> > However when we use auto.offset.reset=earliest, the driver will read
>> > only a
>> > single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being set to
>> > different offset configuration even though the consumer config correctly
>> > indicates auto.offset.reset=earliest.
>> >
>> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest
>> >> offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Sending ListOffsetRequest
>> >>
>> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 TRACE Sending ListOffsetRequest
>> >>
>> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 8 TRACE Received ListOffsetResponse
>> >>
>> >> 

Re: using StreamingKMeans

2016-11-19 Thread Cody Koeninger
So I haven't played around with streaming k means at all, but given
that no one responded to your message a couple of days ago, I'll say
what I can.

1. Can you not sample out some % of the stream for training?
2. Can you run multiple streams at the same time with different values
for k and compare their performance?
3. foreachRDD is fine in general, can't speak to the specifics.
4. If you haven't done any transformations yet on a direct stream,
foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
is very cheap, it's done on the driver only because the beginning and
ending offsets are known.  So you should be able to skip empty
batches.
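
A small sketch of point 4, assuming stream is an untransformed direct stream from the 0-10 integration; parse() stands in for whatever per-record work is done:

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {  // cheap for a KafkaRDD: decided from the known offset ranges
    val vectors = rdd.map(record => parse(record.value()))
    // ... fit the scaler / train / score on this non-empty micro batch
  }
}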



On Sat, Nov 19, 2016 at 10:46 AM, debasishg  wrote:
> Hello -
>
> I am trying to implement an outlier detection application on streaming data.
> I am a newbie to Spark and hence would like some advice on the confusions
> that I have ..
>
> I am thinking of using StreamingKMeans - is this a good choice ? I have one
> stream of data and I need an online algorithm. But here are some questions
> that immediately come to my mind ..
>
> 1. I cannot do separate training, cross validation etc. Is this a good idea
> to do training and prediction online ?
>
> 2. The data will be read from the stream coming from Kafka in microbatches
> of (say) 3 seconds. I get a DStream on which I train and get the clusters.
> How can I decide on the number of clusters ? Using StreamingKMeans is there
> any way I can iterate on microbatches with different values of k to find the
> optimal one ?
>
> 3. Even if I fix k, after training on every microbatch I get a DStream. How
> can I compute things like clustering score on the DStream ?
> StreamingKMeansModel has a computeCost function but it takes an RDD. I can
> use dstream.foreachRDD { // process RDD for the micro batch here } - is this
> the idiomatic way ?
>
> 4. If I use dstream.foreachRDD { .. } and use functions like new
> StandardScaler().fit(rdd) to do feature normalization, then it works when I
> have data in the stream. But when the microbatch is empty (say I don't have
> data for some time), the fit method throws exception as it gets an empty
> collection. Things start working ok when data starts coming back to the
> stream. But is this the way to go ?
>
> any suggestion will be welcome ..
>
> regards.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



convert local tsv file to orc file on distributed cloud storage (openstack).

2016-11-19 Thread vr spark
Hi,
I am looking for Scala or Python code samples to convert a local TSV file to
an ORC file and store it on distributed cloud storage (OpenStack).

So I need these 3 samples. Please suggest.

1. read tsv
2. convert to orc
3. store on distributed cloud storage
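
Roughly, the kind of thing being asked for might look like this (untested
sketch, Spark 2.x; the swift:// URI is only a placeholder and assumes the
Hadoop OpenStack/Swift filesystem is configured on the cluster):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("TsvToOrc")
  .enableHiveSupport()  // ORC support lives in the Hive module in Spark 2.0-2.2
  .getOrCreate()

// 1. read the tsv (tab-separated, with a header row in this example)
val df = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .csv("file:///path/to/local/input.tsv")

// 2 + 3. convert to ORC and write it out to the cloud store
df.write.orc("swift://container.provider/output/path")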


thanks
VR


Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Hster Geguri
Hi Cody,

Thank you for testing this on a Saturday morning!  I failed to mention that
when our data engineer runs our drivers (even complex ones) locally on his
Mac, the drivers work fine. However, when we launch them into the cluster (4
machines, either a YARN cluster or Spark standalone) we get this issue.

Heji


On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger  wrote:

> I ran your example using the versions of kafka and spark you are
> using, against a standalone cluster.  This is what I observed:
>
> (in kafka working directory)
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
>
> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
> --broker-list 'localhost:9092' --topic simple_logtest --time -1
> simple_logtest:2:31
> simple_logtest:4:31
> simple_logtest:1:31
> simple_logtest:3:31
> simple_logtest:0:31
>
> So in other words, there are 5 partitions, they all have messages in them
>
> (in spark working directory)
>
> bash-3.2$ ./bin/spark-submit --master
> spark://Codys-MacBook-Pro.local:7077 --class
> example.SimpleKafkaLoggingDriver
> /private/var/tmp/kafka-bug-report/target/scala-2.11/
> kafka-example-assembly-2.0.0.jar
> localhost:9092 simple_logtest mygroup earliest
>
>
> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
>
> simple_logtest 3 offsets: 0 to 31
> simple_logtest 0 offsets: 0 to 31
> simple_logtest 1 offsets: 0 to 31
> simple_logtest 2 offsets: 0 to 31
> simple_logtest 4 offsets: 0 to 31
>
> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
> 1479574025000 ms.0 from job set of time 1479574025000 ms
> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
> 1479574025000 ms (execution: 0.005 s)
> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 147957403 ms
> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
> 147957403 ms.0 from job set of time 147957403 ms
>
> simple_logtest 3 offsets: 31 to 31
> simple_logtest 0 offsets: 31 to 31
> simple_logtest 1 offsets: 31 to 31
> simple_logtest 2 offsets: 31 to 31
> simple_logtest 4 offsets: 31 to 31
>
> So in other words, spark is indeed seeing offsets for each partition.
>
>
> The results you posted look to me like there aren't any messages going
> into the other partitions, which looks like a misbehaving producer.
>
> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>  wrote:
> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
> been
> > struggling with this show stopper problem.
> >
> > When we run our drivers with auto.offset.reset=latest ingesting from a
> > single kafka topic with 10 partitions, the driver reads correctly from
> all
> > 10 partitions.
> >
> > However when we use auto.offset.reset=earliest, the driver will read
> only a
> > single partition.
> >
> > When we turn on the debug logs, we sometimes see partitions being set to
> > different offset configuration even though the consumer config correctly
> > indicates auto.offset.reset=earliest.
> >
> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{
> partition=8,timestamp=-2}]}]}
> >> to broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Sending ListOffsetRequest
> >> {replica_id=-1,topics=[{topic=simple_test,partitions=[{
> partition=9,timestamp=-1}]}]}
> >> to broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{
> partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> >> from broker 10.102.20.12:9092 (id: 12 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 TRACE Received ListOffsetResponse
> >> {responses=[{topic=simple_test,partition_responses=[{
> partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> >> from broker 10.102.20.13:9092 (id: 13 rack: null)
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
> >
> >
> >
> 

Re: Kafka direct approach,App UI shows wrong input rate

2016-11-19 Thread Cody Koeninger
There have definitely been issues with UI reporting for the direct
stream in the past, but I'm not able to reproduce this with 2.0.2 and
0.8.  See below:

https://i.imgsafe.org/086019ae57.png



On Fri, Nov 18, 2016 at 4:38 AM, Julian Keppel
 wrote:
> Hello,
>
> I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is 0.10.0.1
> (Scala 2.11). I read data from Kafka with the direct approach. The complete
> infrastructure runs on Google Container Engine.
>
> I wonder why the corresponding application UI says the input rate is zero
> records per second. This is definitely wrong; I verified it by printing
> out the incoming records to the driver console. All other metrics seem to be
> correct (at least they are realistic).
>
> What is going on here? Do you have any idea? Thanks for your help.
>
> Julian

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Usage of mllib api in ml

2016-11-19 Thread janardhan shetty
Hi,

I am trying to use the evaluation metrics offered by mllib's
MulticlassMetrics in an ml DataFrame setting.
Are there any examples of how to use it?
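
One possible approach is to drop to the RDD API just for the metrics
(untested sketch; assumes `predictions` is the DataFrame produced by an ml
model's transform(), with Double-typed "prediction" and "label" columns):

import org.apache.spark.mllib.evaluation.MulticlassMetrics

val predictionAndLabels = predictions
  .select("prediction", "label")
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val metrics = new MulticlassMetrics(predictionAndLabels)
println(s"accuracy    = ${metrics.accuracy}")
println(s"weighted F1 = ${metrics.weightedFMeasure}")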


Re: Spark ML DataFrame API - need cosine similarity, how to convert to RDD Vectors?

2016-11-19 Thread Yanbo Liang
Hi Russell,

Do you want to use RowMatrix.columnSimilarities to calculate cosine
similarities?
If so, you can use the following steps:

import org.apache.spark.mllib
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

val dataset: DataFrame = ...  // your DataFrame with an ml "features" column

// Convert the features column from ml.linalg.Vector to mllib.linalg.Vector
val oldDataset: DataFrame =
  MLUtils.convertVectorColumnsFromML(dataset, "features")

// Convert from DataFrame to RDD[mllib.linalg.Vector]
val oldRDD: RDD[mllib.linalg.Vector] =
  oldDataset.select(col("features")).rdd.map { row =>
    row.getAs[mllib.linalg.Vector](0)
  }

// Build a RowMatrix (nRows/nCols are optional if unknown) and compute
// pairwise cosine similarities between columns
val mat: RowMatrix = new RowMatrix(oldRDD, nRows, nCols)
mat.columnSimilarities()

Please feel free to let me know whether it can satisfy your requirements.


Thanks
Yanbo

On Wed, Nov 16, 2016 at 9:26 AM, Russell Jurney 
wrote:

> Asher, can you cast like that? Does that casting work? That is my
> confusion: I don't know what a DataFrame Vector turns into in terms of an
> RDD type.
>
> I'll try this, thanks.
>
> On Tue, Nov 15, 2016 at 11:25 AM, Asher Krim  wrote:
>
>> What language are you using? For Java, you might convert the dataframe to
>> an rdd using something like this:
>>
>> df
>> .toJavaRDD()
>> .map(row -> (SparseVector)row.getAs(row.fieldIndex("columnName")));
>>
>>> On Tue, Nov 15, 2016 at 1:06 PM, Russell Jurney wrote:
>>
>>> I have two dataframes with common feature vectors and I need to get the
>>> cosine similarity of one against the other. It looks like this is possible
>>> in the RDD based API, mllib, but not in ml.
>>>
>>> So, how do I convert my sparse dataframe vectors into something spark
>>> mllib can use? I've searched, but haven't found anything.
>>>
>>> Thanks!
>>> --
>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>>
>>
>>
>>
>> --
>> Asher Krim
>> Senior Software Engineer
>>
>
>
>
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>


Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
I ran your example using the versions of kafka and spark you are
using, against a standalone cluster.  This is what I observed:

(in kafka working directory)

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -2
simple_logtest:2:0
simple_logtest:4:0
simple_logtest:1:0
simple_logtest:3:0
simple_logtest:0:0

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -1
simple_logtest:2:31
simple_logtest:4:31
simple_logtest:1:31
simple_logtest:3:31
simple_logtest:0:31

So in other words, there are 5 partitions, they all have messages in them

(in spark working directory)

bash-3.2$ ./bin/spark-submit --master
spark://Codys-MacBook-Pro.local:7077 --class
example.SimpleKafkaLoggingDriver
/private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
localhost:9092 simple_logtest mygroup earliest


16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms

simple_logtest 3 offsets: 0 to 31
simple_logtest 0 offsets: 0 to 31
simple_logtest 1 offsets: 0 to 31
simple_logtest 2 offsets: 0 to 31
simple_logtest 4 offsets: 0 to 31

16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms
16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
1479574025000 ms (execution: 0.005 s)
16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 147957403 ms
16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
147957403 ms.0 from job set of time 147957403 ms

simple_logtest 3 offsets: 31 to 31
simple_logtest 0 offsets: 31 to 31
simple_logtest 1 offsets: 31 to 31
simple_logtest 2 offsets: 31 to 31
simple_logtest 4 offsets: 31 to 31

So in other words, spark is indeed seeing offsets for each partition.


The results you posted look to me like there aren't any messages going
into the other partitions, which looks like a misbehaving producer.

On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
 wrote:
> Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been
> struggling with this show stopper problem.
>
> When we run our drivers with auto.offset.reset=latest ingesting from a
> single kafka topic with 10 partitions, the driver reads correctly from all
> 10 partitions.
>
> However when we use auto.offset.reset=earliest, the driver will read only a
> single partition.
>
> When we turn on the debug logs, we sometimes see partitions being set to
> different offset configuration even though the consumer config correctly
> indicates auto.offset.reset=earliest.
>
>> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>> from broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>> from broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>
>
>
> I've enclosed below the completely stripped down trivial test driver that
> shows this behavior. We normally run with YARN 2.7.3 but have also tried
> running spark standalone mode which has the same behavior. Our drivers are
> normally java but we have tried the scala version which also has the same
> incorrect behavior. We have tried different LocationStrategies and partition
> assignment strategies all without success.  Any insight would be greatly
> appreciated.
>
> package com.x.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import 

using StreamingKMeans

2016-11-19 Thread debasishg
Hello -

I am trying to implement an outlier detection application on streaming data.
I am a newbie to Spark and hence would like some advice on the confusions
that I have ..

I am thinking of using StreamingKMeans - is this a good choice ? I have one
stream of data and I need an online algorithm. But here are some questions
that immediately come to my mind ..

1. I cannot do separate training, cross validation etc. Is this a good idea
to do training and prediction online ? 

2. The data will be read from the stream coming from Kafka in microbatches
of (say) 3 seconds. I get a DStream on which I train and get the clusters.
How can I decide on the number of clusters ? Using StreamingKMeans is there
any way I can iterate on microbatches with different values of k to find the
optimal one ?

3. Even if I fix k, after training on every microbatch I get a DStream. How
can I compute things like clustering score on the DStream ?
StreamingKMeansModel has a computeCost function but it takes an RDD. I can
use dstream.foreachRDD { // process RDD for the micro batch here } - is this
the idiomatic way ? 

4. If I use dstream.foreachRDD { .. } and use functions like new
StandardScaler().fit(rdd) to do feature normalization, then it works when I
have data in the stream. But when the microbatch is empty (say I don't have
data for some time), the fit method throws exception as it gets an empty
collection. Things start working ok when data starts coming back to the
stream. But is this the way to go ?

any suggestion will be welcome ..

regards.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: VectorUDT and ml.Vector

2016-11-19 Thread Yanbo Liang
The reason behind this error can be inferred from the error log:
*MLUtils.convertMatrixColumnsFromML* is used to convert ml.linalg.Matrix
to mllib.linalg.Matrix, but it looks like the column type is
ml.linalg.Vector in your case.
Could you check the type of the column "features" in your dataframe (Vector
or Matrix)? I think it's ml.linalg.Vector, so you should use
*MLUtils.convertVectorColumnsFromML*.
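
After that conversion, the SVD part would look roughly like this (untested
sketch; 20 is just an example number of singular values):

import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.functions.col

// Convert the ml vector column, drop to an RDD, then run SVD on a RowMatrix
val converted = MLUtils.convertVectorColumnsFromML(termDocMatrix, "features")
val rows = converted.select(col("features")).rdd
  .map(_.getAs[org.apache.spark.mllib.linalg.Vector](0))

val mat = new RowMatrix(rows)
val svd = mat.computeSVD(20, computeU = false)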

Thanks
Yanbo


On Mon, Nov 7, 2016 at 5:25 AM, Ganesh  wrote:

> I am trying to run an SVD on a dataframe and I have used ml TF-IDF which
> has created a dataframe.
> Now for Singular Value Decomposition I am trying to use RowMatrix which
> takes in RDD with mllib.Vector so I have to convert this Dataframe with
> what I assumed was ml.Vector
>
> However the conversion
>
> *val convertedTermDocMatrix =
> MLUtils.convertMatrixColumnsFromML(termDocMatrix,"features")*
>
> fails with
>
> java.lang.IllegalArgumentException: requirement failed: Column features
> must be new Matrix type to be converted to old type but got
> org.apache.spark.ml.linalg.VectorUDT
>
>
> So the question is: how do I perform SVD on a DataFrame? I assume not all
> of the functionality of mllib has been ported to ml.
>
>
> I tried to convert my entire project to use RDD but computeSVD on
> RowMatrix is throwing up out of Memory errors and anyway I would like to
> stick with DataFrame.
>
> Our text corpus is around 55 Gb of text data.
>
>
>
> Ganesh
>


Re: Kafka segmentation

2016-11-19 Thread Cody Koeninger
I mean I don't understand exactly what the issue is.  Can you fill in
these blanks

My settings are :

My code is :

I expected to see :

Instead, I saw :

On Thu, Nov 17, 2016 at 12:53 PM, Hoang Bao Thien  wrote:
> I am sorry I don't understand your idea. What do you mean exactly?
>
> On Fri, Nov 18, 2016 at 1:50 AM, Cody Koeninger  wrote:
>>
>> Ok, I don't think I'm clear on the issue then.  Can you say what the
>> expected behavior is, and what the observed behavior is?
>>
>> On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien 
>> wrote:
>> > Hi,
>> >
>> > Thanks for your comments. But in fact, I don't want to limit the size of
>> > batches, it could be any greater size as it does.
>> >
>> > Thien
>> >
>> > On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger 
>> > wrote:
>> >>
>> >> If you want a consistent limit on the size of batches, use
>> >> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
>> >> createDirectStream)
>> >>
>> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> >>
>> >> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien
>> >> 
>> >> wrote:
>> >> > Hi,
>> >> >
>> >> > I use CSV and other text files to Kafka just to test Kafka + Spark
>> >> > Streaming
>> >> > by using direct stream. That's why I don't want Spark streaming reads
>> >> > CSVs
>> >> > or text files directly.
>> >> > In addition, I don't want a giant batch of records like the link you
>> >> > sent.
>> >> > The problem is that we should receive the "similar" number of record
>> >> > of
>> >> > all
>> >> > batchs instead of the first two or three batches have so large number
>> >> > of
>> >> > records (e.g., 100K) but the last 1000 batches with only 200 records.
>> >> >
>> >> > I know that the problem is not from the auto.offset.reset=largest,
>> >> > but I
>> >> > don't know what I can do in this case.
>> >> >
>> >> > Do you and other ones could suggest me some solutions please as this
>> >> > seems
>> >> > the normal situation with Kafka+SpartStreaming.
>> >> >
>> >> > Thanks.
>> >> > Alex
>> >> >
>> >> >
>> >> >
>> >> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger 
>> >> > wrote:
>> >> >>
>> >> >> Yeah, if you're reporting issues, please be clear as to whether
>> >> >> backpressure is enabled, and whether maxRatePerPartition is set.
>> >> >>
>> >> >> I expect that there is something wrong with backpressure, see e.g.
>> >> >> https://issues.apache.org/jira/browse/SPARK-18371
>> >> >>
>> >> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang 
>> >> >> wrote:
>> >> >> > I hit similar issue with Spark Streaming. The batch size seemed a
>> >> >> > little
>> >> >> > random. Sometime it was large with many Kafka messages inside same
>> >> >> > batch,
>> >> >> > sometimes it was very small with just a few messages. Is it
>> >> >> > possible
>> >> >> > that
>> >> >> > was caused by the backpressure implementation in Spark Streaming?
>> >> >> >
>> >> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger
>> >> >> > 
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Moved to user list.
>> >> >> >>
>> >> >> >> I'm not really clear on what you're trying to accomplish (why put
>> >> >> >> the
>> >> >> >> csv file through Kafka instead of reading it directly with
>> >> >> >> spark?)
>> >> >> >>
>> >> >> >> auto.offset.reset=largest just means that when starting the job
>> >> >> >> without any defined offsets, it will start at the highest (most
>> >> >> >> recent) available offsets.  That's probably not what you want if
>> >> >> >> you've already loaded csv lines into kafka.
>> >> >> >>
>> >> >> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
>> >> >> >> 
>> >> >> >> wrote:
>> >> >> >> > Hi all,
>> >> >> >> >
>> >> >> >> > I would like to ask a question related to the size of Kafka
>> >> >> >> > stream. I
>> >> >> >> > want
>> >> >> >> > to put data (e.g., file *.csv) to Kafka then use Spark
>> >> >> >> > streaming
>> >> >> >> > to
>> >> >> >> > get
>> >> >> >> > the
>> >> >> >> > output from Kafka and then save to Hive by using SparkSQL. The
>> >> >> >> > file
>> >> >> >> > csv
>> >> >> >> > is
>> >> >> >> > about 100MB with ~250K messages/rows (Each row has about 10
>> >> >> >> > fields
>> >> >> >> > of
>> >> >> >> > integer). I see that Spark Streaming first received two
>> >> >> >> > partitions/batches,
>> >> >> >> > the first is of 60K messages and the second is of 50K msgs. But
>> >> >> >> > from
>> >> >> >> > the
>> >> >> >> > third batch, Spark just received 200 messages for each batch
>> >> >> >> > (or
>> >> >> >> > partition).
>> >> >> >> > I think that this problem is coming from Kafka or some
>> >> >> >> > configuration
>> >> >> >> > in
>> >> >> >> > Spark. I already tried to configure with the setting
>> >> >> >> > "auto.offset.reset=largest", but every batch only gets 200
>> >> >> >> > messages.
>> >> >> >> >

Re: why is method predict protected in PredictionModel

2016-11-19 Thread Yanbo Liang
This function is currently used internally; we will expose it as public to
support making predictions on a single instance.
See discussion at https://issues.apache.org/jira/browse/SPARK-10413.
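
In the meantime, a common workaround is to wrap the single instance in a
one-row DataFrame and call transform(); a rough, untested sketch (the model
class, path and feature values are placeholders, and `spark` is a
SparkSession):

import org.apache.spark.ml.classification.NaiveBayesModel
import org.apache.spark.ml.linalg.Vectors

val model = NaiveBayesModel.load("/path/to/saved/model")

// Wrap one feature vector in a single-row DataFrame
val single = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(1.0, 0.0, 3.0))
)).toDF("features")

val prediction = model.transform(single)
  .select("prediction")
  .head()
  .getDouble(0)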

Thanks
Yanbo

On Thu, Nov 17, 2016 at 1:24 AM, wobu  wrote:

> Hi,
>
> we were using Spark 1.3.1 for a long time and now we want to upgrade to 2.0
> release.
> So we used till today the mllib package and the RDD API.
>
> Now I'm trying to refactor our mllib NaiveBayesClassifier to the new "ml"
> api.
>
> *The Question:*
> why is the method "predict" in the class "PredictionModel" of package "ml"
> protected?
>
> I am trying to find a way to load a saved prediction model and predict
> values without having to use the pipeline or transformer api.
>
>
> Best regards
>
> wobu
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/why-is-method-predict-protected-in-PredictionModel-tp28095.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: DataFrame select non-existing column

2016-11-19 Thread Kristoffer Sjögren
Thanks. Here's my code example [1] and the printSchema() output [2].

This code still fails with the following message: "No such struct
field mobile in auction, geo"

Looking at the schema, it seems that pass.mobile did not get nested, which
is the way it needs to be for my use case. Are nested columns not supported
by withColumn()?

[1]

DataFrame df = ctx.read().parquet(localPath).withColumn("pass.mobile", lit(0L));
df.printSchema();
df.select("pass.mobile");

[2]

root
 |-- pass: struct (nullable = true)
 ||-- auction: struct (nullable = true)
 |||-- id: integer (nullable = true)
 ||-- geo: struct (nullable = true)
 |||-- postalCode: string (nullable = true)
 |-- pass.mobile: long (nullable = false)
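
A possible workaround (untested sketch in Scala): rebuild the "pass" struct
so that the new field is genuinely nested, instead of adding a top-level
column whose name merely contains a dot:

import org.apache.spark.sql.functions.{col, lit, struct}

// Replace "pass" with a new struct that carries the extra "mobile" field
val withMobile = df.withColumn(
  "pass",
  struct(
    col("pass.auction").as("auction"),
    col("pass.geo").as("geo"),
    lit(0L).as("mobile")
  )
)
withMobile.select("pass.mobile")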

On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf
 wrote:
> In pyspark for example you would do something like:
>
> df.withColumn("newColName",pyspark.sql.functions.lit(None))
>
> Assaf.
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Friday, November 18, 2016 9:19 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: DataFrame select non-existing column
>
> Thanks for your answer. I have been searching the API for doing that but I 
> could not find how to do it?
>
> Could you give me a code snippet?
>
> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf  
> wrote:
>> You can always add the columns to old dataframes giving them null (or some 
>> literal) as a preprocessing.
>>
>> -Original Message-
>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>> Sent: Friday, November 18, 2016 4:32 PM
>> To: user
>> Subject: DataFrame select non-existing column
>>
>> Hi
>>
>> We have evolved a DataFrame by adding a few columns but cannot write select 
>> statements on these columns for older data that doesn't have them since they 
>> fail with a AnalysisException with message "No such struct field".
>>
>> We also tried dropping columns but this doesn't work for nested columns.
>>
>> Any non-hacky ways to get around this?
>>
>> Cheers,
>> -Kristoffer
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Stateful aggregations with Structured Streaming

2016-11-19 Thread Yuval.Itzchakov
I've been using `DStream.mapWithState` and was looking forward to trying out
Structured Streaming. The thing I can't figure out is whether Structured
Streaming in its current state supports stateful aggregations.

Looking at the StateStore design document
(https://docs.google.com/document/d/1-ncawFx8JS5Zyfq1HAEGBx56RDet9wfVp_hDM8ZL254/edit#heading=h.2h7zw4ru3nw7),
and then doing a bit of digging around in the Spark codebase, I've seen
`mapPartitionsWithStateStore` as the only viable way of doing something with
a store, but the API requires an `UnsafeRow` for key and value, which makes
me question whether this is really a public API one should be using.

Does anyone know what the state of things are currently in regards to an
equivalent to `mapWithState` in Structured Streaming?

Thanks,
Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Stateful-aggregations-with-Structured-Streaming-tp28108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Run spark with hadoop snapshot

2016-11-19 Thread Steve Loughran
I'd recommend you build a full Spark release with the new Hadoop version; you
should have built that Hadoop version locally earlier the same day (so that
Ivy/Maven pick up the snapshot):


dev/make-distribution.sh -Pyarn,hadoop-2.7,hive -Dhadoop.version=2.9.0-SNAPSHOT;



> On 18 Nov 2016, at 19:31, lminer  wrote:
> 
> I'm trying to figure out how to run spark with a snapshot of Hadoop 2.8 that
> I built myself. I'm unclear on the configuration needed to get spark to work
> with the snapshot.
> 
> I'm running spark on mesos. Per the spark documentation, I run spark-submit
> as follows using the `spark-2.0.2-bin-without-hadoop`, but spark doesn't
> appear to be finding hadoop 2.8.
> 
>export SPARK_DIST_CLASSPATH=$(/path/to/hadoop2.8/bin/hadoop classpath)
>spark-submit --verbose --master mesos://$MASTER_HOST/mesos
> 
> I get the error:
> 
>Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/hadoop/fs/FSDataInputStream
>at
> org.apache.spark.deploy.SparkSubmitArguments.handle(SparkSubmitArguments.scala:403)
>at
> org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:163)
>at
> org.apache.spark.deploy.SparkSubmitArguments.(SparkSubmitArguments.scala:98)
>at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
>at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.hadoop.fs.FSDataInputStream
>at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>at java.security.AccessController.doPrivileged(Native Method)
>at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>... 5 more
> 
> Any ideas on the proper configuration?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Run-spark-with-hadoop-snapshot-tp28105.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Reading LZO files with Spark

2016-11-19 Thread Sean Owen
Are you missing the hadoop-lzo package? It's not part of Hadoop/Spark.
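
With that jar on both the compile and runtime classpath (for example via
--jars and your build file), a standalone program would look roughly like
the untested sketch below; the S3 path is the one from the original question:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.{LongWritable, Text}
import com.hadoop.mapreduce.LzoTextInputFormat

object LzoRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LzoRead"))
    val files = sc.newAPIHadoopFile(
      "s3://support.elasticmapreduce/spark/examples/lzodataindexed/*.lzo",
      classOf[LzoTextInputFormat],
      classOf[LongWritable],
      classOf[Text])
    println(files.count())
    sc.stop()
  }
}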

On Sat, Nov 19, 2016 at 4:20 AM learning_spark <
dibyendu.chakraba...@gmail.com> wrote:

> Hi Users,
>
> I am not sure about the latest status of this issue:
> https://issues.apache.org/jira/browse/SPARK-2394
>
> However, I have seen the following link:
> https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/reading-lzo-files.md
>
> My experience is limited, but I have had partial success from the Spark
> shell, whereas my standalone program did not even compile. I suspect some
> jar file is required.
>
> val files = sc.newAPIHadoopFile(
>   "s3://support.elasticmapreduce/spark/examples/lzodataindexed/*.lzo",
>   classOf[com.hadoop.mapreduce.LzoTextInputFormat],
>   classOf[org.apache.hadoop.io.LongWritable],
>   classOf[org.apache.hadoop.io.Text])
>
> Does anyone know how to do this from a standalone program? Thanks and
> regards,
> --
> View this message in context: Reading LZO files with Spark
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>