I'm running into the error below while trying to consume messages from Kafka
through Spark Streaming (Kafka direct API). This used to work fine with the
Spark standalone cluster manager. We've just switched to Cloudera 5.7, using
YARN to manage the Spark cluster, and started seeing the error below.

A few details:
- Spark 1.6.0
- Using the Kafka direct stream API (a rough sketch of the call is below this list)
- Kafka broker version: 0.8.2.1
- Kafka version on the classpath of the YARN executors: 0.9
- Kafka brokers are not managed by Cloudera
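
For context, the stream is created with the plain Kafka 0.8 direct API,
roughly like the sketch below (broker address and topic name are
placeholders, not our real values):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf(), Seconds(10))

    // "metadata.broker.list" points the 0.8 SimpleConsumer directly at the
    // brokers; host/port and topic are placeholders.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "broker1:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic"))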

The only difference I see between the standalone cluster manager and YARN
is the Kafka version on the consumer side (0.8.2.1 vs. 0.9).

*Trying to figure out whether the version mismatch is really the issue. If it
is, what would the fix be, other than eventually upgrading the Kafka brokers
to 0.9 as well (which we plan to do, just not right now)? Or is there
something else I'm missing here?*
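
One workaround I'm considering, assuming the mismatch is the problem, is to
ship our own 0.8.2.1 Kafka jars with the application and ask Spark to prefer
them over the 0.9 jars CDH puts on the executor classpath. Something like the
following (these are the experimental userClassPathFirst settings in Spark
1.6; I haven't verified that this actually fixes it):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Prefer jars shipped with the application (via --jars or the assembly)
      // over the cluster-provided Kafka 0.9 jars on the YARN executors.
      .set("spark.driver.userClassPathFirst", "true")
      .set("spark.executor.userClassPathFirst", "true")

Does that sound reasonable, or is there a cleaner way to pin the Kafka
version under YARN?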

Appreciate the help.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 200.0 failed 4 times, most recent failure: Lost task 0.3 in stage 200.0 (TID 203,..): java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
    at java.nio.ByteBuffer.get(ByteBuffer.java:715)
    at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:40)
    at kafka.api.TopicData$.readFrom(FetchResponse.scala:96)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:170)
    at kafka.api.FetchResponse$$anonfun$4.apply(FetchResponse.scala:169)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.FetchResponse$.readFrom(FetchResponse.scala:169)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:135)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
    at org.apache.spark.rdd.RDD$$anonfun$toLocalIterator$1$$anonfun$org$apache$spark$rdd$RDD$$anonfun$$collectPartition$1$1.apply(RDD.scala:942)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1869)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
