[ https://issues.apache.org/jira/browse/TOREE-351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15665553#comment-15665553 ]

Marius Van Niekerk commented on TOREE-351:
------------------------------------------

Thanks for investigating this, Phil.  Can you make a PR with this change so that 
the other Toree devs and I can run some tests on it?

> Kafka Direct Streaming does not work in Toree
> ---------------------------------------------
>
>                 Key: TOREE-351
>                 URL: https://issues.apache.org/jira/browse/TOREE-351
>             Project: TOREE
>          Issue Type: Bug
>    Affects Versions: 0.1.0
>         Environment: Spark 2.0; Toree built from the master branch;
> "SPARK_OPTS": "--jars 
> file:/path/to/spark-streaming-kafka-0-10-assembly_2.11-2.0.0.jar" added to 
> kernel.json, since "%AddJar" did not seem to work.
>     
>            Reporter: Phil Berkland
>
> The following code will throw a ClassCastException when run in a notebook:
> -------------
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "example",
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean)
> )
> val streamingContext = new StreamingContext(sc, Seconds(1))
> val topics = Array("topicA", "topicB")
> val stream = KafkaUtils.createDirectStream[String, String](
>   streamingContext,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams)
> )
> val pairs = stream.map(record => (record.key, record.value))
> pairs.print()
> streamingContext.start()
> streamingContext.awaitTermination()
> ---------------
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>   at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>   at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>   at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>   at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
>   ... 44 elided
> Caused by: java.lang.ClassCastException: class org.apache.kafka.clients.consumer.RangeAssignor
>   at java.lang.Class.asSubclass(Class.java:3404)
>   at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332)
>   at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>   at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>   at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>   at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>   at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> --------------------
> This same code works correctly when run inside spark-shell.
> Kafka uses Thread.currentThread().getContextClassLoader() to load a class, 
> but under Toree that context class loader is (incorrectly) different from 
> the loader that originally resolved the Kafka classes.
> This appears to be the same root problem as TOREE-349.
> Note that all thread management is done inside Kafka, so we have no way to 
> control those threads or to set their contextClassLoader ourselves.
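> The loader mismatch can be demonstrated in isolation. The sketch below is 
> illustrative only (it is not Toree or Kafka code): the same class file 
> resolved by two unrelated loaders yields two distinct Class objects, so 
> Class.asSubclass throws a ClassCastException even though the class names 
> match, which is what the java.lang.Class.asSubclass frame in the trace 
> above is reporting.
> --------------------
> import java.net.URLClassLoader
>
> object LoaderMismatch {
>   def main(args: Array[String]): Unit = {
>     // Locate the scala-library jar that this JVM is already using.
>     val jar = classOf[Option[_]].getProtectionDomain.getCodeSource.getLocation
>     // Two loaders whose only parent is the bootstrap loader, so each one
>     // defines scala.Option itself instead of delegating to a shared parent.
>     val a = new URLClassLoader(Array(jar), null)
>     val b = new URLClassLoader(Array(jar), null)
>     val optA = a.loadClass("scala.Option")
>     val optB = b.loadClass("scala.Option")
>     println(optA eq optB)  // false: two distinct Class objects for one name
>     optB.asSubclass(optA)  // throws java.lang.ClassCastException
>   }
> }
> --------------------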
>   
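> One plausible direction for a Toree-side change (a sketch only, using a 
> hypothetical helper; not necessarily the actual patch) is to install the 
> interpreter's class loader as the context class loader on the threads Toree 
> does control before user code runs, so that threads spawned downstream 
> inherit the right loader:
> --------------------
> // Hypothetical helper, not an existing Toree API: run a block of user code
> // with the given loader installed as this thread's context class loader.
> def withContextLoader[A](loader: ClassLoader)(body: => A): A = {
>   val previous = Thread.currentThread().getContextClassLoader
>   Thread.currentThread().setContextClassLoader(loader)
>   try body
>   finally Thread.currentThread().setContextClassLoader(previous)
> }
> --------------------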


