Re: Writing data from Spark streaming to AWS Redshift?

2016-12-09 Thread ayan guha
Ideally, saving data to external sources should not be any different. Give
the write options stated in the blog a shot, but change the mode to append.
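
A rough sketch of what that could look like from a streaming job, assuming the
spark-redshift data source from the blog post (the record type, JDBC URL, table
name and S3 temp dir below are placeholders, not your actual values):

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.dstream.DStream

case class Event(id: String, amount: Double)      // placeholder record type

// `stream` is assumed to be the DStream your streaming job already produces.
def writeToRedshift(stream: DStream[Event]): Unit =
  stream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._

      rdd.toDF().write
        .format("com.databricks.spark.redshift")                          // spark-redshift package
        .option("url", "jdbc:redshift://host:5439/db?user=u&password=p")  // placeholder JDBC URL
        .option("dbtable", "events")                                      // placeholder target table
        .option("tempdir", "s3n://my-bucket/tmp/")                        // placeholder S3 staging dir
        .mode(SaveMode.Append)   // append each micro-batch instead of overwriting
        .save()
    }
  }

Keep in mind that spark-redshift stages data in S3 and issues a COPY per write, so
very short batch intervals can get expensive; larger micro-batches tend to work better.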

On Sat, Dec 10, 2016 at 8:25 AM, shyla deshpande 
wrote:

> Hello all,
>
> Is it possible to Write data from Spark streaming to AWS Redshift?
>
> I came across the following article, so it looks like it works from a Spark
> batch program.
>
> https://databricks.com/blog/2015/10/19/introducing-redshift-data-source-for-spark.html
>
> I want to write to AWS Redshift from Spark Streaming. Please share your
> experience and reference docs.
>
> Thanks
>



-- 
Best Regards,
Ayan Guha


Unsubscribe

2016-12-09 Thread cjuexuan
Unsubscribe

Re: Random Forest hangs without trace of error

2016-12-09 Thread Md. Rezaul Karim
I had a similar experience last week. I could not find any error trace either.

Later on, I did the following to get rid of the problem:
i) I downgraded to Spark 2.0.0
ii) I decreased the values of maxBins and maxDepth

Additionally, make sure that you set featureSubsetStrategy to "auto" to
let the algorithm choose the best feature subset strategy for your data.
Finally, set the impurity to "gini" for the information gain.

However, setting the number of trees to just 1 gives you neither the real
advantage of a forest nor better predictive performance.
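
For reference, with the spark.ml API those settings look roughly like the sketch
below (a sketch only; the column names and values are placeholders, not a
recommendation):

import org.apache.spark.ml.classification.RandomForestClassifier

// Sketch of the parameters discussed above (spark.ml API).
val rf = new RandomForestClassifier()
  .setLabelCol("label")                // placeholder column names
  .setFeaturesCol("features")
  .setNumTrees(20)                     // a single tree defeats the purpose of a forest
  .setMaxDepth(5)                      // keep maxDepth/maxBins modest to limit memory pressure
  .setMaxBins(32)
  .setFeatureSubsetStrategy("auto")    // let the algorithm choose the subset strategy
  .setImpurity("gini")
  .setSubsamplingRate(0.05)            // per-tree subsampling, as in the original post

// trainingData: an assumed DataFrame with "label" and "features" columns
val model = rf.fit(trainingData)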



Best,
Karim


On Dec 9, 2016 11:29 PM, "mhornbech"  wrote:

> Hi
>
> I have spent quite some time trying to debug an issue with the Random
> Forest
> algorithm on Spark 2.0.2. The input dataset is relatively large at around
> 600k rows and 200MB, but I use subsampling to make each tree manageable.
> However even with only 1 tree and a low sample rate of 0.05 the job hangs at
> one of the final stages (see attached). I have checked the logs on all
> executors and the driver and find no traces of error. Could it be a memory
> issue even though no error appears? The error does seem sporadic to some
> extent, so I also wondered whether it could be a data issue that only occurs
> if the subsample includes the bad data rows.
>
> Please comment if you have a clue.
>
> Morten
>
> (screenshot attachment: file/n28192/Sk%C3%A6rmbillede_2016-12-10_kl.png)
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-hangs-without-trace-of-error-tp28192.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[Spark log4j] Turning off log4j while scala program runs on spark-submit

2016-12-09 Thread Irving Duran
Hi -
I have a question about log4j while running on spark-submit.

I would like to have Spark only show errors when I am running
spark-submit.  I would like to accomplish this without having to edit the log4j
config file in $SPARK_HOME; is there a way to do this?

I found the following, but it only works in spark-shell (not spark-submit):
http://stackoverflow.com/questions/27781187/how-to-stop-messages-displaying-on-spark-console
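
For reference, the only programmatic workaround I know of is setting the log level
from the application itself, roughly as below; I am not sure it covers everything
spark-submit prints before the context starts, which is why I am asking:

import org.apache.spark.{SparkConf, SparkContext}

// Quiet Spark logging from inside the application, without editing
// $SPARK_HOME/conf/log4j.properties.
val sc = new SparkContext(new SparkConf().setAppName("quiet-app"))  // placeholder app name
sc.setLogLevel("ERROR")  // only ERROR and above from this point onward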

Thanks for your help in advance.

Thank You,

Irving Duran


Random Forest hangs without trace of error

2016-12-09 Thread mhornbech
Hi

I have spent quite some time trying to debug an issue with the Random Forest
algorithm on Spark 2.0.2. The input dataset is relatively large at around
600k rows and 200MB, but I use subsampling to make each tree manageable.
However even with only 1 tree and a low sample rate of 0.05 the job hangs at
one of the final stages (see attached). I have checked the logs on all
executors and the driver and find no traces of error. Could it be a memory
issue even though no error appears? The error does seem sporadic to some
extent, so I also wondered whether it could be a data issue that only occurs
if the subsample includes the bad data rows. 

Please comment if you have a clue.

Morten


 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Random-Forest-hangs-without-trace-of-error-tp28192.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark job server pros and cons

2016-12-09 Thread Shak S
Spark Job Server (SJS) gives you the ability to run your Spark job as a
service.  It has features like cached and named RDDs and REST APIs for
submitting your jobs. For more info, refer to
https://github.com/spark-jobserver/spark-jobserver. Internally, SJS uses
the same Spark job submission, so it is up to your Spark program to deal with
Kafka, Cassandra, etc.

Cons --> You may need to tweak the settings and configuration if SJS is not
running out of the box, and you need to build it yourself for production using
sbt, so some Scala knowledge is desirable. It is not a completely out-of-the-box
tool; expect some learning curve and troubleshooting.
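
For illustration, a job deployed to SJS implements the job server's SparkJob trait
instead of its own main method; based on the project README at the time, it looks
roughly like the sketch below (package and trait names may differ by version, and
the config key is a placeholder, so verify against the repo):

import com.typesafe.config.Config
import org.apache.spark.SparkContext
import spark.jobserver.{SparkJob, SparkJobValid, SparkJobValidation}

// A minimal SJS-style job: validate the input config, then run against the
// SparkContext that the job server manages and hands to you.
object WordCountJob extends SparkJob {
  override def validate(sc: SparkContext, config: Config): SparkJobValidation =
    SparkJobValid   // or SparkJobInvalid("reason") when required config is missing

  override def runJob(sc: SparkContext, config: Config): Any = {
    val input = config.getString("input.path")   // placeholder config key
    sc.textFile(input)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .take(10)                                  // the return value is served back over REST
  }
}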


On Fri, Dec 9, 2016 at 4:31 PM, Cassa L  wrote:

> Hi,
> So far, I have run Spark jobs directly using spark-submit options.  I have a
> use case to use Spark Job Server to run the job. I wanted to find out the pros
> and cons of using this job server. If anyone can share them, it will be
> great.  My jobs usually connect to multiple data sources like Kafka, a custom
> receiver, Cassandra, etc. Will these use cases work as is in the job server?
>
> Thanks,
> Leena
>


Spark job server pros and cons

2016-12-09 Thread Cassa L
Hi,
So far, I have run Spark jobs directly using spark-submit options.  I have a use
case to use Spark Job Server to run the job. I wanted to find out the pros and
cons of using this job server. If anyone can share them, it will be great.
My jobs usually connect to multiple data sources like Kafka, a custom
receiver, Cassandra, etc. Will these use cases work as is in the job server?

Thanks,
Leena


Writing data from Spark streaming to AWS Redshift?

2016-12-09 Thread shyla deshpande
Hello all,

Is it possible to Write data from Spark streaming to AWS Redshift?

I came across the following article, so it looks like it works from a Spark
batch program.

https://databricks.com/blog/2015/10/19/introducing-redshift-data-source-for-spark.html

I want to write to AWS Redshift from Spark Streaming. Please share your
experience and reference docs.

Thanks


Document Similarity -Spark Mllib

2016-12-09 Thread satyajit vegesna
Hi ALL,

I am trying to implement an MLlib Spark job to find the similarity between
documents (which in my case are basically home addresses).

I believe I cannot use DIMSUM for my use case, as DIMSUM works well only
with matrices that have thin (few) columns and many more rows.

The matrix format for my use case looks roughly like this; m (the number of
document columns) is going to be huge, since I have many addresses, while n
(the number of rows) is going to be thin compared to m:

              doc1(address1)   doc2(address2)   ...
san mateo     0.73462          0
san fransico  ...              ...
san bruno     ...              ...
...

I would like to know if there is a way to leverage DIMSUM to work on my use
case, and if not, what other algorithm I can try that is available in Spark
MLlib.
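
For reference, the DIMSUM entry point I was looking at is
RowMatrix.columnSimilarities, roughly as in the sketch below (the threshold and
the input RDD are placeholders); since it computes similarities between columns,
my worry is exactly the wide/short shape described above:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, RowMatrix}
import org.apache.spark.rdd.RDD

// termVectors: an assumed RDD where each row is a term and each column a document
// (address), so columnSimilarities() yields document-to-document cosine similarities.
val termVectors: RDD[Vector] = ???   // placeholder input
val mat = new RowMatrix(termVectors)

// 0.1 is a placeholder threshold; higher values trade accuracy for less shuffling
val similarities: CoordinateMatrix = mat.columnSimilarities(0.1)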

Regards,
Satyajit.


SparkSQL

2016-12-09 Thread Niraj Kumar
Hi

I am working on SparkSQL using HiveContext (version 1.6.2).
Can someone help me convert the following queries to SparkSQL?

update calls set sample = 'Y' where accnt_call_id in (select accnt_call_id from samples);
insert into details (accnt_call_id, prdct_cd, prdct_id, dtl_pstn)
  select accnt_call_id, prdct_cd, prdct_id, 32 from samples where PRDCT_CD = 2114515;
delete from samples where PRDCT_CD in (2106861, 2114515);
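
For context, since HiveContext in 1.6 does not support UPDATE or DELETE directly,
I assume these have to be rewritten as DataFrame operations whose results are
written back out. Roughly the shape I am imagining (just a sketch, using the table
and column names above; corrections welcome):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import hiveContext.implicits._

val calls   = hiveContext.table("calls")
val samples = hiveContext.table("samples")

// update calls set sample = 'Y' where accnt_call_id in (select accnt_call_id from samples)
// -> left join plus a conditional column, written to a new table
val sampleIds = samples.select($"accnt_call_id".as("s_id")).distinct()
val updatedCalls = calls
  .join(sampleIds, calls("accnt_call_id") === sampleIds("s_id"), "left_outer")
  .withColumn("sample", when($"s_id".isNotNull, lit("Y")).otherwise($"sample"))
  .drop("s_id")
updatedCalls.write.mode(SaveMode.Overwrite).saveAsTable("calls_updated")

// insert into details (...) select ... from samples where PRDCT_CD = 2114515
samples.filter($"PRDCT_CD" === 2114515)
  .select($"accnt_call_id", $"prdct_cd", $"prdct_id", lit(32).as("dtl_pstn"))
  .write.mode(SaveMode.Append).insertInto("details")

// delete from samples where PRDCT_CD in (2106861, 2114515)
// -> keep the rows you want and write them to a new table (overwriting the
//    table you are reading from in the same job is not allowed)
samples.filter(!$"PRDCT_CD".isin(2106861, 2114515))
  .write.mode(SaveMode.Overwrite).saveAsTable("samples_filtered")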

Thanks and Regards,
Niraj Kumar



Information required

2016-12-09 Thread Rishabh Wadhawan
Does anyone know the repository link for the source of:
GroupID: org.spark-project.hive
Artifact: 1.2.1.spark

I was able to find
https://github.com/JoshRosen/hive/tree/release-1.2.1-spark2, which is
artifact 1.2.1.spark2, not 1.2.1.spark.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Information-required-tp28191.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger
I'd say unzip your actual assembly jar and verify whether the kafka
consumer classes are 0.10.1 or 0.10.0.  We've seen reports of odd
behavior with 0.10.1 classes.  Possibly unrelated, but good to
eliminate.

On Fri, Dec 9, 2016 at 10:38 AM, Debasish Ghosh
 wrote:
> oops .. it's 0.10.0 .. sorry for the confusion ..
>
> On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh 
> wrote:
>>
>> My assembly contains the 0.10.1 classes .. Here are the dependencies
>> related to kafka & spark that my assembly has ..
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.kafka"  %   "kafka-streams"  %
>> "0.10.0.0",
>>   "org.apache.spark" %%   "spark-streaming-kafka-0-10" % spark,
>>   "org.apache.spark" %%   "spark-core" % spark %
>> "provided",
>>   "org.apache.spark" %%   "spark-streaming"% spark %
>> "provided",
>>   "org.apache.spark" %%   "spark-mllib"% spark %
>> "provided",
>>   "org.apache.spark" %%   "spark-sql"  % spark %
>> "provided"
>> )
>>
>> regards.
>>
>> On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger 
>> wrote:
>>>
>>> When you say 0.10.1 do you mean broker version only, or does your
>>> assembly contain classes from the 0.10.1 kafka consumer?
>>>
>>> On Fri, Dec 9, 2016 at 10:19 AM, debasishg 
>>> wrote:
>>> > Hello -
>>> >
>>> > I am facing some issues with the following snippet of code that reads
>>> > from
>>> > Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..)
>>> > with
>>> > Kafka 0.10.1 and Spark 2.0.1.
>>> >
>>> > // get the data from kafka
>>> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>>> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>>> > streamingContext,
>>> > PreferConsistent,
>>> > Subscribe[Array[Byte], (String, String)](topicToReadFrom,
>>> > kafkaParams)
>>> >   )
>>> >
>>> > // label and vectorize the value
>>> > val projected: DStream[(String, Vector)] = stream.map { record =>
>>> >   val (label, value) = record.value
>>> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>>> >   (label, vector)
>>> > }.transform(projectToLowerDimension)
>>> >
>>> > In the above snippet if I have the call to transform in the last line,
>>> > I get
>>> > the following exception ..
>>> >
>>> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is
>>> > not
>>> > safe for multi-threaded access
>>> > at
>>> >
>>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>> > at
>>> >
>>> > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> > at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>> > at
>>> >
>>> > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>>> > at
>>> >
>>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>>> > at
>>> >
>>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>>> > at
>>> > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>>> > at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>>> > at
>>> >
>>> > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>>> > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>>> > 
>>> >
>>> > The transform method does a PCA and gives the top 2 principal
>>> > components ..
>>> >
>>> > private def projectToLowerDimension: RDD[(String, Vector)] =>
>>> > RDD[(String,
>>> > Vector)] = { rdd =>
>>> >   if (rdd.isEmpty) rdd else {
>>> > // reduce to 2 dimensions
>>> > val pca = new PCA(2).fit(rdd.map(_._2))
>>> >
>>> > // Project vectors to the linear space spanned by the top 2
>>> > principal
>>> > // components, keeping the label
>>> > rdd.map(p => (p._1, pca.transform(p._2)))
>>> >   }
>>> > }
>>> >
>>> > However if I remove the transform call, I can process everything
>>> > correctly.
>>> >
>>> > Any help will be most welcome ..
>>> >
>>> > regards.
>>> > - Debasish
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> > 

Re: how can I set the log configuration file for spark history server ?

2016-12-09 Thread Marcelo Vanzin
(-dev)

Just configure your log4j.properties in $SPARK_HOME/conf (or set a
custom $SPARK_CONF_DIR for the history server).

On Thu, Dec 8, 2016 at 7:20 PM, John Fang  wrote:
> ./start-history-server.sh
> starting org.apache.spark.deploy.history.HistoryServer, logging to
> /home/admin/koala/data/versions/0/SPARK/2.0.2/spark-2.0.2-bin-hadoop2.6/logs/spark-admin-org.apache.spark.deploy.history.HistoryServer-1-v069166214.sqa.zmf.out
>
> Then the history server will print all logs to XXX.sqa.zmf.out, so I can't limit
> the file's max size.  I want to limit the size of the log file.



-- 
Marcelo

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: problem with kafka createDirectStream ..

2016-12-09 Thread Debasish Ghosh
oops .. it's 0.10.0 .. sorry for the confusion ..

On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh 
wrote:

> My assembly contains the 0.10.1 classes .. Here are the dependencies
> related to kafka & spark that my assembly has ..
>
> libraryDependencies ++= Seq(
>   "org.apache.kafka"  %   "kafka-streams"  %
> "0.10.0.0",
>   "org.apache.spark" %%   "spark-streaming-kafka-0-10" % spark,
>   "org.apache.spark" %%   "spark-core" % spark %
> "provided",
>   "org.apache.spark" %%   "spark-streaming"% spark %
> "provided",
>   "org.apache.spark" %%   "spark-mllib"% spark %
> "provided",
>   "org.apache.spark" %%   "spark-sql"  % spark %
> "provided"
> )
>
> regards.
>
> On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger 
> wrote:
>
>> When you say 0.10.1 do you mean broker version only, or does your
>> assembly contain classes from the 0.10.1 kafka consumer?
>>
>> On Fri, Dec 9, 2016 at 10:19 AM, debasishg 
>> wrote:
>> > Hello -
>> >
>> > I am facing some issues with the following snippet of code that reads
>> from
>> > Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..)
>> with
>> > Kafka 0.10.1 and Spark 2.0.1.
>> >
>> > // get the data from kafka
>> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>> > streamingContext,
>> > PreferConsistent,
>> > Subscribe[Array[Byte], (String, String)](topicToReadFrom,
>> kafkaParams)
>> >   )
>> >
>> > // label and vectorize the value
>> > val projected: DStream[(String, Vector)] = stream.map { record =>
>> >   val (label, value) = record.value
>> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>> >   (label, vector)
>> > }.transform(projectToLowerDimension)
>> >
>> > In the above snippet if I have the call to transform in the last line,
>> I get
>> > the following exception ..
>> >
>> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is
>> not
>> > safe for multi-threaded access
>> > at
>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(
>> KafkaConsumer.java:1431)
>> > at
>> > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaCo
>> nsumer.java:1132)
>> > at
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek
>> (CachedKafkaConsumer.scala:95)
>> > at
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(
>> CachedKafkaConsumer.scala:69)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>> r.next(KafkaRDD.scala:227)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterato
>> r.next(KafkaRDD.scala:193)
>> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>> > at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>> > at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> > at
>> > scala.collection.generic.Growable$class.$plus$plus$eq(Growab
>> le.scala:59)
>> > at
>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuff
>> er.scala:104)
>> > at
>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuff
>> er.scala:48)
>> > at scala.collection.TraversableOnce$class.to(TraversableOnce.
>> scala:310)
>> > at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>> > at
>> > scala.collection.TraversableOnce$class.toBuffer(
>> TraversableOnce.scala:302)
>> > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>> > 
>> >
>> > The transform method does a PCA and gives the top 2 principal
>> components ..
>> >
>> > private def projectToLowerDimension: RDD[(String, Vector)] =>
>> RDD[(String,
>> > Vector)] = { rdd =>
>> >   if (rdd.isEmpty) rdd else {
>> > // reduce to 2 dimensions
>> > val pca = new PCA(2).fit(rdd.map(_._2))
>> >
>> > // Project vectors to the linear space spanned by the top 2
>> principal
>> > // components, keeping the label
>> > rdd.map(p => (p._1, pca.transform(p._2)))
>> >   }
>> > }
>> >
>> > However if I remove the transform call, I can process everything
>> correctly.
>> >
>> > Any help will be most welcome ..
>> >
>> > regards.
>> > - Debasish
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: 

Re: problem with kafka createDirectStream ..

2016-12-09 Thread Debasish Ghosh
My assembly contains the 0.10.1 classes .. Here are the dependencies
related to kafka & spark that my assembly has ..

libraryDependencies ++= Seq(
  "org.apache.kafka"  %   "kafka-streams"  % "0.10.0.0",
  "org.apache.spark" %%   "spark-streaming-kafka-0-10" % spark,
  "org.apache.spark" %%   "spark-core" % spark %
"provided",
  "org.apache.spark" %%   "spark-streaming"% spark %
"provided",
  "org.apache.spark" %%   "spark-mllib"% spark %
"provided",
  "org.apache.spark" %%   "spark-sql"  % spark %
"provided"
)

regards.

On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger  wrote:

> When you say 0.10.1 do you mean broker version only, or does your
> assembly contain classes from the 0.10.1 kafka consumer?
>
> On Fri, Dec 9, 2016 at 10:19 AM, debasishg 
> wrote:
> > Hello -
> >
> > I am facing some issues with the following snippet of code that reads
> from
> > Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..)
> with
> > Kafka 0.10.1 and Spark 2.0.1.
> >
> > // get the data from kafka
> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
> > streamingContext,
> > PreferConsistent,
> > Subscribe[Array[Byte], (String, String)](topicToReadFrom,
> kafkaParams)
> >   )
> >
> > // label and vectorize the value
> > val projected: DStream[(String, Vector)] = stream.map { record =>
> >   val (label, value) = record.value
> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
> >   (label, vector)
> > }.transform(projectToLowerDimension)
> >
> > In the above snippet if I have the call to transform in the last line, I
> get
> > the following exception ..
> >
> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is
> not
> > safe for multi-threaded access
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> acquire(KafkaConsumer.java:1431)
> > at
> > org.apache.kafka.clients.consumer.KafkaConsumer.seek(
> KafkaConsumer.java:1132)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> seek(CachedKafkaConsumer.scala:95)
> > at
> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.
> get(CachedKafkaConsumer.scala:69)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:227)
> > at
> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(
> KafkaRDD.scala:193)
> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> > at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> > at
> > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> > at
> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(
> ArrayBuffer.scala:104)
> > at
> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> > at scala.collection.TraversableOnce$class.to(
> TraversableOnce.scala:310)
> > at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> > at
> > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.
> scala:302)
> > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> > 
> >
> > The transform method does a PCA and gives the top 2 principal components
> ..
> >
> > private def projectToLowerDimension: RDD[(String, Vector)] =>
> RDD[(String,
> > Vector)] = { rdd =>
> >   if (rdd.isEmpty) rdd else {
> > // reduce to 2 dimensions
> > val pca = new PCA(2).fit(rdd.map(_._2))
> >
> > // Project vectors to the linear space spanned by the top 2 principal
> > // components, keeping the label
> > rdd.map(p => (p._1, pca.transform(p._2)))
> >   }
> > }
> >
> > However if I remove the transform call, I can process everything
> correctly.
> >
> > Any help will be most welcome ..
> >
> > regards.
> > - Debasish
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger
When you say 0.10.1 do you mean broker version only, or does your
assembly contain classes from the 0.10.1 kafka consumer?

On Fri, Dec 9, 2016 at 10:19 AM, debasishg  wrote:
> Hello -
>
> I am facing some issues with the following snippet of code that reads from
> Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..) with
> Kafka 0.10.1 and Spark 2.0.1.
>
> // get the data from kafka
> val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
> streamingContext,
> PreferConsistent,
> Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
>   )
>
> // label and vectorize the value
> val projected: DStream[(String, Vector)] = stream.map { record =>
>   val (label, value) = record.value
>   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>   (label, vector)
> }.transform(projectToLowerDimension)
>
> In the above snippet if I have the call to transform in the last line, I get
> the following exception ..
>
> Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not
> safe for multi-threaded access
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> 
>
> The transform method does a PCA and gives the top 2 principal components ..
>
> private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String,
> Vector)] = { rdd =>
>   if (rdd.isEmpty) rdd else {
> // reduce to 2 dimensions
> val pca = new PCA(2).fit(rdd.map(_._2))
>
> // Project vectors to the linear space spanned by the top 2 principal
> // components, keeping the label
> rdd.map(p => (p._1, pca.transform(p._2)))
>   }
> }
>
> However if I remove the transform call, I can process everything correctly.
>
> Any help will be most welcome ..
>
> regards.
> - Debasish
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



problem with kafka createDirectStream ..

2016-12-09 Thread debasishg
Hello -

I am facing some issues with the following snippet of code that reads from
Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..) with
Kafka 0.10.1 and Spark 2.0.1.

// get the data from kafka
val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] = 
  KafkaUtils.createDirectStream[Array[Byte], (String, String)](
streamingContext,
PreferConsistent,
Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
  )

// label and vectorize the value
val projected: DStream[(String, Vector)] = stream.map { record =>
  val (label, value) = record.value
  val vector = Vectors.dense(value.split(",").map(_.toDouble))
  (label, vector)
}.transform(projectToLowerDimension)

In the above snippet if I have the call to transform in the last line, I get
the following exception ..

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not
safe for multi-threaded access
at
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)


The transform method does a PCA and gives the top 2 principal components ..

private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String,
Vector)] = { rdd =>
  if (rdd.isEmpty) rdd else {
// reduce to 2 dimensions
val pca = new PCA(2).fit(rdd.map(_._2))

// Project vectors to the linear space spanned by the top 2 principal
// components, keeping the label
rdd.map(p => (p._1, pca.transform(p._2)))
  }
}

However if I remove the transform call, I can process everything correctly.

Any help will be most welcome ..

regards.
- Debasish



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



problem with kafka createDirectStream ..

2016-12-09 Thread Debasish Ghosh
Hello -

I am facing some issues with the following snippet of code that reads from
Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..)
with Kafka 0.10.1 and Spark 2.0.1.

// get the data from kafka
val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
  KafkaUtils.createDirectStream[Array[Byte], (String, String)](
streamingContext,
PreferConsistent,
Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
  )

// label and vectorize the value
val projected: DStream[(String, Vector)] = stream.map { record =>
  val (label, value) = record.value
  val vector = Vectors.dense(value.split(",").map(_.toDouble))
  (label, vector)
}.transform(projectToLowerDimension)

In the above snippet if I have the call to transform in the last line, I
get the following exception ..

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not
> safe for multi-threaded access
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> 


The transform method does a PCA and gives the top 2 principal components ..

private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String,
Vector)] = { rdd =>
  if (rdd.isEmpty) rdd else {
// reduce to 2 dimensions
val pca = new PCA(2).fit(rdd.map(_._2))

// Project vectors to the linear space spanned by the top 2 principal
// components, keeping the label
rdd.map(p => (p._1, pca.transform(p._2)))
  }
}

However if I remove the transform call, I can process everything correctly.

Any help will be most welcome ..

regards.
-- 
Debasish Ghosh


Re: unit testing in spark

2016-12-09 Thread Michael Stratton
That sounds great, please include me so I can get involved.

On Fri, Dec 9, 2016 at 7:39 AM, Marco Mistroni  wrote:

> Me too as I spent most of my time writing unit/integ tests  pls advise
> on where I  can start
> Kr
>
> On 9 Dec 2016 12:15 am, "Miguel Morales"  wrote:
>
>> I would be interested in contributing.  Ive created my own library for
>> this as well.  In my blog post I talk about testing with Spark in RSpec
>> style:
>> https://medium.com/@therevoltingx/test-driven-development-w-apache-spark-746082b44941
>>
>> Sent from my iPhone
>>
>> On Dec 8, 2016, at 4:09 PM, Holden Karau  wrote:
>>
>> There are also libraries designed to simplify testing Spark in the
>> various platforms, spark-testing-base
>>  for Scala/Java/Python (&
>> video https://www.youtube.com/watch?v=f69gSGSLGrY), sscheck
>>  (scala focused property based),
>> pyspark.test (python focused with py.test instead of unittest2) (& blog
>> post from nextdoor https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b#.jw3bdcej9 )
>>
>> Good luck on your Spark Adventures :)
>>
>> P.S.
>>
>> If anyone is interested in helping improve spark testing libraries I'm
>> always looking for more people to be involved with spark-testing-base
>> because I'm lazy :p
>>
>> On Thu, Dec 8, 2016 at 2:05 PM, Lars Albertsson 
>> wrote:
>>
>>> I wrote some advice in a previous post on the list:
>>> http://markmail.org/message/bbs5acrnksjxsrrs
>>>
>>> It does not mention python, but the strategy advice is the same. Just
>>> replace JUnit/Scalatest with pytest, unittest, or your favourite
>>> python test framework.
>>>
>>>
>>> I recently held a presentation on the subject. There is a video
>>> recording at https://vimeo.com/192429554 and slides at
>>> http://www.slideshare.net/lallea/test-strategies-for-data-processing-pipelines-67244458
>>>
>>> You can find more material on test strategies at
>>> http://www.mapflat.com/lands/resources/reading-list/index.html
>>>
>>>
>>>
>>>
>>> Lars Albertsson
>>> Data engineering consultant
>>> www.mapflat.com
>>> https://twitter.com/lalleal
>>> +46 70 7687109
>>> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
>>>
>>>
>>> On Thu, Dec 8, 2016 at 4:14 PM, pseudo oduesp 
>>> wrote:
>>> > somone can tell me how i can make unit test on pyspark ?
>>> > (book, tutorial ...)
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>


Re: When will Structured Streaming support stream-to-stream joins?

2016-12-09 Thread ljwagerfield
Michael Armbrust's reply:

I would guess Spark 2.3, but maybe sooner maybe later depending on demand. 
I created https://issues.apache.org/jira/browse/SPARK-18791 so people can
describe their requirements / stay informed.

---

Lawrence's reply:

Please vote on the issue, people! Would be awesome to have this in :D



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/When-will-Structured-Streaming-support-stream-to-stream-joins-tp28185p28189.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: About transformations

2016-12-09 Thread Mendelson, Assaf
This is a guess, but I would bet that most of the time went into loading
the data. The second time, there are many places this could be cached (either
by Spark or even by the OS if you are reading from a file).
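
If the goal is for the second action to reuse the first one's work by construction
rather than by whatever happens to be cached, make the caching explicit; a minimal
sketch (the source path is a placeholder):

// Cache the intermediate DataFrame so the second count reads from memory.
val df1 = sqlContext.read.parquet("path/to/data")   // placeholder source
val df2 = df1.select("*")

df2.cache()           // lazy: only marks df2 for caching
val c1 = df2.count()  // first action materializes and caches df2
val c2 = df2.count()  // second action reuses the cached data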

-Original Message-
From: brccosta [mailto:brunocosta@gmail.com] 
Sent: Friday, December 09, 2016 1:24 PM
To: user@spark.apache.org
Subject: About transformations

Dear guys,

We're performing some tests to evaluate the behavior of transformations and 
actions in Spark with Spark SQL. In our tests, first we conceive a simple 
dataflow with 2 transformations and 1 action:

LOAD (result: df_1) > SELECT ALL FROM df_1 (result: df_2) > COUNT(df_2)

The execution time for this first dataflow was 10 seconds. Next, we added 
another action to our dataflow:

LOAD (result: df_1) > SELECT ALL FROM df_1 (result: df_2) > COUNT(df_2) >
COUNT(df_2)

Analyzing the second version of the dataflow, since all transformations are lazy 
and re-executed for each action (according to the documentation), when 
executing the second count, it should require the execution of the two previous 
transformations (LOAD and SELECT ALL). Thus, we expected that when executing 
this second version of our dataflow, the time would be around 20 seconds. 
However, the execution time was 11 seconds. Apparently, the results of the 
transformations required by the first count were cached by Spark for the second 
count.

Please, do you guys know what is happening? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/About-transformations-tp28188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: unit testing in spark

2016-12-09 Thread Marco Mistroni
Me too as I spent most of my time writing unit/integ tests  pls advise
on where I  can start
Kr

On 9 Dec 2016 12:15 am, "Miguel Morales"  wrote:

> I would be interested in contributing.  Ive created my own library for
> this as well.  In my blog post I talk about testing with Spark in RSpec
> style:
> https://medium.com/@therevoltingx/test-driven-development-w-apache-spark-746082b44941
>
> Sent from my iPhone
>
> On Dec 8, 2016, at 4:09 PM, Holden Karau  wrote:
>
> There are also libraries designed to simplify testing Spark in the various
> platforms, spark-testing-base
>  for Scala/Java/Python (&
> video https://www.youtube.com/watch?v=f69gSGSLGrY), sscheck
>  (scala focused property based),
> pyspark.test (python focused with py.test instead of unittest2) (& blog
> post from nextdoor https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b#.jw3bdcej9 )
>
> Good luck on your Spark Adventures :)
>
> P.S.
>
> If anyone is interested in helping improve spark testing libraries I'm
> always looking for more people to be involved with spark-testing-base
> because I'm lazy :p
>
> On Thu, Dec 8, 2016 at 2:05 PM, Lars Albertsson  wrote:
>
>> I wrote some advice in a previous post on the list:
>> http://markmail.org/message/bbs5acrnksjxsrrs
>>
>> It does not mention python, but the strategy advice is the same. Just
>> replace JUnit/Scalatest with pytest, unittest, or your favourite
>> python test framework.
>>
>>
>> I recently held a presentation on the subject. There is a video
>> recording at https://vimeo.com/192429554 and slides at
>> http://www.slideshare.net/lallea/test-strategies-for-data-processing-pipelines-67244458
>>
>> You can find more material on test strategies at
>> http://www.mapflat.com/lands/resources/reading-list/index.html
>>
>>
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> https://twitter.com/lalleal
>> +46 70 7687109
>> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
>>
>>
>> On Thu, Dec 8, 2016 at 4:14 PM, pseudo oduesp 
>> wrote:
>> > somone can tell me how i can make unit test on pyspark ?
>> > (book, tutorial ...)
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


groupByKey vs reduceByKey

2016-12-09 Thread Appu K
Hi,

I read somewhere that

groupByKey() on an RDD disables map-side aggregation, as the aggregation
function (appending to a list) does not save any space.


However, from my understanding, using something like reduceByKey
(or combineByKey with a combiner function) we could reduce the data shuffled
around.

I am wondering why map-side aggregation is disabled for groupByKey() and why it
wouldn't save space at the executor where the data is received after the
shuffle.
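
For concreteness, the two patterns I am comparing are roughly:

// sc: an existing SparkContext
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// reduceByKey: values are combined map-side first, so only partial sums are shuffled.
val counts1 = pairs.reduceByKey(_ + _)

// groupByKey: every (key, value) pair is shuffled, then aggregated on the reduce side.
val counts2 = pairs.groupByKey().mapValues(_.sum)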


cheers
Appu


About transformations

2016-12-09 Thread brccosta
Dear guys,

We're performing some tests to evaluate the behavior of transformations and
actions in Spark with Spark SQL. In our tests, first we conceive a simple
dataflow with 2 transformations and 1 action:

LOAD (result: df_1) > SELECT ALL FROM df_1 (result: df_2) > COUNT(df_2)

The execution time for this first dataflow was 10 seconds. Next, we added
another action to our dataflow:

LOAD (result: df_1) > SELECT ALL FROM df_1 (result: df_2) > COUNT(df_2) >
COUNT(df_2)

Analyzing the second version of the dataflow, since all transformations are
lazy and re-executed for each action (according to the documentation), when
executing the second count, it should require the execution of the two
previous transformations (LOAD and SELECT ALL). Thus, we expected that when
executing this second version of our dataflow, the time would be around 20
seconds. However, the execution time was 11 seconds. Apparently, the results
of the transformations required by the first count were cached by Spark for
the second count.

Please, do you guys know what is happening? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/About-transformations-tp28188.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Few questions on reliability of accumulators value.

2016-12-09 Thread Sudev A C
Hi,

Can anyone please help clarify how accumulators can be used reliably to
measure error/success/analytical metrics?

Given below is the use case / code snippet that I have.

val amtZero = sc.accumulator(0)
> val amtLarge = sc.accumulator(0)
> val amtNormal = sc.accumulator(0)
> val getAmount = (x: org.apache.spark.sql.Row) => if (x.isNullAt(somePos)) {
>   amtZero.add(1)
>   0.0
> } else {
>   val amount = x.getDouble(4)
>   if (amount > 1) amtLarge.add(1) else amtNormal.add(1)
>   amount
> }
> mrdd = rdd.map(s => (s, getAmount(s)))
> mrdd.cache()
> another_mrdd = rdd2.map(s => (s, getAmount(s)))
> mrdd.save_to_redshift
> another_mrdd.save_to_redshift
> mrdd.union(another_mrdd).map().groupByKey().save_to_redshift



// Get values from accumulators and persist it to a reliable store for
> analytics.
> save_to_datastore(amtZero.value, amtLarge.value, amtNormal.value)



A few questions:

1. How many times should I expect the counts for items within mrdd and
another_mrdd to be applied, since both of these RDDs are being reused? What
happens when a part of the DAG is skipped due to caching in between (say I'm
caching only mrdd)?

2. Should I be worried about any possible stage/task failures (due to
master-worker network issues / resource starvation / speculative execution)?
Can these events lead to wrong counts?

3. The documentation says: **In transformations, users should be aware of that each
task’s update may be applied more than once if tasks or job stages are
re-executed.**
Is the re-execution of stages/tasks here referring to failure re-executions, or
to re-execution due to the stage/task's position in the DAG?

4. Is it safe to say that the usage of accumulators (for exact counts) is
limited to .foreach(), as actions guarantee exactly-once updates?
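
For context, the alternative I am weighing is to compute the exact counts with
separate actions over the cached RDD instead of accumulator updates inside map,
roughly as below (an approximation of the branch logic above, at the cost of
extra passes over the data):

mrdd.cache()
val amounts   = mrdd.values                       // the Double produced by getAmount
val amtZero   = amounts.filter(_ == 0.0).count()  // roughly the isNullAt branch
val amtLarge  = amounts.filter(_ > 1).count()
val amtNormal = amounts.count() - amtZero - amtLarge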

Thanks
Sudev


Re: reading data from s3

2016-12-09 Thread Sudev A C
Hi Hitesh,

The schema of the table is inferred automatically if you are reading from a JSON
file, whereas when you are reading from a text file you will have to
provide a schema for the table you want to create (JSON carries its schema
within it).

You can create DataFrames and register them as tables in two ways:
1. Inferring the schema using reflection
2. Programmatically specifying the schema


Also you may use packages like spark-csv to infer the schema from CSV.
https://github.com/databricks/spark-csv#sql-api
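
For example, with the spark-csv package on Spark 1.x (the bucket, path and options
below are placeholders):

// Read a CSV from S3, let spark-csv infer the schema, then query it with SQL.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // first line contains column names
  .option("inferSchema", "true")   // scan the data to infer column types
  .load("s3n://my-bucket/my_file.csv")   // placeholder path

df.registerTempTable("table1")
sqlContext.sql("SELECT * FROM table1").show()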

Thanks
Sudev

On Fri, Dec 9, 2016 at 11:13 AM Hitesh Goyal 
wrote:

Hi team,

I want to read a text file from S3. I am doing it using a DataFrame, like
below:

DataFrame d = sql.read().text("s3://my_first_text_file.txt");
d.registerTempTable("table1");
DataFrame d1 = sql.sql("Select * from table1");
d1.printSchema();
d1.show();



But it is not registering the text file as a temp table on which I can run
SQL queries. Can't I do this with a text file? Or, if I can, please
suggest a way to do it.

If I try to do the same with a JSON file, it is successful.



Regards,

*Hitesh Goyal*

Simpli5d Technologies

Cont No.: 9996588220