[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16467498#comment-16467498
 ] 

Marcin Kuthan commented on SPARK-19680:
---------------------------------------

We have started migration our spark streaming job from 1.6 to 2.x for two main 
reasons:
 # escape from legacy spark version
 # replace custom zookeeper based offset store with built-in spark support for 
offset management (commitAsync).

But this issue is a real blocker to migrate our long running streaming service. 
The production job is heavily monitored to avoid situation if the processing is 
stopped and we are out of retention on Kafka. But this job still needs best 
effort, self healing solution for OffsetsOutOfRangeException. During an 
incident I want to easily restart my service with minimal data loss, as time 
goes by and Kafka retention drops precious data. Just let me decide how to 
handle OffsetsOutOfRangeException with full responsibility for data lost.

> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>            Priority: Major
>
> I'm using spark streaming with kafka to acutally create a toplist. I want to 
> read all the messages in kafka. So I set
>    "auto.offset.reset" -> "earliest"
> Nevertheless when I start the job on our spark cluster it is not working I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>       Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
>     Map(
>       "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>       "group.id" -> properties.getProperty("kafka.consumer.group"),
>       "auto.offset.reset" -> "earliest",
>       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>       "enable.auto.commit" -> "false",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
>     getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>       val topList = new TopList
>       topList.setCreated(new Date())
>       topList.setTopListEntryList(rdd.take(TopListLength).toList)
>       CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>       kafkaSink.value.send(SendToTopicName, topList)
>       CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>     })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
>     val Mapper = MapperObject.readerFor[SearchEventDTO]
>     result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>       .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>       .map(row => {
>         val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>         (name, new TopListEntry(name, 1, row.getResultCount))
>       })
>       .reduceByKeyAndWindow(
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
>         Minutes(windowDuration),
>         Seconds(slideDuration))
>       .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>       .map(row => (row._2.getSearchCount, row._2))
>       .transform(rdd => rdd.sortByKey(ascending = false))
>       .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, 
> row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
>     val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>     val kafkaSink = 
> sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>     val kafkaParams: Map[String, Object] = 
> SparkUtil.getDefaultKafkaReceiverParameter(properties)
>     val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>     ssc.checkpoint("/home/spark/checkpoints")
>     val adEventStream =
>       KafkaUtils.createDirectStream[String, Array[Byte]](ssc, 
> PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), 
> kafkaParams))
>     processSearchKeyWords(adEventStream, 
> SparkUtil.getWindowDuration(properties), 
> SparkUtil.getSlideDuration(properties), kafkaSink)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> {code}
> As I saw in the code KafkaUtils
> {code:title=Job.Scala|borderStyle=solid}
>     logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to 
> none for executor")
>     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means as soon as one worker has a kafka partion that can no be processed 
> because the offset is not valid anymore due to retention policy the streaming 
> job will stop working 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to