[
https://issues.apache.org/jira/browse/SPARK-27086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sebastian Herold updated SPARK-27086:
-------------------------------------
Description:
I wanted to use the new {{DataSourceV2}} API to build an AWS SQS streaming data
source which relies on the new {{commit}} method offered by the
{{MicroBatchReader}} to finally commit a message at SQS after it has been
processed. If the processing of messages fails and they are not committed, the
messages automatically reappear in SQS after a timeout, which is the intended
behaviour and avoids any special state storing or checkpointing.
Sadly, I noticed that an offset in the {{MicroBatchReader}} is only committed
when a new batch is constructed ([see line 400 in
{{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400]),
which is quite strange. In my SQS example in particular, there could be a long
pause after a first batch of messages before new messages are sent to SQS. This
would lead to a timeout and reappearance of the SQS messages from the previous
batch, because they were processed but never committed. Therefore, I would
strongly recommend committing an offset as soon as it has been processed! The
commit should be independent of the construction of the next batch!
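To make the timing issue concrete, here is a minimal, self-contained Scala sketch (not Spark's actual code; all names such as {{SqsLikeReader}}, {{runCurrent}} and {{runProposed}} are hypothetical) that models the reported behaviour, where the reader's {{commit}} is invoked only while constructing the *next* batch, versus the proposed behaviour of committing immediately after each batch finishes:

```scala
// Hypothetical stand-in for a MicroBatchReader-style source that tracks
// which offsets have been committed (e.g. acknowledged at SQS).
class SqsLikeReader {
  var committed: List[Long] = Nil
  def commit(offset: Long): Unit = committed ::= offset
}

// Reported behaviour: offset N is committed lazily, only when batch N+1
// starts being constructed. The final batch is never committed until a
// further batch arrives.
def runCurrent(reader: SqsLikeReader, batches: Seq[Long]): Unit =
  batches.zipWithIndex.foreach { case (_, i) =>
    if (i > 0) reader.commit(batches(i - 1)) // commit happens inside "constructNextBatch"
    // ... the batch itself would be processed here ...
  }

// Proposed behaviour: commit eagerly, right after the batch is processed.
def runProposed(reader: SqsLikeReader, batches: Seq[Long]): Unit =
  batches.foreach { offset =>
    // ... the batch itself would be processed here ...
    reader.commit(offset) // commit as soon as the batch completes
  }
```

With three batches {{Seq(0L, 1L, 2L)}}, {{runCurrent}} leaves the last offset uncommitted until a fourth batch would arrive, so in the SQS case those already-processed messages hit their visibility timeout and reappear; {{runProposed}} commits all three.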
> DataSourceV2 MicroBatchExecution commits last batch only if new batch is
> constructed
> ------------------------------------------------------------------------------------
>
> Key: SPARK-27086
> URL: https://issues.apache.org/jira/browse/SPARK-27086
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Sebastian Herold
> Priority: Major
> Labels: MicroBatchExecution, MicroBatchReader, Spark, Streaming,
> Structured, commit
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]