[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16539906#comment-16539906
 ] 

Biplob Biswas commented on SPARK-19680:
---------------------------------------

This essentially poisons the Consumer Group, and the only way to fix reading 
messages again was to use a new consumer group which start from the latest 
offset (Also, as spark-executor * groupid's are recreated for the new consumer 
group). Is there a way to salvage the consumer group without removing them from 
Kafka? 

Also, the executor assigned consumer group is not available on kafka, where 
exactly is it stored? 

> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>            Priority: Major
>
> I'm using spark streaming with kafka to acutally create a toplist. I want to 
> read all the messages in kafka. So I set
>    "auto.offset.reset" -> "earliest"
> Nevertheless when I start the job on our spark cluster it is not working I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>       Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
>     Map(
>       "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>       "group.id" -> properties.getProperty("kafka.consumer.group"),
>       "auto.offset.reset" -> "earliest",
>       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>       "enable.auto.commit" -> "false",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
>     getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>       val topList = new TopList
>       topList.setCreated(new Date())
>       topList.setTopListEntryList(rdd.take(TopListLength).toList)
>       CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>       kafkaSink.value.send(SendToTopicName, topList)
>       CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>     })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
>     val Mapper = MapperObject.readerFor[SearchEventDTO]
>     result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>       .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>       .map(row => {
>         val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>         (name, new TopListEntry(name, 1, row.getResultCount))
>       })
>       .reduceByKeyAndWindow(
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
>         Minutes(windowDuration),
>         Seconds(slideDuration))
>       .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>       .map(row => (row._2.getSearchCount, row._2))
>       .transform(rdd => rdd.sortByKey(ascending = false))
>       .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, 
> row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
>     val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>     val kafkaSink = 
> sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>     val kafkaParams: Map[String, Object] = 
> SparkUtil.getDefaultKafkaReceiverParameter(properties)
>     val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>     ssc.checkpoint("/home/spark/checkpoints")
>     val adEventStream =
>       KafkaUtils.createDirectStream[String, Array[Byte]](ssc, 
> PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), 
> kafkaParams))
>     processSearchKeyWords(adEventStream, 
> SparkUtil.getWindowDuration(properties), 
> SparkUtil.getSlideDuration(properties), kafkaSink)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> {code}
> As I saw in the code KafkaUtils
> {code:title=Job.Scala|borderStyle=solid}
>     logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to 
> none for executor")
>     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means as soon as one worker has a kafka partion that can no be processed 
> because the offset is not valid anymore due to retention policy the streaming 
> job will stop working 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to