[ 
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878623#comment-15878623
 ] 

Cody Koeninger edited comment on SPARK-19680 at 2/22/17 4:25 PM:
-----------------------------------------------------------------

The issue here is likely that you have lost data (because of retention 
expiration) between the time the batch was defined on the driver, and the time 
the executor attempted to process the batch.  Having executor consumers obey 
auto offset reset would result in silent data loss, which is a bad thing.

There's a more detailed description of the semantic issues around this for 
kafka in KAFKA-3370 and for structured streaming kafka in SPARK-17937

If you've got really aggressive retention settings and are having trouble 
getting a stream started, look at specifying earliest + some margin on startup 
as a workaround.  If you're having this trouble after a stream has been running 
for a while, you need more retention or smaller batches.




was (Author: [email protected]):
The issue here is likely that you have lost data (because of retention 
expiration) between the time the batch was defined on the driver, and the time 
the executor attempted to process the batch.  Having executor consumers obey 
auto offset reset would result in silent data loss, which is a bad thing.

There's a more detailed description of the semantic issues around this for 
kafka in KAFKA-3370 and for structured streaming kafka in SPARK-17937



> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
>                 Key: SPARK-19680
>                 URL: https://issues.apache.org/jira/browse/SPARK-19680
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.0
>            Reporter: Schakmann Rene
>
> I'm using spark streaming with kafka to acutally create a toplist. I want to 
> read all the messages in kafka. So I set
>    "auto.offset.reset" -> "earliest"
> Nevertheless when I start the job on our spark cluster it is not working I 
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
>       Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times, 
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23, 
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {SearchEvents-2=161803385}
> {code}
> This is somehow wrong because I did set the auto.offset.reset property
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
>   def getDefaultKafkaReceiverParameter(properties: Properties):Map[String, 
> Object] = {
>     Map(
>       "bootstrap.servers" -> 
> properties.getProperty("kafka.bootstrap.servers"),
>       "group.id" -> properties.getProperty("kafka.consumer.group"),
>       "auto.offset.reset" -> "earliest",
>       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>       "enable.auto.commit" -> "false",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
>   }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
>   def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, 
> Array[Byte]]], windowDuration: Int, slideDuration: Int, kafkaSink: 
> Broadcast[KafkaSink[TopList]]): Unit = {
>     getFilteredStream(stream.map(_.value()), windowDuration, 
> slideDuration).foreachRDD(rdd => {
>       val topList = new TopList
>       topList.setCreated(new Date())
>       topList.setTopListEntryList(rdd.take(TopListLength).toList)
>       CurrentLogger.info("TopList length: " + 
> topList.getTopListEntryList.size().toString)
>       kafkaSink.value.send(SendToTopicName, topList)
>       CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>     })
>   }
>   def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int, 
> slideDuration: Int): DStream[TopListEntry] = {
>     val Mapper = MapperObject.readerFor[SearchEventDTO]
>     result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>       .filter(s => s != null && s.getSearchRequest != null && 
> s.getSearchRequest.getSearchParameters != null && s.getVertical == 
> Vertical.BAP && 
> s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>       .map(row => {
>         val name = 
> row.getSearchRequest.getSearchParameters.get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>         (name, new TopListEntry(name, 1, row.getResultCount))
>       })
>       .reduceByKeyAndWindow(
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + 
> b.getMeanSearchHits),
>         (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword, 
> a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - 
> b.getMeanSearchHits),
>         Minutes(windowDuration),
>         Seconds(slideDuration))
>       .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>       .map(row => (row._2.getSearchCount, row._2))
>       .transform(rdd => rdd.sortByKey(ascending = false))
>       .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount, 
> row._2.getMeanSearchHits / row._2.getSearchCount))
>   }
>   def main(properties: Properties): Unit = {
>     val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>     val kafkaSink = 
> sparkSession.sparkContext.broadcast(KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>     val kafkaParams: Map[String, Object] = 
> SparkUtil.getDefaultKafkaReceiverParameter(properties)
>     val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>     ssc.checkpoint("/home/spark/checkpoints")
>     val adEventStream =
>       KafkaUtils.createDirectStream[String, Array[Byte]](ssc, 
> PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), 
> kafkaParams))
>     processSearchKeyWords(adEventStream, 
> SparkUtil.getWindowDuration(properties), 
> SparkUtil.getSlideDuration(properties), kafkaSink)
>     ssc.start()
>     ssc.awaitTermination()
>   }
> {code}
> As I saw in the code KafkaUtils
> {code:title=Job.Scala|borderStyle=solid}
>     logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to 
> none for executor")
>     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means as soon as one worker has a kafka partion that can no be processed 
> because the offset is not valid anymore due to retention policy the streaming 
> job will stop working 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to