[
https://issues.apache.org/jira/browse/SPARK-13941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hurshal Patel resolved SPARK-13941.
-----------------------------------
Resolution: Not A Bug
There was a versioning mismatch in my spark job which depended on kafka 0.9,
which is definitely not supported.
> kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
> -------------------------------------------------------------------
>
> Key: SPARK-13941
> URL: https://issues.apache.org/jira/browse/SPARK-13941
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.5.1
> Reporter: Hurshal Patel
>
> I am connecting to a Kafka cluster with the following (anonymized) code:
> {code}
> var stream = KafkaUtils.createDirectStream[String, Array[Byte],
> StringDecoder, DefaultDecoder](
> ssc, kafkaParams, topics)
> stream.foreachRDD { rdd =>
> val df = sqlContext.createDataFrame(rdd.map(bytesToString), stringSchema)
> df.foreachPartition { partition =>
> val targetNode = chooseTarget(TaskContext.partitionId)
> loadPartition(targetNode, partition)
> }
> }
> {code}
> I am using Kafka 0.8.2.0-1.kafka1.2.0.p0.2 (Cloudera CDH 5.3.1) and Spark
> 1.4.1 and this works fine.
> After upgrading to Spark 1.5.1, my tasks are failing (stacktrace is below).
> Are there any notable changes to the KafkaDirectStream or KafkaRDD that would
> cause this or does Cloudera's Kafka distribution have known issues with 1.5.1?
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in
> stage 12407.0 failed 4 times, most recent failure: Lost task 5.3 in stage
> 12407.0 (TID 55638, 172.18.203.25): org.apache.spark.SparkException: Couldn't
> connect to leader for topic XXX: java.lang.ClassCastException:
> kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:163)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
> at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896)
> at
> org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply$mcV$sp(DataFrame.scala:1369)
> at
> org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply(DataFrame.scala:1369)
> at
> org.apache.spark.sql.DataFrame$$anonfun$foreachPartition$1.apply(DataFrame.scala:1369)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
> at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
> at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1368)
> at com.foobar.kafka.KafkaLoader.load(KafkaLoader.scala:35)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Couldn't connect to leader for
> topic XXX: java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot
> be cast to kafka.cluster.Broker
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$connectLeader$1.apply(KafkaRDD.scala:164)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.connectLeader(KafkaRDD.scala:163)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.<init>(KafkaRDD.scala:155)
> at org.apache.spark.streaming.kafka.KafkaRDD.compute(KafkaRDD.scala:135)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]