[ 
https://issues.apache.org/jira/browse/SPARK-19547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143760#comment-16143760
 ] 

Tao Tian commented on SPARK-19547:
----------------------------------

I have encounter this problem too. using Kafka 0.10.0 and spark-streaming_2.11. 
The error log is "Caused by: 
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages 
at [Partition=Offset]: {db4.CLAC.result-0=61638808} whose size is larger than 
the fetch size 1048576 and hence cannot be ever returned. Increase the fetch 
size on the client (using max.partition.fetch.bytes), or decrease the maximum 
message size the broker will allow (using message.max.bytes).
"  
I set  "max.partition.fetch.bytes = 10485720" for Kafka has fixed the problem.

> KafkaUtil throw 'No current assignment for partition' Exception
> ---------------------------------------------------------------
>
>                 Key: SPARK-19547
>                 URL: https://issues.apache.org/jira/browse/SPARK-19547
>             Project: Spark
>          Issue Type: Question
>          Components: DStreams
>    Affects Versions: 1.6.1
>            Reporter: wuchang
>
> Below is my scala code to create spark kafka stream:
> val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "server110:2181,server110:9092",
>       "zookeeper" -> "server110:2181",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> "example",
>       "auto.offset.reset" -> "latest",
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>     val topics = Array("ABTest")
>     val stream = KafkaUtils.createDirectStream[String, String](
>       ssc,
>       PreferConsistent,
>       Subscribe[String, String](topics, kafkaParams)
>     )
> But after run for 10 hours, it throws exceptions:
> 2017-02-10 10:56:20,000 INFO  [JobGenerator] internals.ConsumerCoordinator: 
> Revoking previously assigned partitions [ABTest-0, ABTest-1] for group example
> 2017-02-10 10:56:20,000 INFO  [JobGenerator] internals.AbstractCoordinator: 
> (Re-)joining group example
> 2017-02-10 10:56:20,011 INFO  [JobGenerator] internals.AbstractCoordinator: 
> (Re-)joining group example
> 2017-02-10 10:56:40,057 INFO  [JobGenerator] internals.AbstractCoordinator: 
> Successfully joined group example with generation 5
> 2017-02-10 10:56:40,058 INFO  [JobGenerator] internals.ConsumerCoordinator: 
> Setting newly assigned partitions [ABTest-1] for group example
> 2017-02-10 10:56:40,080 ERROR [JobScheduler] scheduler.JobScheduler: Error 
> generating jobs for time 1486695380000 ms
> java.lang.IllegalStateException: No current assignment for partition ABTest-0
>         at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:231)
>         at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:295)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1169)
>         at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:179)
>         at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>         at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>         at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>         at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>         at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>         at 
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>         at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>         at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>         at scala.Option.orElse(Option.scala:289)
>         at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>         at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>         at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>         at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
>         at scala.util.Try$.apply(Try.scala:192)
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Obviously , The partition ABTestMsg-0 has already be revoked for this 
> consumer, but it seems that the spark streaming consumer are not aware of 
> that  and continue to consume data of this revoked topic-partition , so the 
> exception occurs and the total spark job aborted.
> I think the kafka rebalance event is very normal , how can I modify my code 
> to make Spark streaming deal with the  partition-revoke event correctly?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to