[ 
https://issues.apache.org/jira/browse/SPARK-27086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Herold updated SPARK-27086:
-------------------------------------
    Description: 
I wanted to use the new {{DataSourceV2}} API to build an AWS SQS streaming data 
source that uses the new {{commit}} method of the {{MicroBatchReader}} to 
finally commit a message at SQS after it has been processed. If the processing 
of messages fails and they are not committed, the messages automatically 
reappear in SQS after a timeout, which is the intended behaviour, without any 
special state store or checkpointing.
Sadly, I noticed that an offset in the {{MicroBatchReader}} only gets committed 
when a new batch is constructed ([see line 400 in 
{{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400]),
 which is quite strange. In particular, in my SQS example there could be a long 
pause after a first batch of messages before new messages are sent to SQS. This 
would lead to a timeout and reappearance of the SQS messages from the previous 
batch, because they were processed but never committed. Therefore, I would 
strongly recommend committing an offset as soon as it has been processed; the 
commit should be independent of the next batch.
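To illustrate the reported behaviour, here is a minimal, self-contained sketch (all names are illustrative, not Spark's actual API): a toy micro-batch loop that, like {{MicroBatchExecution}}, only commits the previous batch's offset while constructing the next batch, so the final processed batch is left uncommitted when no new data arrives:

```scala
// Toy model of the reported behaviour. The names (ToySource, runBatches)
// are hypothetical and only model the commit placement, not Spark itself.
object CommitLagDemo {
  final case class Offset(value: Long)

  // Stands in for a MicroBatchReader-like source that tracks commits.
  class ToySource {
    var committed: Option[Offset] = None
    def commit(end: Offset): Unit = committed = Some(end)
  }

  // Mimics the engine loop: commit(prevOffset) happens inside
  // "construct next batch", never right after a batch is processed.
  def runBatches(source: ToySource, batches: List[Offset]): Unit = {
    var prev: Option[Offset] = None
    for (end <- batches) {
      // constructNextBatch: commit the *previous* batch first ...
      prev.foreach(source.commit)
      // ... then "process" the current batch up to `end`.
      prev = Some(end)
    }
    // Loop ends: the final batch was processed but never committed.
  }

  def main(args: Array[String]): Unit = {
    val source = new ToySource
    runBatches(source, List(Offset(10), Offset(20)))
    // Offset(20) was processed, yet only Offset(10) was committed.
    println(source.committed)
  }
}
```

With no further batch, the messages behind {{Offset(20)}} would hit their SQS visibility timeout and reappear, which is exactly the problem described above.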



> DataSourceV2 MicroBatchExecution commits last batch only if new batch is 
> constructed
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-27086
>                 URL: https://issues.apache.org/jira/browse/SPARK-27086
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Sebastian Herold
>            Priority: Major
>              Labels: MicroBatchExecution, MicroBatchReader, Spark, Streaming, 
> Structured, commit
>
> I wanted to use the new {{DataSourceV2}} API to build an AWS SQS streaming 
> data source that uses the new {{commit}} method of the {{MicroBatchReader}} 
> to finally commit a message at SQS after it has been processed. If the 
> processing of messages fails and they are not committed, the messages 
> automatically reappear in SQS after a timeout, which is the intended 
> behaviour, without any special state store or checkpointing.
> Sadly, I noticed that an offset in the {{MicroBatchReader}} only gets 
> committed when a new batch is constructed ([see line 400 in 
> {{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400]),
>  which is quite strange. In particular, in my SQS example there could be a 
> long pause after a first batch of messages before new messages are sent to 
> SQS. This would lead to a timeout and reappearance of the SQS messages from 
> the previous batch, because they were processed but never committed. 
> Therefore, I would strongly recommend committing an offset as soon as it has 
> been processed; the commit should be independent of the next batch.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
