[ https://issues.apache.org/jira/browse/TOREE-351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664350#comment-15664350 ]

Phil Berkland commented on TOREE-351:
-------------------------------------

It appears that the problem occurs when the scala interpreter is called too 
early in the initialization process.
If the first call to the scala interpreter is delayed until after the end of 
the initialization process, the problem goes away.
I suspect that the actor system is somehow interfering with the context class 
loader.  As I said before, this problem does not occur when using the spark 
shell, so it is not caused by the scala interpreter itself, but by interference 
from something that Toree adds to the mix (such as the actor system).
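
A quick way to confirm this suspicion is to print the context class loader 
both during init and after bootstrap and compare. A minimal diagnostic sketch 
(logContextLoader is a hypothetical helper, not existing Toree code):

  // Hypothetical helper: call once from init and once from postInit.
  // If the two printed loaders differ, something during bootstrap
  // (such as the actor system) swapped the thread's context class loader.
  def logContextLoader(where: String): Unit =
    println(s"[$where] context class loader: " +
      Thread.currentThread().getContextClassLoader)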

The following code changes will make the problem go away:

In kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala, add:

  def postInit(): Unit = {}
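
In context, the hook is just a default no-op on the interpreter trait (a 
sketch; the trait's existing members are elided):

  trait Interpreter {
    // ... existing members elided ...

    // No-op by default; concrete interpreters override this to defer
    // work that must not run until kernel bootstrap has finished.
    def postInit(): Unit = {}
  }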

In kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala, add a 
call to "postInit" as the last step:

     logger.info("Marking relay as ready for receiving messages")
     kernelMessageRelayActor ! true
+    interpreters foreach {_.postInit()}
 
In 
scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala,
move the indirect scala interpreter calls from "init" to "postInit":

  override def postInit(): Unit = {
    bindSparkSession()
    bindSparkContext()
  }
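
The corresponding change on the init side is just deleting those two calls. A 
sketch, with the rest of init elided and assuming the usual 
init(kernel: KernelLike): Interpreter signature from the Interpreter trait:

  override def init(kernel: KernelLike): Interpreter = {
    // ... interpreter start-up elided ...
    // bindSparkSession()   // moved to postInit()
    // bindSparkContext()   // moved to postInit()
    this
  }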

Another potential fix would be to move the plugin initialization to the very 
end of bootstrap, but that may cause other problems.


> Kafka Direct Streaming does not work in Toree
> ---------------------------------------------
>
>                 Key: TOREE-351
>                 URL: https://issues.apache.org/jira/browse/TOREE-351
>             Project: TOREE
>          Issue Type: Bug
>    Affects Versions: 0.1.0
>         Environment: spark 2.0, toree built from master branch,
> in kernel.json added "SPARK_OPTS": "--jars 
> file:/path/to/spark-streaming-kafka-0-10-assembly_2.11-2.0.0.jar" since 
> "%addjar" did not seem to work.
>     
>            Reporter: Phil Berkland
>
> The following code will throw a ClassCastException when running in a notebook:
> -------------
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "example",
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean)
> )
> val streamingContext = new StreamingContext(sc, Seconds(1))
> val topics = Array("topicA", "topicB")
> val stream = KafkaUtils.createDirectStream[String, String](
>   streamingContext,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams)
> )
> val rdd = stream.map(record => (record.key, record.value))
> rdd.print()
> streamingContext.start()
> streamingContext.awaitTermination()
> ---------------
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>   at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>   at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>   at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>   at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
>   ... 44 elided
> Caused by: java.lang.ClassCastException: class org.apache.kafka.clients.consumer.RangeAssignor
>   at java.lang.Class.asSubclass(Class.java:3404)
>   at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332)
>   at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>   at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>   at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>   at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>   at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> --------------------
> This same code works fine when run inside spark-shell.
> Kafka uses Thread.currentThread().getContextClassLoader() to load a class, 
> but the class loader is (incorrectly) different from the original loader.
> This appears to be the same root problem as TOREE-349.
> Note that all thread management is done by Kafka, so we have no control over 
> the threads or over setting the contextClassLoader in them. 
>   
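
For reference, the ClassCastException in the trace is the classic symptom of 
one class being defined by two different class loaders: RangeAssignor 
implements the assignor interface as loaded by its own loader, not the one 
Kafka resolves through the context class loader. A minimal standalone sketch 
of the mechanism (the jar path is hypothetical; the interface name is from 
the Kafka 0.10 client):

  import java.net.{URL, URLClassLoader}

  // Two loaders that each define the Kafka classes independently
  // (null parent, so nothing is shared between them).
  val jar = Array(new URL("file:/path/to/kafka-clients-0.10.0.0.jar"))  // hypothetical path
  val loaderA = new URLClassLoader(jar, null)
  val loaderB = new URLClassLoader(jar, null)

  val iface = loaderA.loadClass(
    "org.apache.kafka.clients.consumer.internals.PartitionAssignor")
  val impl = loaderB.loadClass(
    "org.apache.kafka.clients.consumer.RangeAssignor")

  // RangeAssignor from loaderB implements loaderB's PartitionAssignor,
  // which is a different runtime class than loaderA's, so this throws
  // java.lang.ClassCastException -- the same failure as in the trace.
  impl.asSubclass(iface)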


