[ https://issues.apache.org/jira/browse/SPARK-28641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16904968#comment-16904968 ]

Hyukjin Kwon commented on SPARK-28641:
--------------------------------------

Out of curiosity, is it reproducible with a dummy source, for instance the 
{{rate}} source?
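
For example, a dummy query along these lines (the rate, sink, and checkpoint path below are arbitrary placeholders, just a sketch of a reproduction attempt):

{code:scala}
// Minimal rate-source query sketch; the checkpoint path and rate are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SPARK-28641-rate-repro").getOrCreate()

val query = spark.readStream
  .format("rate")                          // built-in dummy source: (timestamp, value) rows
  .option("rowsPerSecond", "10")
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/spark-28641-rate-checkpoint")
  .start()                                 // default micro-batch trigger, as in the report

query.awaitTermination()
{code}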

> MicroBatchExecution committed offsets greater than available offsets
> --------------------------------------------------------------------
>
>                 Key: SPARK-28641
>                 URL: https://issues.apache.org/jira/browse/SPARK-28641
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>         Environment: HDP --> 3.0.0
> Spark --> 2.3.1
> Kafka --> 2.1.1
>            Reporter: MariaCarrie
>            Priority: Major
>              Labels: MicroBatchExecution, dataAvailable
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> I use Structured Streaming to consume Kafka data, with the default trigger type 
> and checkpointing enabled. Looking at the log, I found that Structured Streaming 
> went back and re-processed data from before the committed offsets. The 
> application log is as follows:
>  
> ^19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start = Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}), end = {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}^
> ^19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()^
> ^19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's offset was changed from 13978260 to 9053058, some data may have been missed.^
> ^Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".^
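> For reference, the read side of the query looks roughly like this (the broker address, sink, and checkpoint path are placeholders, not my real configuration); "failOnDataLoss" is the option the warning above refers to:
> {code:scala}
> // Rough shape of the query; broker, sink, and checkpoint path are placeholders.
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder().appName("kafka-consume").getOrCreate()
>
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker1:9092")          // placeholder
>   .option("subscribe", "dop_dvi_formatted-send_pus")
>   // "true" would make the query fail instead of silently re-reading
>   // when offsets regress, as the warning above suggests.
>   .option("failOnDataLoss", "false")
>   .load()
>
> val query = df.writeStream
>   .format("console")                                          // placeholder sink
>   .option("checkpointLocation", "/path/to/checkpoint")        // placeholder
>   .start()
> {code}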
>  
> I see that when the latest offsets are fetched, they are compared with the 
> committedOffsets to decide whether there is new data:
>  
> ^private def dataAvailable: Boolean = {^
> ^  availableOffsets.exists {^
> ^    case (source, available) =>^
> ^      committedOffsets^
> ^        .get(source)^
> ^        .map(committed => committed != available)^
> ^        .getOrElse(true)^
> ^  }^
> ^}^
>  
> I think something went wrong on the Kafka side that caused the fetchLatestOffsets 
> method to return the earliest offsets. However, that data had already been 
> successfully processed and committed. Could this be handled in the dataAvailable 
> method, so that if availableOffsets has already been committed, the batch is no 
> longer marked as newData?
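> For illustration only, here is a self-contained sketch of the kind of check I mean, written against plain partition-offset maps rather than the real Offset types (the partition ids and numbers below are just examples):
> {code:scala}
> // A batch only counts as new data if at least one partition's available offset
> // is strictly ahead of the committed one, so a regressed offset (earliest
> // returned instead of latest) is not treated as new data.
> object OffsetRegressionCheck {
>   type PartitionOffsets = Map[String, Long]  // partition id -> offset
>
>   def hasNewData(committed: PartitionOffsets, available: PartitionOffsets): Boolean =
>     available.exists { case (partition, availableOffset) =>
>       committed.get(partition).forall(availableOffset > _)
>     }
>
>   def main(args: Array[String]): Unit = {
>     val committed = Map("0" -> 13978242L, "4" -> 13978260L)
>     // Partition 4 regressed to an earlier offset, as in the log above.
>     val regressed = Map("0" -> 13978242L, "4" -> 9053058L)
>     val advanced  = Map("0" -> 13978250L, "4" -> 13978260L)
>
>     println(hasNewData(committed, regressed))  // false: regression is not new data
>     println(hasNewData(committed, advanced))   // true: partition 0 moved forward
>   }
> }
> {code}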
>  
> I am not sure whether my idea is correct; if the query keeps processing from the 
> earliest offsets again, Structured Streaming cannot keep up in a timely manner. 
> I would be glad to receive any suggestions!
>  


