[
https://issues.apache.org/jira/browse/SPARK-51331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17931201#comment-17931201
]
Alex commented on SPARK-51331:
------------------------------
[~kabhwan] I guess the current behavior does not violate "at-least-once". But
what's wrong with committing offsets to the source immediately after a batch
completes? That way you could move toward "exactly-once" in more scenarios.
We are consuming from an SQS queue. There is no way to mark messages as "processed"
except to delete them from the queue, and the only point at which we know we can
delete them is the "commit(end: Offset)" callback, which unfortunately is delayed
for an unnecessarily long period.
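Here is a minimal pure-Python sketch of why the callback timing matters for such a source. `InMemoryQueueSource` is a hypothetical stand-in for an SQS-backed source. It is not Spark's `Source` or `SparkDataStream` interface. Offsets are plain list indices, and "deleting" a message means advancing the `committed` index, which only happens in `commit(end)`. Until that call, every message that has already reached the sink is still pending in the queue.

```python
from dataclasses import dataclass, field


@dataclass
class InMemoryQueueSource:
    """Hypothetical SQS-like source: messages are removed only in commit(end).

    Offsets are list indices; messages before `committed` count as deleted.
    This is an illustration, not Spark's Source/SparkDataStream interface.
    """

    messages: list = field(default_factory=list)
    committed: int = 0  # offset up to which messages have been "deleted"

    def latest_offset(self) -> int:
        return len(self.messages)

    def get_batch(self, start: int, end: int) -> list:
        return self.messages[start:end]

    def commit(self, end: int) -> None:
        # The only signal that the batch ending at `end` is durable in the
        # sink, so this is where an SQS source would delete the messages.
        self.committed = max(self.committed, end)

    def pending(self) -> list:
        # Messages still in the queue; after a restart they would be
        # delivered again, even if they already reached the sink.
        return self.messages[self.committed:]


source = InMemoryQueueSource(messages=["m1", "m2", "m3"])
end = source.latest_offset()
batch = source.get_batch(source.committed, end)  # assume this reaches the sink
print(source.pending())  # ['m1', 'm2', 'm3']: commit not called yet
source.commit(end)
print(source.pending())  # []
```

In the reported setup, the gap between the batch reaching the sink and `commit(end)` running is the time until the next trigger fires.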
> Structured streaming batch with fixed interval trigger is committed only when
> next batch is about to start
> ----------------------------------------------------------------------------------------------------------
>
> Key: SPARK-51331
> URL: https://issues.apache.org/jira/browse/SPARK-51331
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.3
> Environment: Spark 3.5.3 + Pyspark
> Reporter: Alex
> Priority: Major
>
>
> h1. Description
> When Structured Streaming is configured with
>
> {code:python}
> trigger(processingTime='10 minutes') {code}
> and the micro-batch itself takes 2 minutes, its progress is not committed
> until the next batch is about to start, i.e. the commit is delayed by 8 minutes.
>
> h1. Logs
> The batch below finished at 01:02 but was not committed until 01:10, i.e. when it
> was time for the next batch:
>
> {code:java}
> 25/02/27 01:00:00 INFO MicroBatchExecution: Committed offsets for batch 0.
> ....
> 25/02/27 01:02:12 INFO MicroBatchExecution: Streaming query made progress
> 25/02/27 01:10:00 INFO MicroBatchExecution: Committed offsets for batch 1
>
> {code}
> h1. Code references
> When the next batch is constructed in the `constructNextBatch` function
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L600]
> it calls `cleanUpLastExecutedMicroBatch`
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L674]
> which commits offsets
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L996-L997]
> h1. Why is it a problem?
> For long triggers such as 10 minutes or even 1 hour, there is a chance that the job
> will be cancelled (e.g. for a redeploy) before the next batch starts, so offsets will not be committed *even
> though the data has already been written to the sink*
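The 8-minute figure in the issue description can be reproduced with a little arithmetic. This sketch rests on two assumptions. First, batch start times are aligned to multiples of the trigger interval, which is consistent with the 01:00 and 01:10 timestamps in the logs. Second, a batch's offsets are committed only when the next batch starts. `next_trigger_time` and `commit_delay` are hypothetical helpers, not Spark functions.

```python
def next_trigger_time(start_s: int, interval_s: int) -> int:
    # Next multiple of the interval after the batch start (assumed alignment).
    return start_s // interval_s * interval_s + interval_s


def commit_delay(start_s: int, duration_s: int, interval_s: int) -> int:
    """Seconds between a batch finishing and its offsets being committed,
    assuming the commit only happens when the next batch starts."""
    end_s = start_s + duration_s
    # If the batch overran its interval, the next one starts right away.
    next_start_s = max(end_s, next_trigger_time(start_s, interval_s))
    return next_start_s - end_s


# 10-minute trigger; batch starts at 01:00:00 (3600 s) and runs 2 minutes.
print(commit_delay(start_s=3600, duration_s=120, interval_s=600))  # 480, i.e. 8 minutes
```

Take a 1-hour trigger with the same 2-minute batch. `commit_delay(0, 120, 3600)` gives 3480 seconds: 58 minutes during which a cancellation leaves the batch uncommitted.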
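The delay can also be read directly off the log excerpt in the report. This sketch uses only the standard library. The `yy/MM/dd HH:mm:ss` prefix format is taken from those log lines, and `commit_lag` is a hypothetical helper. It measures the time from the "made progress" line to the next "Committed offsets" line.

```python
from datetime import datetime

LOG_LINES = [
    "25/02/27 01:00:00 INFO MicroBatchExecution: Committed offsets for batch 0.",
    "25/02/27 01:02:12 INFO MicroBatchExecution: Streaming query made progress",
    "25/02/27 01:10:00 INFO MicroBatchExecution: Committed offsets for batch 1",
]


def timestamp(line: str) -> datetime:
    # The first 17 characters hold the "yy/MM/dd HH:mm:ss" log prefix.
    return datetime.strptime(line[:17], "%y/%m/%d %H:%M:%S")


def commit_lag(lines: list) -> float:
    """Seconds from the last 'made progress' line to the next commit line."""
    progress = None
    for line in lines:
        if "Streaming query made progress" in line:
            progress = timestamp(line)
        elif "Committed offsets" in line and progress is not None:
            return (timestamp(line) - progress).total_seconds()
    raise ValueError("no progress line followed by a commit line")


print(commit_lag(LOG_LINES))  # 468.0 (7 min 48 s)
```

The exact gap is 7 minutes 48 seconds, which the description rounds to 8 minutes.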
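The call chain in the code references, and the redeploy risk that follows from it, can be modelled as a toy loop. This is a hypothetical, heavily simplified model, not Spark's implementation. Each batch is assumed to write its output when it finishes. By default, the commit of batch N only happens when batch N+1 is constructed, which is the `constructNextBatch` → `cleanUpLastExecutedMicroBatch` path. The `commit_eagerly` flag models the alternative proposed in the comment, and `stop_at` is the time at which the query is cancelled.

```python
def run_query(durations, interval, stop_at, commit_eagerly=False):
    """Return the ids of batches whose offsets were committed before stop_at.

    durations[i] is how long batch i runs, in seconds. Batches start on
    multiples of `interval`, or immediately if the previous batch overran.
    """
    committed = []
    pending = None  # batch whose output was written but not yet committed
    now = 0
    for batch_id, duration in enumerate(durations):
        if now >= stop_at:
            break  # query cancelled while waiting for the next trigger
        # "constructNextBatch": commit the previously executed batch first.
        if pending is not None:
            committed.append(pending)
            pending = None
        start, end = now, now + duration
        if end > stop_at:
            break  # cancelled mid-batch; this batch's output is not written
        if commit_eagerly:
            committed.append(batch_id)  # commit as soon as the batch completes
        else:
            pending = batch_id  # deferred until the next batch is constructed
        # Wait for the next trigger tick (no wait if the batch overran).
        now = max(end, (start // interval + 1) * interval)
    return committed


# Two 2-minute batches, 10-minute trigger, redeploy at t = 500 s.
print(run_query([120, 120], interval=600, stop_at=500))  # []
print(run_query([120, 120], interval=600, stop_at=500, commit_eagerly=True))  # [0]
```

In the deferred case, batch 0's output reached the sink at t = 120 s, yet nothing is committed when the query stops, so a restart would reprocess it.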
--
This message was sent by Atlassian Jira
(v8.20.10#820010)