[
https://issues.apache.org/jira/browse/SPARK-28641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-28641:
---------------------------------
Description:
I use Structured Streaming to consume Kafka data. The trigger type is the
default and checkpointing is enabled. Looking at the log, however, I find that
Structured Streaming went back and processed earlier data. The application log
is as follows:
{code}
19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start =
Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}),
end =
{"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's
offset was changed from 13978260 to 9053058, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you want your streaming query to fail on such cases, set the source
option "failOnDataLoss" to "true".
{code}
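For reference, the option named in the warning is set on the Kafka source when
the stream is defined. The following is only a sketch: the broker address, the
application name, the console sink, and the checkpoint path are placeholders
that do not come from this report. Only the topic name is taken from the log.

```scala
import org.apache.spark.sql.SparkSession

object FailOnDataLossExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fail-on-data-loss-example") // placeholder name
      .getOrCreate()

    // Kafka source with failOnDataLoss enabled, so the query fails
    // instead of only logging the warning shown above.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker-host:9092") // placeholder
      .option("subscribe", "dop_dvi_formatted-send_pus")
      .option("failOnDataLoss", "true")
      .load()

    val query = stream.writeStream
      .format("console") // placeholder sink for illustration
      .option("checkpointLocation", "/tmp/checkpoint-example") // placeholder
      .start()

    query.awaitTermination()
  }
}
```

Running this requires the spark-sql-kafka connector on the classpath and a
reachable broker.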
I see that when the latest offsets are fetched, they are compared with
committedOffsets to decide whether there is new data.
{code}
private def dataAvailable: Boolean = {
  availableOffsets.exists {
    case (source, available) =>
      committedOffsets
        .get(source)
        .map(committed => committed != available)
        .getOrElse(true)
  }
}
{code}
I think something went wrong on the Kafka side that caused the
fetchLatestOffsets method to return the earliest offsets. However, the data up
to the committed offsets had already been processed and committed successfully.
Could the dataAvailable method check for this, so that if the availableOffsets
have already been committed, the batch is no longer marked as new data?
I am not sure whether my reasoning is correct. If the query continues
processing from the earliest offsets, Structured Streaming cannot respond in a
timely manner. I would be glad to receive any suggestions!
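The idea above can be sketched as follows. This is a simplified standalone
model, not Spark's implementation: offsets are represented as plain
partition-to-offset maps, and the names PartitionOffsets, hasAdvanced,
dataAvailableByInequality and dataAvailableByProgress are hypothetical. The
sample values are the start and end offsets from the log, with the start
offsets assumed to be the committed ones.

```scala
object DataAvailableSketch {
  // Hypothetical simplified model: one source's offsets as partition -> offset.
  type PartitionOffsets = Map[Int, Long]

  // Mirrors the quoted check: any difference from the committed offsets
  // counts as new data, even if an offset moved backwards.
  def dataAvailableByInequality(
      committed: Map[String, PartitionOffsets],
      available: Map[String, PartitionOffsets]): Boolean =
    available.exists { case (source, avail) =>
      committed.get(source).map(_ != avail).getOrElse(true)
    }

  // True if at least one partition is new or has moved past its committed offset.
  def hasAdvanced(committed: PartitionOffsets, available: PartitionOffsets): Boolean =
    available.exists { case (partition, offset) =>
      committed.get(partition).forall(offset > _)
    }

  // Proposed variant: a source has new data only if some partition advanced.
  def dataAvailableByProgress(
      committed: Map[String, PartitionOffsets],
      available: Map[String, PartitionOffsets]): Boolean =
    available.exists { case (source, avail) =>
      committed.get(source).forall(c => hasAdvanced(c, avail))
    }

  def main(args: Array[String]): Unit = {
    val topic = "dop_dvi_formatted-send_pus"
    val committed = Map(topic -> Map(
      2 -> 13978245L, 4 -> 13978260L, 1 -> 13978249L, 3 -> 13978233L, 0 -> 13978242L))
    val available = Map(topic -> Map(
      2 -> 13978245L, 4 -> 9053058L, 1 -> 13978249L, 3 -> 13978233L, 0 -> 13978242L))

    println(dataAvailableByInequality(committed, available)) // true
    println(dataAvailableByProgress(committed, available))   // false
  }
}
```

With the offsets from the log, the inequality check reports new data because
partition 4 differs, while the progress-based check does not, since no
partition moved forward. Whether skipping such a batch is the right behavior
when offsets go backwards is exactly the open question here.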
was:
I use Structured Streaming to consume Kafka data. The trigger type is the
default and checkpointing is enabled. Looking at the log, however, I find that
Structured Streaming went back and processed earlier data. The application log
is as follows:
^19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start =
Some(\{"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}),
end =
\{"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}^
^19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()^
^19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's
offset was changed from 13978260 to 9053058, some data may have been missed.^
^Some data may have been lost because they are not available in Kafka any more;
either the^
^data was aged out by Kafka or the topic may have been deleted before all the
data in the^
^topic was processed. If you want your streaming query to fail on such cases,
set the source^
^option "failOnDataLoss" to "true".^
I see that when the latest offsets are fetched, they are compared with
committedOffsets to decide whether there is new data.
^private def dataAvailable: Boolean = {^
^availableOffsets.exists {^
^case (source, available) =>^
^committedOffsets^
^.get(source)^
^.map(committed => committed != available)^
^.getOrElse(true)^
^}^
^}^
I think something went wrong on the Kafka side that caused the
fetchLatestOffsets method to return the earliest offsets. However, the data up
to the committed offsets had already been processed and committed successfully.
Could the dataAvailable method check for this, so that if the availableOffsets
have already been committed, the batch is no longer marked as new data?
I am not sure whether my reasoning is correct. If the query continues
processing from the earliest offsets, Structured Streaming cannot respond in a
timely manner. I would be glad to receive any suggestions!
> MicroBatchExecution committed offsets greater than available offsets
> --------------------------------------------------------------------
>
> Key: SPARK-28641
> URL: https://issues.apache.org/jira/browse/SPARK-28641
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.1
> Environment: HDP --> 3.0.0
> Spark --> 2.3.1
> Kafka --> 2.1.1
> Reporter: MariaCarrie
> Priority: Major
> Labels: MicroBatchExecution, dataAvailable
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> I use Structured Streaming to consume Kafka data. The trigger type is the
> default and checkpointing is enabled. Looking at the log, however, I find that
> Structured Streaming went back and processed earlier data. The application log
> is as follows:
>
> {code}
> 19/07/31 15:25:50 INFO KafkaSource: GetBatch called with start =
> Some({"dop_dvi_formatted-send_pus":{"2":13978245,"4":13978260,"1":13978249,"3":13978233,"0":13978242}}),
> end =
> {"dop_dvi_formatted-send_pus":{"2":13978245,"4":9053058,"1":13978249,"3":13978233,"0":13978242}}
> 19/07/31 15:25:50 INFO KafkaSource: Partitions added: Map()
> 19/07/31 15:25:50 WARN KafkaSource: Partition dop_dvi_formatted-send_pus-4's
> offset was changed from 13978260 to 9053058, some data may have been missed.
> Some data may have been lost because they are not available in Kafka any more; either the
> data was aged out by Kafka or the topic may have been deleted before all the data in the
> topic was processed. If you want your streaming query to fail on such cases, set the source
> option "failOnDataLoss" to "true".
> {code}
>
> I see that when the latest offsets are fetched, they are compared with
> committedOffsets to decide whether there is new data.
>
> {code}
> private def dataAvailable: Boolean = {
>   availableOffsets.exists {
>     case (source, available) =>
>       committedOffsets
>         .get(source)
>         .map(committed => committed != available)
>         .getOrElse(true)
>   }
> }
> {code}
>
> I think something went wrong on the Kafka side that caused the
> fetchLatestOffsets method to return the earliest offsets. However, the data up
> to the committed offsets had already been processed and committed
> successfully. Could the dataAvailable method check for this, so that if the
> availableOffsets have already been committed, the batch is no longer marked
> as new data?
>
> I am not sure whether my reasoning is correct. If the query continues
> processing from the earliest offsets, Structured Streaming cannot respond in a
> timely manner. I would be glad to receive any suggestions!
>