Dylan Wong created SPARK-54585:
----------------------------------

             Summary: Always set changelogWriter to None on rollback
                 Key: SPARK-54585
                 URL: https://issues.apache.org/jira/browse/SPARK-54585
             Project: Spark
          Issue Type: Task
          Components: Structured Streaming
    Affects Versions: 4.1.0
            Reporter: Dylan Wong


Consider the case where {{abort()}} is called on 
{{RocksDBStateStoreProvider}}. This calls {{rollback()}} on the {{RocksDB}} 
instance, which in turn calls {{changelogWriter.foreach(_.abort())}} and then 
sets {{changelogWriter = None}}.

However, if {{changelogWriter.abort()}} throws an exception, the finally block 
inside that {{abort()}} still sets {{backingFileStream}} and 
{{compressedStream}} to {{null}}. The exception then propagates out of 
{{rollback()}}, so we never reach the line that sets 
{{changelogWriter = None}}.

This leaves the RocksDB instance in an inconsistent state:
 * changelogWriter = Some(changelogWriterWeAttemptedToAbort)
 * changelogWriterWeAttemptedToAbort.backingFileStream = null
 * changelogWriterWeAttemptedToAbort.compressedStream = null
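
The state listed above can be reproduced with a toy model. This is a minimal sketch, not Spark's code: {{ToyWriter}}, {{ToyDb}} and {{demo()}} are hypothetical stand-ins for the changelog writer and the {{RocksDB}} wrapper, and the simulated {{IOException}} stands in for whatever failure {{abort()}} actually hits.

```scala
import java.io.IOException

object RollbackLeakSketch {
  // Hypothetical stand-in for the changelog writer; not Spark's class.
  class ToyWriter {
    var compressedStream: AnyRef = new Object

    def abort(): Unit = {
      try {
        throw new IOException("simulated failure while aborting")
      } finally {
        // Runs even though abort() is about to throw.
        compressedStream = null
      }
    }
  }

  // Hypothetical stand-in for the RocksDB wrapper.
  class ToyDb {
    var changelogWriter: Option[ToyWriter] = Some(new ToyWriter)

    def rollback(): Unit = {
      changelogWriter.foreach(_.abort())
      // Skipped whenever abort() throws.
      changelogWriter = None
    }
  }

  // Returns (writer still set?, writer's stream already nulled?).
  def demo(): (Boolean, Boolean) = {
    val db = new ToyDb
    try {
      db.rollback()
    } catch {
      case _: IOException => () // the caller sees the abort failure
    }
    (db.changelogWriter.isDefined,
      db.changelogWriter.exists(_.compressedStream == null))
  }
}
```

In this model both flags come back true after the failed rollback: the wrapper still references a writer whose stream has already been nulled.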

Now consider calling {{RocksDB.load()}} again. This calls 
{{replayChangelog()}}, which calls {{put()}}, which calls 
{{changelogWriter.put()}}. At this point, the assertion 
{{assert(compressedStream != null)}} fails, causing an exception while loading 
the StateStore.
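
One way to read the issue summary ("always set changelogWriter to None on rollback") is to clear the reference in a finally block. The sketch below compares that variant with the current ordering in a toy model. All names ({{ToyWriter}}, {{ToyDb}}, {{clearInFinally}}, {{putSucceedsAfterFailedRollback}}) are hypothetical, and the actual patch may be structured differently.

```scala
import java.io.IOException

object RollbackFixSketch {
  // Hypothetical stand-in for the changelog writer; not Spark's class.
  class ToyWriter {
    private var compressedStream: AnyRef = new Object

    def put(key: String, value: String): Unit = {
      // Mirrors the assertion described in the issue.
      assert(compressedStream != null)
    }

    def abort(): Unit = {
      try {
        throw new IOException("simulated failure while aborting")
      } finally {
        compressedStream = null
      }
    }
  }

  // clearInFinally = false models the current ordering,
  // clearInFinally = true models the assumed fix.
  class ToyDb(clearInFinally: Boolean) {
    var changelogWriter: Option[ToyWriter] = Some(new ToyWriter)

    def rollback(): Unit = {
      if (clearInFinally) {
        try {
          changelogWriter.foreach(_.abort())
        } finally {
          // Always drop the writer, even if abort() throws.
          changelogWriter = None
        }
      } else {
        changelogWriter.foreach(_.abort())
        changelogWriter = None
      }
    }

    def put(key: String, value: String): Unit =
      changelogWriter.foreach(_.put(key, value))
  }

  // True if a put() after a failed rollback completes without
  // tripping the writer's assertion.
  def putSucceedsAfterFailedRollback(clearInFinally: Boolean): Boolean = {
    val db = new ToyDb(clearInFinally)
    try {
      db.rollback()
    } catch {
      case _: IOException => () // the abort failure still propagates
    }
    try {
      db.put("k", "v")
      true
    } catch {
      case _: AssertionError => false
    }
  }
}
```

With the finally-based variant, the exception from {{abort()}} still reaches the caller, but the stale writer is gone, so a later {{put()}} in this model no longer hits the assertion.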



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
