[
https://issues.apache.org/jira/browse/SPARK-54585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated SPARK-54585:
-----------------------------------
Labels: pull-request-available (was: )
> Always set changelogWriter to None on rollback
> ----------------------------------------------
>
> Key: SPARK-54585
> URL: https://issues.apache.org/jira/browse/SPARK-54585
> Project: Spark
> Issue Type: Task
> Components: Structured Streaming
> Affects Versions: 4.1.0
> Reporter: Dylan Wong
> Priority: Major
> Labels: pull-request-available
>
> Consider the case where {{abort()}} is called on
> {{{}RocksDBStateStoreProvider{}}}. This calls {{rollback()}} on the
> {{RocksDB}} instance, which in turn calls
> {{changelogWriter.foreach(_.abort())}} and then sets {{{}changelogWriter =
> None{}}}.
> However, if {{changelogWriter.abort()}} throws an exception, the finally
> block still sets {{backingFileStream}} and {{compressedStream}} to
> {{{}null{}}}. The exception propagates, and we never reach the line that sets
> {{{}changelogWriter = None{}}}.
> This leaves the RocksDB instance in an inconsistent state:
> * changelogWriter = Some(changelogWriterWeAttemptedToAbort)
> * changelogWriterWeAttemptedToAbort.backingFileStream = null
> * changelogWriterWeAttemptedToAbort.compressedStream = null
> Now consider calling {{RocksDB.load()}} again. This calls
> {{{}replayChangelog(){}}}, which calls {{{}put(){}}}, which calls
> {{{}changelogWriter.put(){}}}. At this point, the assertion
> {{assert(compressedStream != null)}} fails, causing an exception while
> loading the StateStore.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]