[ https://issues.apache.org/jira/browse/SPARK-20050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938988#comment-15938988 ]
Sasaki Toru edited comment on SPARK-20050 at 3/23/17 6:45 PM:
--------------------------------------------------------------

Thank you for your comment, but I don't understand your advice, sorry.
What I mean is that some offsets passed to {{commitAsync}} are never committed to Kafka.
I believe the callback function is invoked only when a commit to Kafka completes (whether it succeeds or fails), so it will not be invoked in this case.
If I am wrong, please correct me, thanks.

> Kafka 0.10 DirectStream doesn't commit the last processed batch's offsets on
> graceful shutdown
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20050
>                 URL: https://issues.apache.org/jira/browse/SPARK-20050
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.2.0
>            Reporter: Sasaki Toru
>
> I use Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
> call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
> {code}
> val kafkaStream = KafkaUtils.createDirectStream[String, String](...)
> kafkaStream.map { input =>
>   "key: " + input.key.toString + " value: " + input.value.toString + " offset: " + input.offset.toString
> }.foreachRDD { rdd =>
>   rdd.foreach { input =>
>     println(input)
>   }
> }
> kafkaStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
> {code}
> Some records processed in the last batch before a graceful shutdown of
> Streaming are reprocessed in the first batch after Spark Streaming restarts.
> This may be because offsets specified in commitAsync are committed at the
> head of the next batch.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
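One way to check the reporter's theory (this sketch is not from the issue itself): {{CanCommitOffsets.commitAsync}} has an overload that takes a Kafka {{OffsetCommitCallback}}, so each commit's completion can be logged. If the final batch's offsets are only enqueued and committed at the head of the next batch, no callback would ever fire for them after a graceful shutdown. The snippet assumes {{kafkaStream}} is the DirectStream from the report's code; the stream setup and context are elided.

```scala
import java.util.{Map => JMap}

import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// kafkaStream: the DirectStream created as in the report above
kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges,
    new OffsetCommitCallback {
      // Runs only when a commit actually completes (success or failure).
      // If the last batch's offsets never reach Kafka, nothing is logged for them.
      override def onComplete(offsets: JMap[TopicPartition, OffsetAndMetadata],
                              exception: Exception): Unit = {
        if (exception == null) println(s"committed: $offsets")
        else println(s"commit failed: $exception")
      }
    })
}
```

With this in place, comparing the logged offsets against the last processed batch after a graceful shutdown would show whether the final commit was dropped.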