Re: problem with kafka createDirectStream ..
Oops, it's 0.10.0 (the assembly has the 0.10.0 Kafka classes, not 0.10.1). Sorry for the confusion.

On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> My assembly contains the 0.10.1 classes ..
Re: problem with kafka createDirectStream ..
My assembly contains the 0.10.1 classes. Here are the Kafka- and Spark-related dependencies in my assembly:

    val spark = "2.0.1"  // Spark version used in this thread

    libraryDependencies ++= Seq(
      "org.apache.kafka" %  "kafka-streams"              % "0.10.0.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % spark,
      "org.apache.spark" %% "spark-core"                 % spark % "provided",
      "org.apache.spark" %% "spark-streaming"            % spark % "provided",
      "org.apache.spark" %% "spark-mllib"                % spark % "provided",
      "org.apache.spark" %% "spark-sql"                  % spark % "provided"
    )

regards.

On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org> wrote:
> When you say 0.10.1 do you mean broker version only, or does your
> assembly contain classes from the 0.10.1 kafka consumer?

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
Re: problem with kafka createDirectStream ..
When you say 0.10.1, do you mean the broker version only, or does your assembly contain classes from the 0.10.1 Kafka consumer?

On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com> wrote:
> I am using KafkaUtils.createDirectStream(..) with
> Kafka 0.10.1 and Spark 2.0.1.

To unsubscribe e-mail: user-unsubscr...@spark.apache.org
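The question above (which Kafka consumer classes actually ended up in the assembly) can be checked from inside the assembly itself. A minimal sketch, assuming the kafka-clients jar bundles a `kafka/kafka-version.properties` resource with a `version` key (as far as I know this is what Kafka's `AppInfoParser` reads, but treat the path as an assumption): look the resource up on the classpath and read the version. `KafkaClientVersion` is a hypothetical helper, not part of Kafka or Spark.

```scala
import java.io.InputStream
import java.util.Properties

object KafkaClientVersion {
  // Assumption: kafka-clients jars bundle this properties file with a "version" key.
  val VersionResource = "/kafka/kafka-version.properties"

  /** Reads the "version" key from a properties stream, always closing the stream. */
  def parseVersion(in: InputStream): Option[String] =
    try {
      val props = new Properties()
      props.load(in)
      Option(props.getProperty("version"))
    } finally in.close()

  /** Version of the kafka-clients classes visible to this class loader, if any. */
  def onClasspath(): Option[String] =
    Option(getClass.getResourceAsStream(VersionResource)).flatMap(parseVersion)

  def main(args: Array[String]): Unit =
    println(onClasspath().getOrElse("no kafka-clients version resource found"))
}
```

Running `KafkaClientVersion.main` with the assembly on the classpath prints the bundled client version. If the build merged more than one kafka-clients version into the fat jar, only one copy of the resource survives, so treat the result as a hint rather than proof.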
problem with kafka createDirectStream ..
Hello -

I am facing some issues with the following snippet of code that reads from Kafka and creates a DStream. I am using KafkaUtils.createDirectStream(..) with Kafka 0.10.1 and Spark 2.0.1.

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    // get the data from kafka
    val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
      KafkaUtils.createDirectStream[Array[Byte], (String, String)](
        streamingContext,
        PreferConsistent,
        Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
      )

    // label and vectorize the value
    val projected: DStream[(String, Vector)] = stream.map { record =>
      val (label, value) = record.value
      val vector = Vectors.dense(value.split(",").map(_.toDouble))
      (label, vector)
    }.transform(projectToLowerDimension)

In the above snippet, if I keep the call to transform in the last line, I get the following exception:

    Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
        at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)

The transform method does a PCA and keeps the top 2 principal components:

    import org.apache.spark.mllib.feature.PCA
    import org.apache.spark.rdd.RDD

    private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String, Vector)] = { rdd =>
      if (rdd.isEmpty) rdd else {
        // reduce to 2 dimensions
        val pca = new PCA(2).fit(rdd.map(_._2))

        // Project vectors to the linear space spanned by the top 2 principal
        // components, keeping the label
        rdd.map(p => (p._1, pca.transform(p._2)))
      }
    }

However, if I remove the transform call, everything is processed correctly.

Any help will be most welcome.

regards.
- Debasish

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
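The top frame of the trace is `KafkaConsumer.acquire`, a guard that lets only one thread use a consumer at a time. As far as I can tell from the Kafka 0.10 sources, it records the owning thread's id with a compare-and-set, lets that same thread re-enter, and throws `ConcurrentModificationException` for any other thread; `release()` clears the owner once a reference count drops back to zero. Below is a minimal, self-contained Scala model of that check. `SingleThreadGuard` and `GuardDemo` are hypothetical names, not Kafka or Spark APIs. The model only shows why a consumer shared by two threads (for example, two tasks in one executor reading the same topic partition through `CachedKafkaConsumer`) fails; it does not establish which part of the PCA transform causes such sharing in this job.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

/** Simplified model of a "one thread at a time" guard (hypothetical class). */
final class SingleThreadGuard {
  private val NoOwner = -1L
  private val owner = new AtomicLong(NoOwner)
  private val refCount = new AtomicInteger(0)

  def acquire(): Unit = {
    val me = Thread.currentThread().getId
    // The owning thread may re-enter; any other thread fails the compare-and-set.
    if (me != owner.get() && !owner.compareAndSet(NoOwner, me))
      throw new ConcurrentModificationException("not safe for multi-threaded access")
    refCount.incrementAndGet()
  }

  def release(): Unit =
    if (refCount.decrementAndGet() == 0) owner.set(NoOwner)

  /** Runs `body` while holding the guard; releases only if acquire succeeded. */
  def withGuard[A](body: => A): A = {
    acquire()
    try body finally release()
  }
}

object GuardDemo {
  /** True if a second thread is rejected while another thread holds the guard. */
  def secondThreadRejected(): Boolean = {
    val guard = new SingleThreadGuard
    val held = new CountDownLatch(1)
    val done = new CountDownLatch(1)
    val holder = new Thread(new Runnable {
      def run(): Unit = guard.withGuard { held.countDown(); done.await() }
    })
    holder.start()
    held.await() // the holder thread now owns the guard
    val rejected =
      try { guard.withGuard(()); false }
      catch { case _: ConcurrentModificationException => true }
    done.countDown()
    holder.join()
    rejected
  }
}
```

`GuardDemo.secondThreadRejected()` returns `true` because the main thread calls `withGuard` while the holder thread still owns the guard. Nested use from a single thread is fine, which mirrors how one task can call several consumer methods in a row.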
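To make the transform step concrete, here is a small plain-Scala sketch of what `projectToLowerDimension` computes: the covariance matrix of the feature vectors, its top-k eigenvectors, and a projection of each vector onto them. The eigenvectors are found by power iteration with deflation, a swapped-in technique chosen for brevity; Spark's mllib `PCA` uses its own linear-algebra routines. `TinyPca` is a hypothetical name. As far as I know, mllib's `PCAModel.transform` multiplies by the component matrix without subtracting the mean, so the sketch does the same. The sketch also makes one point visible: the components cannot be computed until every row has been seen, so in the Spark version `rdd.isEmpty` and `PCA.fit` each run their own Spark jobs over the Kafka-backed RDD, and the RDD returned by `rdd.map` is evaluated again by the output operation that consumes the stream. Unless the RDD is cached, each of those evaluations reads from Kafka again.

```scala
object TinyPca {
  type Vec = Array[Double]

  private def dot(a: Vec, b: Vec): Double = a.indices.map(i => a(i) * b(i)).sum

  /** Sample covariance matrix of the rows (each row is one observation). */
  def covariance(rows: Seq[Vec]): Array[Vec] = {
    require(rows.size >= 2, "need at least two rows")
    val n = rows.size
    val d = rows.head.length
    val mean = Array.tabulate(d)(j => rows.map(r => r(j)).sum / n)
    Array.tabulate(d, d) { (i, j) =>
      rows.map(r => (r(i) - mean(i)) * (r(j) - mean(j))).sum / (n - 1)
    }
  }

  /** Top-k eigenvectors of a symmetric matrix, by power iteration with deflation. */
  def topComponents(cov: Array[Vec], k: Int, iters: Int = 500): Seq[Vec] = {
    val d = cov.length
    val m = cov.map(_.clone()) // work on a copy; deflation mutates it
    (0 until k).map { c =>
      // Arbitrary non-zero start vector; it must not be orthogonal to the target eigenvector
      var v: Vec = Array.tabulate(d)(i => if (i == c % d) 1.0 else 0.5)
      for (_ <- 0 until iters) {
        val w = m.map(row => dot(row, v))
        val norm = math.sqrt(dot(w, w))
        v = if (norm == 0.0) w else w.map(_ / norm)
      }
      val lambda = dot(v, m.map(row => dot(row, v))) // eigenvalue estimate for unit-length v
      // Deflate: subtract lambda * v * v^T so the next pass converges to the next component
      for (i <- 0 until d; j <- 0 until d) m(i)(j) -= lambda * v(i) * v(j)
      v
    }
  }

  /** Projects x onto the components, without mean-centering. */
  def project(components: Seq[Vec], x: Vec): Vec =
    components.map(c => dot(c, x)).toArray
}
```

With rows spread mostly along the first axis, the first component comes out as roughly (1, 0, 0) up to sign, and the second as roughly (0, 1, 0) up to sign; the sign of each component is arbitrary, as it is for any eigenvector.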