This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9cee1bb2527 [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
9cee1bb2527 is described below
commit 9cee1bb2527a496943ffedbd935dc737246a2d89
Author: Josh Rosen <[email protected]>
AuthorDate: Sat Jul 23 15:45:58 2022 -0700
[SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
### What changes were proposed in this pull request?
This PR fixes a race condition in `RocksDBLoader.loadLibrary()`, which can
occur if the thread which calls that method is interrupted.
One of our jobs experienced a failure in `RocksDBLoader`:
```
Caused by: java.lang.IllegalThreadStateException
at java.lang.Thread.start(Thread.java:708)
at org.apache.spark.sql.execution.streaming.state.RocksDBLoader$.loadLibrary(RocksDBLoader.scala:51)
```
After investigation, we determined that this was due to task
cancellation/interruption: if the task which starts the RocksDB library loading
is interrupted, another thread may begin a load and crash with the thread state
exception:
- Although the `loadLibraryThread` child thread is uninterruptible, the
task thread which calls `loadLibrary()` is still interruptible.
- Let's say we have two tasks, A and B, both of which will call
`RocksDBLoader.loadLibrary()`
- Say that Task A wins the race to perform the load and enters the
`synchronized` block in `loadLibrary()`, starts the `loadLibraryThread`, then
blocks in the `loadLibraryThread.join()` call.
- If Task A is interrupted, an `InterruptedException` will be thrown and it
will exit the `synchronized` block in `loadLibrary()`.
- At this point, Task B enters the `synchronized` block of its `loadLibrary()`
call and sees that `exception == null` because the `loadLibraryThread` started
by the other task is still running, so Task B calls `loadLibraryThread.start()`
and hits the thread state error because it tries to start an already-started
thread.
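The failure in the last step can be reproduced in isolation. The following is a minimal sketch, not code from this PR: `DoubleStartDemo`, `secondStartFails`, and the `hypothetical-loader` thread are illustrative names, and the latch stands in for a slow native library load. It only shows that calling `start()` on a thread that is no longer in the `NEW` state throws `IllegalThreadStateException`.

```scala
import java.util.concurrent.CountDownLatch

object DoubleStartDemo {
  // Returns true if the second start() call throws IllegalThreadStateException.
  def secondStartFails(): Boolean = {
    val release = new CountDownLatch(1)
    // Hypothetical stand-in for loadLibraryThread: blocks until released,
    // like a native library load that has not finished yet.
    val worker = new Thread("hypothetical-loader") {
      override def run(): Unit = release.await()
    }
    worker.start() // what Task A does
    val failed =
      try {
        worker.start() // what Task B does while the thread is still running
        false
      } catch {
        case _: IllegalThreadStateException => true
      } finally {
        release.countDown() // let the worker finish
      }
    worker.join()
    failed
  }

  def main(args: Array[String]): Unit =
    println(s"second start() failed: ${secondStartFails()}")
}
```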
This PR fixes this issue by adding code to check `loadLibraryThread`'s
state before calling `start()`: if the thread has already been started then we
will skip the `start()` call and proceed directly to the `join()`. I also
modified the logging so that we can detect when this case occurs.
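As a rough illustration of the guarded-start pattern, here is a simplified sketch under stated assumptions. It is not the actual `RocksDBLoader`: `GuardedStartSketch`, `load`, and `demo` are hypothetical names, a plain `Thread` replaces the uninterruptible loader thread, and a `Thread.sleep` stands in for the native load. The `exception` field follows the convention visible in the diff: `null` while the load is pending, `None` once it has succeeded.

```scala
object GuardedStartSketch {
  // null until the worker finishes; None on success.
  @volatile private var exception: Option[Throwable] = null

  // Hypothetical stand-in for loadLibraryThread.
  private val worker = new Thread("hypothetical-loader") {
    override def run(): Unit = {
      Thread.sleep(200) // stand-in for the slow native library load
      exception = None
    }
  }

  def load(): Unit = synchronized {
    if (exception == null) {
      // Only start the worker if no earlier caller has started it.
      if (worker.getState == Thread.State.NEW) {
        worker.start()
      }
      worker.join() // may throw InterruptedException in the caller
      exception.foreach(e => throw e)
    }
  }

  // Task A is interrupted while waiting; the calling thread then plays Task B.
  def demo(): Boolean = {
    val taskA = new Thread("task-A") {
      override def run(): Unit =
        try load() catch { case _: InterruptedException => () }
    }
    taskA.start()
    Thread.sleep(50) // give Task A time to enter load()
    taskA.interrupt()
    taskA.join()
    load() // Task B: skips start() and waits for the original worker
    exception != null && exception.isEmpty
  }
}
```

With the state check in place, the second `load()` call joins the worker started by the first caller instead of trying to start it again. Removing the `getState` check from this sketch would make the second call throw `IllegalThreadStateException`, as in the original failure.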
### Why are the changes needed?
Fix a bug that can lead to task or job failures.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
I reproduced the original race condition by adding a `Thread.sleep(10000)`
to `loadLibraryThread.run()` (so it wouldn't complete instantly), then ran
the following test:
```scala
test("multi-threaded RocksDBLoader calls with interruption") {
  val taskThread = new Thread("interruptible Task Thread 1") {
    override def run(): Unit = {
      RocksDBLoader.loadLibrary()
    }
  }
  taskThread.start()
  // Give the thread time to enter the `loadLibrary()` call:
  Thread.sleep(1000)
  taskThread.interrupt()
  // Check that the load hasn't finished:
  assert(RocksDBLoader.exception == null)
  assert(RocksDBLoader.loadLibraryThread.getState != Thread.State.NEW)
  // Simulate the second task thread starting the load:
  RocksDBLoader.loadLibrary()
  // The load should finish successfully:
  assert(RocksDBLoader.exception.isEmpty)
}
```
This test failed prior to my changes and succeeds afterwards.
I don't want to actually commit this test because I'm concerned about
flakiness and false negatives: in order to ensure that the test would have
failed before my change, we need to carefully control the thread interleaving.
This code rarely changes and is relatively simple, so I think the ROI on
spending time to write and commit a reliable test is low.
Closes #37260 from JoshRosen/rocksdbloader-fix.
Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/streaming/state/RocksDBLoader.scala | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
index cc518192437..02c98c14f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
@@ -48,7 +48,16 @@ object RocksDBLoader extends Logging {
def loadLibrary(): Unit = synchronized {
if (exception == null) {
- loadLibraryThread.start()
+ // SPARK-39847: if a task thread is interrupted while blocking in this loadLibrary()
+ // call then a second task thread might start a loadLibrary() call while the first
+ // call's loadLibraryThread is still running. Checking loadLibraryThread's state here
+ // ensures that the second loadLibrary() call will wait for the original call's
+ // loadLibraryThread to complete. If we didn't have this call then the second
+ // loadLibraryCall() would call start() on an already-started thread, causing a
+ // java.lang.IllegalThreadStateException error.
+ if (loadLibraryThread.getState == Thread.State.NEW) {
+ loadLibraryThread.start()
+ }
logInfo("RocksDB library loading thread started")
loadLibraryThread.join()
exception.foreach(throw _)