[
https://issues.apache.org/jira/browse/TOREE-351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664350#comment-15664350
]
Phil Berkland commented on TOREE-351:
-------------------------------------
It appears that the problem occurs when the Scala interpreter is called too
early in the initialization process.
If the first call to the Scala interpreter is delayed until after the end of
the initialization process, the problem goes away.
I suspect that the actor system is somehow interfering with the context class
loader. As I said before, this problem does not occur when using the spark
shell, so it is not caused by the Scala interpreter itself, but by interference
from something that Toree adds to the mix (such as the actor system).
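To illustrate the suspected mechanism, here is a minimal, self-contained sketch (the object and method names are hypothetical, not Toree code) of the usual defensive pattern: save the thread's context class loader, pin a known-good loader for the duration of a loader-sensitive call, and always restore the original afterwards. If some framework on the thread (such as an actor system) has swapped the context class loader, this keeps class resolution inside `body` consistent:

```scala
// Hypothetical sketch: pin a known-good context class loader around a
// loader-sensitive call, restoring whatever loader was there before.
object ContextLoaderGuard {
  def withLoader[A](loader: ClassLoader)(body: => A): A = {
    val thread = Thread.currentThread()
    val saved = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(saved) // always restore the original
  }
}
```

A caller that needs Kafka's `Class.forName`-style lookups to succeed could wrap them as `ContextLoaderGuard.withLoader(getClass.getClassLoader) { ... }`; this only helps on threads we control, which is exactly why it does not solve the Kafka case described below.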
The following code changes will make the problem go away:
in kernel-api/src/main/scala/org/apache/toree/interpreter/Interpreter.scala, add
def postInit(): Unit = {}
in kernel/src/main/scala/org/apache/toree/boot/KernelBootstrap.scala, add a call
to "postInit" as the last step:
logger.info("Marking relay as ready for receiving messages")
kernelMessageRelayActor ! true
+ interpreters foreach {_.postInit()}
in
scala-interpreter/src/main/scala/org/apache/toree/kernel/interpreter/scala/ScalaInterpreter.scala,
move the indirect Scala interpreter calls from "init" to "postInit":
override def postInit(): Unit = {
  bindSparkSession()
  bindSparkContext()
}
Another potential way to fix this could be to move the plugin initialization to
the very end of bootstrap, but that may cause other problems.
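Taken together, the three changes above amount to a two-phase interpreter lifecycle: a loader-safe "init" during bootstrap, and a deferred "postInit" that runs only after the kernel is otherwise ready. The following self-contained sketch (hypothetical names, not the actual Toree classes) shows the pattern:

```scala
// Hypothetical sketch of the two-phase lifecycle proposed above:
// init() runs early in bootstrap; postInit() defaults to a no-op and is
// invoked only after the rest of initialization (actors, message relay)
// has completed, keeping loader-sensitive work out of early init.
trait Interp {
  def init(): Unit
  def postInit(): Unit = {} // default: nothing deferred
}

class SparkBindingInterp extends Interp {
  var sparkBound = false
  def init(): Unit = {}                                 // keep early init loader-safe
  override def postInit(): Unit = { sparkBound = true } // e.g. bindSparkSession()
}

object Bootstrap {
  def run(interpreters: Seq[Interp]): Unit = {
    interpreters.foreach(_.init())
    // ... initialize actor system, mark the message relay ready ...
    interpreters.foreach(_.postInit()) // last step of bootstrap
  }
}
```

The design point is that the default no-op keeps every other interpreter implementation untouched; only interpreters that must defer loader-sensitive binding override `postInit`.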
> Kafka Direct Streaming does not work in Toree
> ---------------------------------------------
>
> Key: TOREE-351
> URL: https://issues.apache.org/jira/browse/TOREE-351
> Project: TOREE
> Issue Type: Bug
> Affects Versions: 0.1.0
> Environment: spark 2.0, toree built from master branch,
> in kernel.json added "SPARK_OPTS": "--jars
> file:/path/to/spark-streaming-kafka-0-10-assembly_2.11-2.0.0.jar" since
> "%addjar" did not seem to work.
>
> Reporter: Phil Berkland
>
> The following code will throw a ClassCastException when run in a notebook:
> -------------
> import org.apache.kafka.clients.consumer.ConsumerRecord
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "example",
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean)
> )
> val streamingContext = new StreamingContext(sc, Seconds(1))
> val topics = Array("topicA", "topicB")
> val stream = KafkaUtils.createDirectStream[String, String](
>   streamingContext,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams)
> )
> val rdd = stream.map(record => (record.key, record.value))
> rdd.print()
> streamingContext.start()
> streamingContext.awaitTermination()
> ---------------
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>   at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>   at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>   at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>   at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>   at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
>   ... 44 elided
> Caused by: java.lang.ClassCastException: class org.apache.kafka.clients.consumer.RangeAssignor
>   at java.lang.Class.asSubclass(Class.java:3404)
>   at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:332)
>   at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:225)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:637)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
>   at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:74)
>   at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:225)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>   at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>   at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>   at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>   at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>   at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>   at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>   at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> --------------------
> This same code works fine when run inside spark-shell.
> Kafka uses Thread.currentThread().getContextClassLoader() to load a
> class, but the class loader is (incorrectly) different from the original
> loader.
> This appears to be the same root problem as TOREE-349.
> Note that all thread management is being done by Kafka, so we have no control
> over threading or setting contextClassLoader in them.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)