[https://issues.apache.org/jira/browse/SPARK-54585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Dylan Wong updated SPARK-54585:
-------------------------------
Description:
Issue 1:
When {{cancel()}} is called while the thread is in an interrupted state (e.g.,
during task cancellation), the previous implementation fails. The code
submitted a Future to cancel each stream, then called {{awaitResult()}} to
wait for them to complete. However, {{awaitResult()}} checks the thread's
interrupt flag and throws {{InterruptedException}} immediately if the thread
is interrupted, so {{cancel()}} fails instead of waiting for the streams to be
cancelled.
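The failure mode above can be reproduced outside Spark with a small sketch. It assumes a hypothetical {{SlowStream}} whose {{cancel()}} takes a moment, and it uses the standard-library {{scala.concurrent.Await.result}} in place of the {{awaitResult()}} helper mentioned above (an assumption: both block on the same {{Future}} machinery). {{cancelAllNaive}} follows the submit-then-await pattern described in the report; {{cancelAllInterruptSafe}} shows one possible mitigation that clears the interrupt flag while waiting and restores it afterwards. It is illustrative only and not necessarily what the actual patch does.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stream whose cancel() takes a while (e.g. remote cleanup).
final class SlowStream(delayMs: Long) {
  @volatile var cancelled: Boolean = false
  def cancel(): Unit = { Thread.sleep(delayMs); cancelled = true }
}

val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

// Pattern described in the report: one Future per stream, then block on each.
// If the caller is already interrupted and a Future is still running,
// Await.result throws InterruptedException (and clears the flag).
def cancelAllNaive(streams: Seq[SlowStream]): Unit = {
  val futures = streams.map(s => Future(s.cancel()))
  futures.foreach(f => Await.result(f, 10.seconds))
}

// One possible mitigation (an assumption, not necessarily the committed fix):
// clear the interrupt flag while waiting, then restore it for the caller.
def cancelAllInterruptSafe(streams: Seq[SlowStream]): Unit = {
  val wasInterrupted = Thread.interrupted() // reads and clears the flag
  try {
    val futures = streams.map(s => Future(s.cancel()))
    futures.foreach(f => Await.result(f, 10.seconds))
  } finally {
    if (wasInterrupted) Thread.currentThread().interrupt() // restore the flag
  }
}

// Simulate task cancellation by interrupting the calling thread first.
Thread.currentThread().interrupt()
val naiveThrew =
  try { cancelAllNaive(Seq.fill(3)(new SlowStream(200))); false }
  catch { case _: InterruptedException => true }

Thread.currentThread().interrupt()
val safeStreams = Seq.fill(3)(new SlowStream(200))
cancelAllInterruptSafe(safeStreams)
val safeCancelledAll = safeStreams.forall(_.cancelled)
val flagRestored = Thread.interrupted() // true here; also clears the flag

pool.shutdown()
```

Restoring the flag in {{finally}} matters because the caller (for example, task-cancellation logic) may still rely on seeing the interrupt once the streams have been cancelled.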
Issue 2:
Consider the case where {{abort()}} is called on
{{RocksDBStateStoreProvider}}. This calls {{rollback()}} on the {{RocksDB}}
instance, which in turn calls {{changelogWriter.foreach(_.abort())}} and then
sets {{changelogWriter = None}}.
However, if {{changelogWriter.abort()}} throws an exception, the {{finally}}
block inside {{abort()}} still sets {{backingFileStream}} and
{{compressedStream}} to {{null}}. The exception then propagates out of
{{rollback()}}, so the line that sets {{changelogWriter = None}} is never
reached.
This leaves the RocksDB instance in an inconsistent state:
* changelogWriter = Some(changelogWriterWeAttemptedToAbort)
* changelogWriterWeAttemptedToAbort.backingFileStream = null
* changelogWriterWeAttemptedToAbort.compressedStream = null
Now consider calling {{RocksDB.load()}} again. This calls
{{replayChangelog()}}, which calls {{put()}}, which calls
{{changelogWriter.put()}} on the stale writer. At this point, the assertion
{{assert(compressedStream != null)}} fails, causing an exception while loading
the StateStore.
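The sequence above can be modelled with a few plain classes. {{ChangelogWriterModel}} and {{StoreModel}} are hypothetical, simplified stand-ins (only the names {{changelogWriter}}, {{backingFileStream}}, {{compressedStream}}, {{abort()}}, {{put()}} and {{load()}} come from the report), and {{load()}} here just forwards a single {{put()}} to the writer. {{rollbackNaive()}} follows the abort-then-reset ordering described above; {{rollbackSafe()}} is one possible fix, an assumption rather than the committed change, that clears the reference in a {{finally}} block.

```scala
import java.io.IOException
import scala.util.Try

// Hypothetical, simplified changelog writer. Only the field names come from
// the report; the stream objects are placeholders.
final class ChangelogWriterModel(failOnAbort: Boolean) {
  var backingFileStream: AnyRef = new Object
  var compressedStream: AnyRef = new Object

  def put(key: String, value: String): Unit = {
    // Same check as in the report; throws java.lang.AssertionError if it fails.
    assert(compressedStream != null)
  }

  def abort(): Unit = {
    try {
      if (failOnAbort) throw new IOException("simulated failure during abort")
    } finally {
      // The streams are nulled even when abort() throws.
      backingFileStream = null
      compressedStream = null
    }
  }
}

// Hypothetical stand-in for the RocksDB instance's rollback and load paths.
final class StoreModel(initialWriter: ChangelogWriterModel) {
  var changelogWriter: Option[ChangelogWriterModel] = Some(initialWriter)

  // Ordering described in the report: an exception from abort() skips the reset.
  def rollbackNaive(): Unit = {
    changelogWriter.foreach(_.abort())
    changelogWriter = None
  }

  // One possible fix (assumption): reset the reference even if abort() throws.
  def rollbackSafe(): Unit = {
    try changelogWriter.foreach(_.abort())
    finally changelogWriter = None
  }

  // Simplified stand-in for load() -> replayChangelog() -> put().
  def load(): Unit = changelogWriter.foreach(_.put("key", "value"))
}

val naive = new StoreModel(new ChangelogWriterModel(failOnAbort = true))
val naiveRollbackError = Try(naive.rollbackNaive()).failed.toOption
val naiveLoadError = Try(naive.load()).failed.toOption // stale writer: AssertionError

val safe = new StoreModel(new ChangelogWriterModel(failOnAbort = true))
val safeRollbackError = Try(safe.rollbackSafe()).failed.toOption
val safeLoadError = Try(safe.load()).failed.toOption // writer cleared: no error
```

With the naive ordering the store keeps a writer whose streams are null, so the next {{load()}} hits the assertion. With the {{finally}} reset, {{rollback()}} still surfaces the original failure, but the next {{load()}} no longer goes through the stale writer.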
was:
Consider the case where {{abort()}} is called on
{{RocksDBStateStoreProvider}}. This calls {{rollback()}} on the {{RocksDB}}
instance, which in turn calls {{changelogWriter.foreach(_.abort())}} and then
sets {{changelogWriter = None}}.
However, if {{changelogWriter.abort()}} throws an exception, the finally block
still sets {{backingFileStream}} and {{compressedStream}} to {{null}}. The
exception propagates, and we never reach the line that sets
{{changelogWriter = None}}.
This leaves the RocksDB instance in an inconsistent state:
* changelogWriter = Some(changelogWriterWeAttemptedToAbort)
* changelogWriterWeAttemptedToAbort.backingFileStream = null
* changelogWriterWeAttemptedToAbort.compressedStream = null
Now consider calling {{RocksDB.load()}} again. This calls
{{replayChangelog()}}, which calls {{put()}}, which calls
{{changelogWriter.put()}}. At this point, the assertion
{{assert(compressedStream != null)}} fails, causing an exception while loading
the StateStore.
> Fix State Store rollback when thread is in interrupted state
> ------------------------------------------------------------
>
> Key: SPARK-54585
> URL: https://issues.apache.org/jira/browse/SPARK-54585
> Project: Spark
> Issue Type: Task
> Components: Structured Streaming
> Affects Versions: 4.1.0
> Reporter: Dylan Wong
> Priority: Major
> Labels: pull-request-available
--
This message was sent by Atlassian Jira
(v8.20.10#820010)