[
https://issues.apache.org/jira/browse/SPARK-28641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921253#comment-16921253
]
Gabor Somogyi edited comment on SPARK-28641 at 9/3/19 8:31 AM:
---------------------------------------------------------------
[~MariaCarrie] this is a known issue in Kafka, please see KAFKA-7703. It is
fixed in Spark 2.4.1 and 3.0.0 by SPARK-26267. Please upgrade to a newer version.
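A minimal sketch of the dependency-side change, assuming an sbt build (adjust
the coordinates for Maven or your distribution's packaging; the versions shown
are simply the fix versions listed on SPARK-26267):
{code}
// Upgrade Spark and its Kafka connector to a release containing the fix.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.1" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.1"
)
{code}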
> MicroBatchExecution committed offsets greater than available offsets
> --------------------------------------------------------------------
>
> Key: SPARK-28641
> URL: https://issues.apache.org/jira/browse/SPARK-28641
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.1
> Environment: HDP --> 3.0.0
> Spark --> 2.3.1
> Kafka --> 2.1.1
> Reporter: MariaCarrie
> Priority: Major
> Labels: MicroBatchExecution, dataAvailable
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> I use Structured Streaming to consume Kafka data; the trigger type is the
> default and checkpointing is enabled. Looking at the log, I found that
> Structured Streaming went back and re-processed data it had already
> processed. The application log is as follows:
>
> {code}
> 19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start =
> Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}),
> end =
> {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
> 19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
> 19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's
> offset was changed from 13978260 to 9053058, some data may have been missed.
> Some data may have been lost because they are not available in Kafka any
> more; either the data was aged out by Kafka or the topic may have been
> deleted before all the data in the topic was processed. If you want your
> streaming query to fail on such cases, set the source option
> "failOnDataLoss" to "true".
> {code}
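>
> The query is set up roughly as below (a sketch; the broker address, sink,
> and checkpoint path are placeholders, only the topic name is the real one):
>
> {code}
> // Kafka source with checkpointing and the default trigger.
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
>   .option("subscribe", "dop_dvi_formatted-send_pus")
>   // "true" makes the query fail instead of silently skipping offsets
>   // that are no longer available in Kafka.
>   .option("failOnDataLoss", "false")
>   .load()
>
> val query = df.writeStream
>   .format("console")  // placeholder sink
>   .option("checkpointLocation", "/tmp/checkpoints/send_pus")  // placeholder
>   .start()
> {code}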
>
> I see that when the {{latestOffsets}} are fetched, they are compared with
> the {{committedOffsets}} to decide whether there is {{newData}}:
>
> {code}
> private def dataAvailable: Boolean = {
>   availableOffsets.exists {
>     case (source, available) =>
>       committedOffsets.get(source)
>         .map(committed => committed != available)
>         .getOrElse(true)
>   }
> }
> {code}
>
> I think some problem occurred on the Kafka side that caused the
> {{fetchLatestOffsets}} method to return {{earliestOffsets}}. However, that
> data had already been successfully processed and committed. Could the
> {{dataAvailable}} method check whether {{availableOffsets}} has already been
> committed, so that such a batch is no longer marked as {{newData}}? (A rough
> sketch of the idea follows below.)
> I am not sure whether my reasoning is correct; if the query keeps
> re-processing from {{earliestOffsets}}, Structured Streaming cannot keep up
> with the latest data in a timely manner. I would be glad to receive any
> suggestions!
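>
> As a rough illustration only (this is not the actual {{MicroBatchExecution}}
> code; real offsets are opaque JSON values, and {{offsetsAhead}} is a
> hypothetical helper that would compare them per partition), the idea would
> be something like:
>
> {code}
> // Hypothetical sketch: treat a source as having new data only when its
> // available offset is strictly ahead of the committed one, so an offset
> // that moved backwards (as in the log above) is not re-processed.
> private def dataAvailable: Boolean = {
>   availableOffsets.exists {
>     case (source, available) =>
>       committedOffsets.get(source) match {
>         case Some(committed) => offsetsAhead(available, committed) // hypothetical helper
>         case None            => true
>       }
>   }
> }
> {code}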
>
--
This message was sent by Atlassian Jira
(v8.3.2#803003)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]