This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cee1bb2527 [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
9cee1bb2527 is described below

commit 9cee1bb2527a496943ffedbd935dc737246a2d89
Author: Josh Rosen <[email protected]>
AuthorDate: Sat Jul 23 15:45:58 2022 -0700

    [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a race condition in `RocksDBLoader.loadLibrary()`, which can occur if the thread which calls that method is interrupted.
    
    One of our jobs experienced a failure in `RocksDBLoader`:
    
    ```
    Caused by: java.lang.IllegalThreadStateException
            at java.lang.Thread.start(Thread.java:708)
            at org.apache.spark.sql.execution.streaming.state.RocksDBLoader$.loadLibrary(RocksDBLoader.scala:51)
    ```
    
    After investigation, we determined that this was due to task cancellation/interruption: if the task which starts the RocksDB library loading is interrupted, another thread may begin a load and crash with the thread state exception:
    
    - Although the `loadLibraryThread` child thread is uninterruptible, the task thread which calls `loadLibrary()` is still interruptible.
    - Let's say we have two tasks, A and B, both of which will call `RocksDBLoader.loadLibrary()`.
    - Say that Task A wins the race to perform the load: it enters the `synchronized` block in `loadLibrary()`, starts the `loadLibraryThread`, then blocks in the `loadLibraryThread.join()` call.
    - If Task A is interrupted, `join()` throws an `InterruptedException` and Task A exits the `loadLibrary()` synchronized block.
    - At this point, Task B enters the synchronized block of its own `loadLibrary()` call and sees that `exception == null` (because the `loadLibraryThread` started by Task A is still running), so Task B calls `loadLibraryThread.start()` and hits the `IllegalThreadStateException` because it tries to start an already-started thread.
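
    The crash in the last step comes down to a basic JVM rule: a `Thread` can be started at most once, and any `start()` call on a thread whose state is not `NEW` throws `IllegalThreadStateException`, whether that thread is still running or has already finished. A minimal plain-JVM sketch of that rule (no Spark involved; `DoubleStartDemo` is a hypothetical name used only for illustration):

    ```scala
    // Plain-JVM sketch (no Spark): a Thread may be started at most once.
    // DoubleStartDemo is a hypothetical helper, not part of RocksDBLoader.
    object DoubleStartDemo {
      /** Starts the same thread twice and returns what the second start() threw, if anything. */
      def secondStartFailure(): Option[Throwable] = {
        val t = new Thread(() => Thread.sleep(200))
        // Before this call, t.getState == Thread.State.NEW (never started).
        t.start() // first start succeeds; the thread is now RUNNABLE or TIMED_WAITING
        val failure =
          try { t.start(); None } // second start: rejected in any non-NEW state
          catch { case e: IllegalThreadStateException => Some(e) }
        t.join()
        failure
      }
    }
    ```

    This is the same exception that Task B hits when it calls `loadLibraryThread.start()` on the thread Task A already started.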
    
    This PR fixes this issue by adding code to check `loadLibraryThread`'s state before calling `start()`: if the thread has already been started, then we skip the `start()` call and proceed directly to the `join()`. I also modified the logging so that we can detect when this case occurs.
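
    To see the guard in isolation, here is a hedged sketch that mirrors the structure of `loadLibrary()` from the diff, under a few stated assumptions: `GuardedLoader`, `GuardedLoaderDemo`, `release`, and `loadCalls` are hypothetical names; a plain `Thread` stands in for Spark's uninterruptible loader thread; and a `CountDownLatch` replaces the native library load so the load stays in flight until released. Instead of sleeps, the driver polls `Thread.getState` until each caller is blocked in `join()`, then replays the Task A / Task B interleaving described above.

    ```scala
    import java.util.concurrent.CountDownLatch

    // Hypothetical stand-in for RocksDBLoader (not a Spark API). The latch
    // simulates a slow native-library load that stays in flight until released.
    object GuardedLoader {
      val release = new CountDownLatch(1)
      @volatile var loadCalls = 0
      // null = not loaded yet, None = loaded OK, Some(e) = load failed
      @volatile private var exception: Option[Throwable] = null

      // Assumption: a plain Thread instead of Spark's uninterruptible thread.
      private lazy val loadThread = new Thread("loader") {
        override def run(): Unit = {
          try {
            release.await() // simulated load in progress
            loadCalls += 1
            exception = None
          } catch {
            case e: Throwable => exception = Some(e)
          }
        }
      }

      def load(): Unit = synchronized {
        if (exception == null) {
          // The SPARK-39847-style guard: start the thread only if no caller has yet.
          if (loadThread.getState == Thread.State.NEW) loadThread.start()
          loadThread.join() // an interrupted caller leaves here via InterruptedException
          exception.foreach(throw _)
        } else {
          exception.foreach(throw _)
        }
      }
    }

    object GuardedLoaderDemo {
      /** Replays the Task A / Task B interleaving once; returns Task B's failure, if any. */
      def run(): Option[Throwable] = {
        // Task A starts the load, then is interrupted while blocked in join().
        val taskA = new Thread(() =>
          try GuardedLoader.load() catch { case _: InterruptedException => () })
        taskA.start()
        while (taskA.isAlive && taskA.getState != Thread.State.WAITING) Thread.sleep(10)
        taskA.interrupt()
        taskA.join()

        // Task B arrives while the loader thread is still running (its state is not NEW).
        var taskBFailure: Option[Throwable] = None
        val taskB = new Thread(() =>
          try GuardedLoader.load() catch { case e: Throwable => taskBFailure = Some(e) })
        taskB.start()
        while (taskB.isAlive && taskB.getState != Thread.State.WAITING) Thread.sleep(10)
        GuardedLoader.release.countDown() // let the simulated load finish
        taskB.join() // join() makes taskBFailure's write visible to this thread
        taskBFailure
      }
    }
    ```

    With the guard in place, Task B simply joins the in-flight loader thread and the load runs exactly once. If the `getState == Thread.State.NEW` check is removed, Task B's `load()` call instead fails with `IllegalThreadStateException`. The demo is one-shot: the latch and the loader thread cannot be reused.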
    
    ### Why are the changes needed?
    
    Fix a bug that can lead to task or job failures.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    I reproduced the original race condition by adding a `Thread.sleep(10000)` to `loadLibraryThread.run()` (so it wouldn't complete instantly), then ran:
    
    ```scala
      test("multi-threaded RocksDBLoader calls with interruption") {
    
        val taskThread = new Thread("interruptible Task Thread 1") {
          override def run(): Unit = {
            RocksDBLoader.loadLibrary()
          }
        }
    
        taskThread.start()
        // Give the thread time to enter the `loadLibrary()` call:
        Thread.sleep(1000)
        taskThread.interrupt()
        // Check that the load hasn't finished:
        assert(RocksDBLoader.exception == null)
        assert(RocksDBLoader.loadLibraryThread.getState != Thread.State.NEW)
        // Simulate the second task thread starting the load:
        RocksDBLoader.loadLibrary()
        // The load should finish successfully:
        assert(RocksDBLoader.exception.isEmpty)
      }
    ```
    
    This test failed prior to my changes and succeeds afterwards.
    
    I don't want to actually commit this test because I'm concerned about flakiness and false negatives: to ensure that the test would have failed before my change, we need to carefully control the thread interleaving. This code rarely changes and is relatively simple, so I think the ROI of spending time to write and commit a reliable test is low.
    
    Closes #37260 from JoshRosen/rocksdbloader-fix.
    
    Authored-by: Josh Rosen <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/execution/streaming/state/RocksDBLoader.scala   | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
index cc518192437..02c98c14f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
@@ -48,7 +48,16 @@ object RocksDBLoader extends Logging {
 
   def loadLibrary(): Unit = synchronized {
     if (exception == null) {
-      loadLibraryThread.start()
+      // SPARK-39847: if a task thread is interrupted while blocking in this loadLibrary()
+      // call then a second task thread might start a loadLibrary() call while the first
+      // call's loadLibraryThread is still running. Checking loadLibraryThread's state here
+      // ensures that the second loadLibrary() call will wait for the original call's
+      // loadLibraryThread to complete. If we didn't have this call then the second
+      // loadLibraryCall() would call start() on an already-started thread, causing a
+      // java.lang.IllegalThreadStateException error.
+      if (loadLibraryThread.getState == Thread.State.NEW) {
+        loadLibraryThread.start()
+      }
       logInfo("RocksDB library loading thread started")
       loadLibraryThread.join()
       exception.foreach(throw _)

