[ 
https://issues.apache.org/jira/browse/SPARK-28641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-28641:
---------------------------------
    Description: 
I use Structured Streaming to consume Kafka data, with the default trigger type and checkpointing enabled. Looking at the log, I found that Structured Streaming went back and re-processed data from offsets it had already processed. The application log is as follows:

 
{code}
19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start = Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}), end = {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's offset was changed from 13978260 to 9053058, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option "failOnDataLoss" to "true".
{code}
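The warning itself points at one way to surface the problem: setting the Kafka source option "failOnDataLoss" to "true" makes the query fail instead of silently continuing from regressed offsets. A minimal sketch of how that option is set (the broker address is a placeholder; the topic name is the one from the log above):

{code}
// Sketch: fail the streaming query on data loss instead of continuing.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")     // placeholder address
  .option("subscribe", "dop_dvi_formatted-send_pus")
  .option("failOnDataLoss", "true")                      // fail on such cases
  .load()
{code}

This does not fix the re-processing, but it turns a silent offset regression into a visible failure.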
 

I see that after the latest offsets are fetched, they are compared with committedOffsets to decide whether they count as new data:

 
{code}
private def dataAvailable: Boolean = {
  availableOffsets.exists {
    case (source, available) =>
      committedOffsets
        .get(source)
        .map(committed => committed != available)
        .getOrElse(true)
  }
}
{code}
 

I think some problem occurred on the Kafka side that caused the fetchLatestOffsets method to return the earliest offsets. However, that data had already been successfully processed and committed. Could this be handled in the dataAvailable method: if the availableOffsets have already been committed, the batch would no longer be marked as new data.
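As a rough, self-contained sketch of that idea (plain Longs stand in for the per-partition offsets inside a KafkaSourceOffset; this is not a tested patch against MicroBatchExecution), the check could treat a batch as new data only when an available offset is strictly ahead of the committed one, so an offset regression does not trigger re-processing:

{code}
// Sketch: a batch counts as new data only when some partition's
// available offset is strictly ahead of the committed one; a
// regression (e.g. Kafka returning the earliest offset after data
// loss) is not treated as "new".
def dataAvailable(
    committed: Map[String, Long],
    available: Map[String, Long]): Boolean =
  available.exists { case (partition, availableOffset) =>
    committed.get(partition) match {
      case Some(committedOffset) => availableOffset > committedOffset
      case None                  => true // unseen partition: new data
    }
  }
{code}

With committed = Map("dop_dvi_formatted-send_pus-4" -> 13978260L) and available = Map("dop_dvi_formatted-send_pus-4" -> 9053058L), this returns false, so the regressed offset from the log above would not be re-processed. One caveat: generic Offset instances in Spark are opaque and not ordered, so a real fix would need a source-specific comparison like this.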

 

I am not sure whether my idea is correct; if the query simply continues processing from the earliest offsets, Structured Streaming cannot keep up with new data in a timely manner. I would be glad to receive any suggestions!

 



> MicroBatchExecution committed offsets greater than available offsets
> --------------------------------------------------------------------
>
>                 Key: SPARK-28641
>                 URL: https://issues.apache.org/jira/browse/SPARK-28641
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.1
>         Environment: HDP --> 3.0.0
> Spark --> 2.3.1
> Kafka --> 2.1.1
>            Reporter: MariaCarrie
>            Priority: Major
>              Labels: MicroBatchExecution, dataAvailable
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
