Sasaki Toru created SPARK-20050:
-----------------------------------

             Summary: Kafka 0.10 DirectStream doesn't commit last processed 
batch's offset when graceful shutdown
                 Key: SPARK-20050
                 URL: https://issues.apache.org/jira/browse/SPARK-20050
             Project: Spark
          Issue Type: Improvement
          Components: DStreams
    Affects Versions: 2.2.0
            Reporter: Sasaki Toru


I use the Kafka 0.10 DirectStream with the property 'enable.auto.commit=false'
and call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as
shown below:

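{code}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

// Print the key, value and offset of every record in the batch.
kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString +
    " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

// Ask the stream to commit this batch's offset ranges asynchronously.
kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}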
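
For reference, the 'enable.auto.commit=false' property would normally be part
of the Kafka parameter map passed to createDirectStream. The following is only
a minimal sketch: the broker address and group id are placeholder values that
are not taken from this report, and the object name is hypothetical.

```scala
object KafkaParamsSketch {
  // Placeholder values: "localhost:9092" and "example-group" are assumptions
  // for illustration only.
  val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "group.id" -> "example-group",
    // Disable Kafka's own periodic commits so that offsets are only
    // committed through commitAsync.
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
}
```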

Some records that were processed in the last batch before a graceful shutdown
of Spark Streaming are reprocessed in the first batch after Spark Streaming
restarts.

This may be because the offsets passed to commitAsync are not committed until
the start of the next batch, so the last batch's offsets are never committed
before shutdown.
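
The suspected behaviour can be illustrated with a small self-contained model.
This is a toy sketch, not Spark's implementation: ToyDirectStream, OffsetRange
and computeBatch are hypothetical names. It assumes only what is described
above, namely that commitAsync queues offsets and that the queue is drained
when the next batch is generated.

```scala
import scala.collection.mutable

// Toy model only: these are hypothetical stand-ins, not Spark classes.
final case class OffsetRange(topic: String, partition: Int,
                             fromOffset: Long, untilOffset: Long)

final class ToyDirectStream {
  private val commitQueue = mutable.Queue.empty[OffsetRange]

  // Last offset actually committed, per (topic, partition).
  val committed = mutable.Map.empty[(String, Int), Long]

  // Only enqueues the ranges; nothing is committed at this point.
  def commitAsync(ranges: Seq[OffsetRange]): Unit = commitQueue ++= ranges

  // Generating a batch first drains the commits queued by earlier batches.
  def computeBatch(range: OffsetRange): OffsetRange = {
    while (commitQueue.nonEmpty) {
      val r = commitQueue.dequeue()
      committed((r.topic, r.partition)) = r.untilOffset
    }
    range
  }

  def pendingCommits: Int = commitQueue.size
}

object GracefulShutdownDemo {
  // Runs two batches, then "shuts down" without generating a third batch.
  def run(): (Option[Long], Int) = {
    val stream = new ToyDirectStream
    val batches = Seq(OffsetRange("t", 0, 0L, 10L), OffsetRange("t", 0, 10L, 20L))
    batches.foreach { b =>
      val processed = stream.computeBatch(b)
      stream.commitAsync(Seq(processed))
    }
    (stream.committed.get(("t", 0)), stream.pendingCommits)
  }

  def main(args: Array[String]): Unit = {
    val (lastCommitted, pending) = run()
    println(s"committed=$lastCommitted pending=$pending")
  }
}
```

In this model the second batch (offsets 10 to 20) is fully processed, but its
range is still sitting in the queue at shutdown, so a restart would resume from
offset 10 and reprocess those records.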




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
