Sasaki Toru created SPARK-20050:
-----------------------------------
Summary: Kafka 0.10 DirectStream doesn't commit last processed batch's offset when graceful shutdown
Key: SPARK-20050
URL: https://issues.apache.org/jira/browse/SPARK-20050
Project: Spark
Issue Type: Improvement
Components: DStreams
Affects Versions: 2.2.0
Reporter: Sasaki Toru
I use the Kafka 0.10 DirectStream with the property 'enable.auto.commit=false' and
call 'DirectKafkaInputDStream#commitAsync' at the end of each batch, as below:
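{code}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

val kafkaStream = KafkaUtils.createDirectStream[String, String](...)

// Print every record together with its key, value and offset.
kafkaStream.map { input =>
  "key: " + input.key.toString + " value: " + input.value.toString +
    " offset: " + input.offset.toString
}.foreachRDD { rdd =>
  rdd.foreach { input =>
    println(input)
  }
}

// At the end of each batch, pass this batch's offset ranges to commitAsync.
kafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{code}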
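For reference, 'enable.auto.commit=false' is normally set in the consumer parameter map handed to createDirectStream. A minimal sketch, assuming placeholder values: the broker address and group id below are hypothetical and not taken from this report.

{code}
object KafkaParamsSketch {
  // Hypothetical consumer parameters; "localhost:9092" and "example-group" are placeholders.
  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "group.id" -> "example-group",
    // Turn off Kafka's periodic auto-commit so offsets are committed only through commitAsync.
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
}
{code}

With auto-commit disabled, the only offsets that reach Kafka are the ones passed to commitAsync, which is why a missing final commit shows up as reprocessing after a restart.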
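Some records processed in the last batch before a graceful shutdown of Spark Streaming
are processed again in the first batch after Spark Streaming restarts.
This may be because the offsets passed to commitAsync are not actually committed until
the start of the next batch, so the offsets of the last batch before shutdown are never committed.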
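To make the suspected mechanism concrete, here is a simplified, hypothetical model in plain Scala with no Spark or Kafka dependency. ModelDirectStream, PendingRange and computeNextBatch are illustrative names, not Spark's actual classes; the model only assumes, as described above, that commitAsync queues offsets and that the queue is flushed when the next batch is generated.

{code}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

// Hypothetical stand-in for an offset range handed to commitAsync.
final case class PendingRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical model: commitAsync only enqueues; commits happen when the next batch is computed.
final class ModelDirectStream {
  // Ranges requested via commitAsync but not yet committed.
  private val commitQueue = new ConcurrentLinkedQueue[PendingRange]()
  // Stand-in for the offsets Kafka would record as committed, keyed by (topic, partition).
  val committed: mutable.Map[(String, Int), Long] = mutable.Map.empty

  def commitAsync(ranges: Seq[PendingRange]): Unit =
    ranges.foreach(r => commitQueue.add(r))

  // Drain the queue, keeping the highest untilOffset seen per partition.
  private def commitAll(): Unit = {
    var r = commitQueue.poll()
    while (r != null) {
      val key = (r.topic, r.partition)
      committed(key) = math.max(committed.getOrElse(key, 0L), r.untilOffset)
      r = commitQueue.poll()
    }
  }

  // Called when a new batch is generated; this is the only place pending commits are flushed.
  def computeNextBatch(): Unit = commitAll()
}

object GracefulShutdownDemo {
  def run(): Map[(String, Int), Long] = {
    val stream = new ModelDirectStream
    // Batch 1: records 0 until 100 are processed, then commitAsync is called.
    stream.computeNextBatch()
    stream.commitAsync(Seq(PendingRange("t", 0, 0L, 100L)))
    // Batch 2 starts: batch 1's offsets are committed only now.
    stream.computeNextBatch()
    stream.commitAsync(Seq(PendingRange("t", 0, 100L, 200L)))
    // Graceful shutdown: no further batch is generated, so 100 until 200 stays queued.
    stream.committed.toMap
  }

  def main(args: Array[String]): Unit =
    println(run()) // prints Map((t,0) -> 100)
}
{code}

In this model the last batch's range (100 until 200) is never flushed, so a restarted consumer would resume from offset 100 and process those records again, matching the behaviour reported above.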