Sebastian Herold created SPARK-27086:
----------------------------------------

             Summary: DataSourceV2 MicroBatchExecution commits last batch only 
if new batch is constructed
                 Key: SPARK-27086
                 URL: https://issues.apache.org/jira/browse/SPARK-27086
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.0
            Reporter: Sebastian Herold


I wanted to use the new {{DataSourceV2}} API to build an AWS SQS streaming data 
source that uses the new {{commit}} method of the {{MicroBatchReader}} to 
finally commit a message to SQS after it has been processed. If the 
processing of messages fails and they are not committed, the messages 
automatically reappear in SQS after a timeout, which is the intended behaviour 
without using special state storing or checkpointing.
Sadly, I noticed that an offset in the {{MicroBatchReader}} is only committed 
when a new batch is constructed ([see line 400 in 
{{MicroBatchExecution}}|https://github.com/apache/spark/blob/f7ad4ff040d39c7a55a9e01a990534e55c8178a5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L400])
 which is quite strange. In particular, in my SQS example there could be a long 
break after a first batch of messages before new messages are sent to SQS. 
This would lead to a timeout and reappearance of the SQS messages from the 
previous batch, because they were processed but never committed. Therefore, I 
would strongly recommend committing an offset as soon as it has been 
processed! The commit should be independent of the construction of the next batch!
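To illustrate the structural problem, here is a minimal, self-contained Scala sketch. It is not actual Spark code; {{MicroBatchReaderLike}}, {{RecordingReader}} and {{runBatches}} are hypothetical simplifications of the micro-batch loop, showing that when the commit of batch N only fires while batch N+1 is being constructed, the final batch stays uncommitted once the input goes quiet:

```scala
// Hypothetical simplification of the MicroBatchExecution loop (NOT Spark code).
trait MicroBatchReaderLike {
  def commit(offset: Long): Unit
}

// Records which offsets were committed, so we can inspect the result.
class RecordingReader extends MicroBatchReaderLike {
  var committed: List[Long] = Nil
  def commit(offset: Long): Unit = committed = offset :: committed
}

object MicroBatchLoopSketch {
  // Mirrors the reported behaviour: the previous batch is committed
  // only as part of constructing the NEXT batch.
  def runBatches(reader: RecordingReader, availableBatches: Iterator[Long]): Unit = {
    var lastProcessed: Option[Long] = None
    while (availableBatches.hasNext) {
      // Constructing a new batch is what commits the previous one.
      lastProcessed.foreach(reader.commit)
      val offset = availableBatches.next()
      // ... batch at `offset` is processed here ...
      lastProcessed = Some(offset)
    }
    // No more data arrives: `lastProcessed` was fully processed
    // but its commit never happens.
  }
}
```

Running this with three batches shows offsets 1 and 2 get committed but offset 3 never does; in the SQS case those uncommitted messages would hit their visibility timeout and reappear.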



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
