Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger

Re: problem with kafka createDirectStream ..

2016-12-09 Thread Debasish Ghosh
oops .. it's 0.10.0 .. sorry for the confusion ..


Re: problem with kafka createDirectStream ..

2016-12-09 Thread Debasish Ghosh
My assembly contains the 0.10.1 classes .. Here are the dependencies
related to kafka & spark that my assembly has ..

libraryDependencies ++= Seq(
  "org.apache.kafka"  %  "kafka-streams"              % "0.10.0.0",
  "org.apache.spark" %%  "spark-streaming-kafka-0-10" % spark,
  "org.apache.spark" %%  "spark-core"                 % spark % "provided",
  "org.apache.spark" %%  "spark-streaming"            % spark % "provided",
  "org.apache.spark" %%  "spark-mllib"                % spark % "provided",
  "org.apache.spark" %%  "spark-sql"                  % spark % "provided"
)

regards.
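[A minimal sbt sketch, not something from this message: kafka-streams 0.10.0.0 pulls in its own kafka-clients, which may differ from the consumer version that spark-streaming-kafka-0-10 is built against, so one way to keep a single kafka-clients version in the assembly is to pin it explicitly. The version string below is only a placeholder, not a recommendation.]

// Hypothetical override: force one kafka-clients version across the whole build.
// "0.10.0.1" is a placeholder; use whatever version the spark-streaming-kafka-0-10
// artifact for your Spark release actually depends on.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"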

On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org> wrote:

> When you say 0.10.1 do you mean broker version only, or does your
> assembly contain classes from the 0.10.1 kafka consumer?



-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger
When you say 0.10.1 do you mean broker version only, or does your
assembly contain classes from the 0.10.1 kafka consumer?
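[One quick way to check this, a sketch rather than part of the original reply, assuming the kafka-clients jar in the assembly still ships AppInfoParser: print the client version from inside the application itself.]

// Sketch: AppInfoParser ships inside kafka-clients, so this reports the version
// of whatever kafka-clients jar actually made it onto the classpath.
import org.apache.kafka.common.utils.AppInfoParser

object KafkaClientVersionCheck {
  def main(args: Array[String]): Unit =
    println(s"kafka-clients on the classpath: ${AppInfoParser.getVersion}")
}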




problem with kafka createDirectStream ..

2016-12-09 Thread debasishg
Hello -

I am facing some issues with the following snippet of code that reads from
Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with
Kafka 0.10.1 and Spark 2.0.1.

// get the data from kafka
val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] = 
  KafkaUtils.createDirectStream[Array[Byte], (String, String)](
streamingContext,
PreferConsistent,
Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
  )

// label and vectorize the value
val projected: DStream[(String, Vector)] = stream.map { record =>
  val (label, value) = record.value
  val vector = Vectors.dense(value.split(",").map(_.toDouble))
  (label, vector)
}.transform(projectToLowerDimension)

In the above snippet if I have the call to transform in the last line, I get
the following exception ..

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)


The transform method does a PCA and gives the top 2 principal components ..

private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
  if (rdd.isEmpty) rdd
  else {
    // reduce to 2 dimensions
    val pca = new PCA(2).fit(rdd.map(_._2))

    // Project vectors to the linear space spanned by the top 2 principal
    // components, keeping the label
    rdd.map(p => (p._1, pca.transform(p._2)))
  }
}

However if I remove the transform call, I can process everything correctly.

Any help will be most welcome ..

regards.
- Debasish
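[A sketch of one workaround that is commonly suggested for this kind of error, stated under assumptions rather than as a confirmed fix for this thread: persist the batch RDD inside the transform, so that isEmpty, PCA.fit and the final map reuse cached blocks instead of each re-reading the Kafka partitions through the shared cached consumer.]

import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Sketch only: same projection as above, but the incoming RDD is persisted so the
// Kafka data is fetched once and then reused by every job this function triggers.
private def projectToLowerDimensionCached: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
  val cached = rdd.persist(StorageLevel.MEMORY_ONLY_SER)
  if (cached.isEmpty) cached
  else {
    val pca = new PCA(2).fit(cached.map(_._2))   // runs its own job(s) over the cached data
    cached.map(p => (p._1, pca.transform(p._2))) // downstream job reuses the cached blocks
  }
  // unpersist should happen later (e.g. in foreachRDD after the output job has run);
  // unpersisting here would drop the cache before the streaming job executes.
}

[Whether this removes the exception depends on what is actually causing concurrent reads of the same topic partition on one executor; speculative execution, more than one job over the same offsets, or a second consumer sharing the same group.id are other usual suspects.]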






problem with kafka createDirectStream ..

2016-12-09 Thread Debasish Ghosh
Hello -

I am facing some issues with the following snippet of code that reads from
Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..)
with Kafka 0.10.1 and Spark 2.0.1.

// get the data from kafka
val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
  KafkaUtils.createDirectStream[Array[Byte], (String, String)](
streamingContext,
PreferConsistent,
Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
  )

// label and vectorize the value
val projected: DStream[(String, Vector)] = stream.map { record =>
  val (label, value) = record.value
  val vector = Vectors.dense(value.split(",").map(_.toDouble))
  (label, vector)
}.transform(projectToLowerDimension)

In the above snippet if I have the call to transform in the last line, I
get the following exception ..

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)


The transform method does a PCA and gives the top 2 principal components ..

private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
  if (rdd.isEmpty) rdd
  else {
    // reduce to 2 dimensions
    val pca = new PCA(2).fit(rdd.map(_._2))

    // Project vectors to the linear space spanned by the top 2 principal
    // components, keeping the label
    rdd.map(p => (p._1, pca.transform(p._2)))
  }
}

However if I remove the transform call, I can process everything correctly.

Any help will be most welcome ..

regards.
-- 
Debasish Ghosh