[
https://issues.apache.org/jira/browse/SPARK-19680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241577#comment-16241577
]
Serkan Taş edited comment on SPARK-19680 at 11/7/17 6:37 AM:
-------------------------------------------------------------
I frequently get the "numRecords must not be negative" error, but this time I hit
the same issue on a YARN cluster: a job that had been running for 4 days was
terminated due to the same error:
diagnostics: User class threw exception: org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0 in stage 18388.0 failed 4 times, most
recent failure: Lost task 0.3 in stage 18388.0 (TID 21387, server, executor 1):
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of
range with no configured reset policy for partitions: {topic-0=33436703}
Exception in thread "main" org.apache.spark.SparkException: Application
application_fdsfdsfsdfsdf_0001 finished with failed status
Hadoop : 2.8.0
Spark : 2.1.0
Kafka : 0.10.2.1
Configuration:
{code}
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "brokers",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "grp_id",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
{code}
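A quick way to confirm whether the failing offset has simply aged out of the topic
because of retention is to compare it against the broker's current offset range. A
minimal diagnostic sketch (not part of the job; the broker list, group id, topic name
and offset value are placeholders taken from the configuration and the log above):
{code}
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetRangeCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings, mirroring the kafkaParams above.
    val props = new Properties()
    props.put("bootstrap.servers", "brokers")
    props.put("group.id", "grp_id_offset_check")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val tp = new TopicPartition("topic", 0) // partition from the exception message
      val failing = 33436703L                 // offset reported in the exception
      // Oldest offset still on the broker and the next offset to be written.
      val earliest = consumer.beginningOffsets(List(tp).asJava).get(tp).longValue()
      val latest = consumer.endOffsets(List(tp).asJava).get(tp).longValue()
      println(s"broker range for $tp: [$earliest, $latest), failing offset: $failing")
      if (failing < earliest) {
        // The requested offset was deleted by retention; with auto.offset.reset=none
        // on the executors this surfaces as OffsetOutOfRangeException.
        println("offset already removed by retention")
      }
    } finally {
      consumer.close()
    }
  }
}
{code}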
> Offsets out of range with no configured reset policy for partitions
> -------------------------------------------------------------------
>
> Key: SPARK-19680
> URL: https://issues.apache.org/jira/browse/SPARK-19680
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.0
> Reporter: Schakmann Rene
>
> I'm using Spark Streaming with Kafka to build a top list, and I want to read all
> the messages in Kafka. So I set
> "auto.offset.reset" -> "earliest"
> Nevertheless, when I start the job on our Spark cluster it does not work and I
> get:
> Error:
> {code:title=error.log|borderStyle=solid}
> Job aborted due to stage failure: Task 2 in stage 111.0 failed 4 times,
> most recent failure: Lost task 2.3 in stage 111.0 (TID 1270, 194.232.55.23,
> executor 2): org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> Offsets out of range with no configured reset policy for partitions:
> {SearchEvents-2=161803385}
> {code}
> This seems wrong, because I did set the auto.offset.reset property.
> Setup:
> Kafka Parameter:
> {code:title=Config.Scala|borderStyle=solid}
> def getDefaultKafkaReceiverParameter(properties: Properties): Map[String, Object] = {
>   Map(
>     "bootstrap.servers" -> properties.getProperty("kafka.bootstrap.servers"),
>     "group.id" -> properties.getProperty("kafka.consumer.group"),
>     "auto.offset.reset" -> "earliest",
>     "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>     "enable.auto.commit" -> "false",
>     "key.deserializer" -> classOf[StringDeserializer],
>     "value.deserializer" -> "at.willhaben.sid.DTOByteDeserializer")
> }
> {code}
> Job:
> {code:title=Job.Scala|borderStyle=solid}
> def processSearchKeyWords(stream: InputDStream[ConsumerRecord[String, Array[Byte]]],
>                           windowDuration: Int, slideDuration: Int,
>                           kafkaSink: Broadcast[KafkaSink[TopList]]): Unit = {
>   getFilteredStream(stream.map(_.value()), windowDuration, slideDuration).foreachRDD(rdd => {
>     val topList = new TopList
>     topList.setCreated(new Date())
>     topList.setTopListEntryList(rdd.take(TopListLength).toList)
>     CurrentLogger.info("TopList length: " + topList.getTopListEntryList.size().toString)
>     kafkaSink.value.send(SendToTopicName, topList)
>     CurrentLogger.info("Last Run: " + System.currentTimeMillis())
>   })
> }
>
> def getFilteredStream(result: DStream[Array[Byte]], windowDuration: Int,
>                       slideDuration: Int): DStream[TopListEntry] = {
>   val Mapper = MapperObject.readerFor[SearchEventDTO]
>   result.repartition(100).map(s => Mapper.readValue[SearchEventDTO](s))
>     .filter(s => s != null && s.getSearchRequest != null &&
>       s.getSearchRequest.getSearchParameters != null && s.getVertical == Vertical.BAP &&
>       s.getSearchRequest.getSearchParameters.containsKey(EspParameterEnum.KEYWORD.getName))
>     .map(row => {
>       val name = row.getSearchRequest.getSearchParameters
>         .get(EspParameterEnum.KEYWORD.getName).getEspSearchParameterDTO.getValue.toLowerCase()
>       (name, new TopListEntry(name, 1, row.getResultCount))
>     })
>     .reduceByKeyAndWindow(
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword,
>         a.getSearchCount + b.getSearchCount, a.getMeanSearchHits + b.getMeanSearchHits),
>       (a: TopListEntry, b: TopListEntry) => new TopListEntry(a.getKeyword,
>         a.getSearchCount - b.getSearchCount, a.getMeanSearchHits - b.getMeanSearchHits),
>       Minutes(windowDuration),
>       Seconds(slideDuration))
>     .filter((x: (String, TopListEntry)) => x._2.getSearchCount > 200L)
>     .map(row => (row._2.getSearchCount, row._2))
>     .transform(rdd => rdd.sortByKey(ascending = false))
>     .map(row => new TopListEntry(row._2.getKeyword, row._2.getSearchCount,
>       row._2.getMeanSearchHits / row._2.getSearchCount))
> }
>
> def main(properties: Properties): Unit = {
>   val sparkSession = SparkUtil.getDefaultSparkSession(properties, TaskName)
>   val kafkaSink = sparkSession.sparkContext.broadcast(
>     KafkaSinkUtil.apply[TopList](SparkUtil.getDefaultSparkProperties(properties)))
>   val kafkaParams: Map[String, Object] = SparkUtil.getDefaultKafkaReceiverParameter(properties)
>   val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(30))
>   ssc.checkpoint("/home/spark/checkpoints")
>
>   val adEventStream = KafkaUtils.createDirectStream[String, Array[Byte]](
>     ssc, PreferConsistent, Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams))
>
>   processSearchKeyWords(adEventStream, SparkUtil.getWindowDuration(properties),
>     SparkUtil.getSlideDuration(properties), kafkaSink)
>
>   ssc.start()
>   ssc.awaitTermination()
> }
> {code}
> As I saw in the KafkaUtils code:
> {code:title=KafkaUtils.scala|borderStyle=solid}
>     logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
>     kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
> {code}
> This means that as soon as one worker holds a Kafka partition that can no longer be
> processed, because its offset is no longer valid due to the retention policy, the
> streaming job stops working.
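> Since the executors are forced to auto.offset.reset=none, one possible mitigation (a
> sketch under assumptions, not a confirmed fix) is to clamp the desired starting offsets
> on the driver to the earliest offsets still available on the broker and hand them to the
> ConsumerStrategy explicitly; the helper name and the source of the stored offsets below
> are hypothetical:
> {code:title=SafeStartingOffsets.scala|borderStyle=solid}
> import scala.collection.JavaConverters._
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.kafka.common.TopicPartition
>
> object SafeStartingOffsets {
>   // Clamp previously stored offsets (e.g. from a checkpoint or an external store) to the
>   // broker's earliest available offsets, so the executors never ask for data that has
>   // already been deleted by retention.
>   def clamped(kafkaParams: Map[String, Object],
>               stored: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
>     val consumer = new KafkaConsumer[String, Array[Byte]](kafkaParams.asJava)
>     try {
>       val earliest = consumer.beginningOffsets(stored.keys.toList.asJava).asScala
>       stored.map { case (tp, offset) => tp -> math.max(offset, earliest(tp).longValue()) }
>     } finally {
>       consumer.close()
>     }
>   }
> }
>
> // Usage sketch: pass the clamped offsets to the Subscribe strategy instead of relying on
> // auto.offset.reset (ReadFromTopicName and kafkaParams are the values from the job above):
> // val safeOffsets = SafeStartingOffsets.clamped(kafkaParams, storedOffsets)
> // val stream = KafkaUtils.createDirectStream[String, Array[Byte]](ssc, PreferConsistent,
> //   Subscribe[String, Array[Byte]](Array(ReadFromTopicName), kafkaParams, safeOffsets))
> {code}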