This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new fa697931ed7d [SPARK-51083][CORE] Modify JavaUtils to not swallow InterruptedExceptions
fa697931ed7d is described below

commit fa697931ed7d29962f5704a801752dbfd4ee3dd3
Author: Neil Ramaswamy <[email protected]>
AuthorDate: Wed Feb 19 13:43:08 2025 +0800

    [SPARK-51083][CORE] Modify JavaUtils to not swallow InterruptedExceptions
    
    ### What changes were proposed in this pull request?
    
    These changes modify the `deleteRecursivelyUsingUnixNative` method in `JavaUtils.java` so that it no longer swallows `InterruptedException`s. The bulk of the changes in this PR update the signatures of methods that directly or indirectly use `JavaUtils.deleteRecursivelyUsingUnixNative`.
    
    ### Why are the changes needed?
    
    `JavaUtils.deleteRecursively` swallows `InterruptedException`s, and that can cause the loss of an entire executor. Consider a streaming task running a streaming aggregation. It takes the following steps:
    
    1. It [writes](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L720) all data to the state store, and then [reads](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L732) this data back to emit it (in, say, Complete mode).
    2. With the RocksDB state store, the task doesn't acquire any locks, sleep, or otherwise block during (1). This means that if the query is cancelled and the task is interrupted, it won't respond to the interrupt.
    3. After (1) is done, it [calls commit](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L744) on the state store.
    4. In RocksDB, this turns into a call to `RocksDB::commit`, which can call [createSnapshot](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala#L879).
    5. In createSnapshot, [we call Utils.deleteRecursively](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala#L879) to make sure that the temporary directory into which we place the RocksDB checkpoint is empty.
    6. In deleteRecursively, we [call deleteRecursivelyUsingUnixNative](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L115), which issues an `rm -rf` and then [calls process.waitFor](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L183), which declares that it could throw an InterruptedE [...]
    7. Unfortunately, deleteRecursivelyUsingUnixNative [indiscriminately catches all Exceptions](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L184), which includes the `InterruptedException` potentially thrown by `process.waitFor`. It then rethrows the failure as an `IOException`.
    8. That `IOException` is caught [here](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L117), and a warning is logged. The interrupt has now been swallowed.
    
    A streaming task thread that misses this interrupt will not exit. If it fails to exit within 60 seconds, the TaskReaper will kill the JVM, losing the entire executor.
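    For illustration, the swallow-and-rethrow pattern in steps 6-8 can be reduced to a minimal, self-contained sketch (class and method names here are hypothetical stand-ins, not the actual Spark code, and `Thread.sleep` stands in for `process.waitFor`):

    ```java
    import java.io.IOException;

    public class InterruptSwallowSketch {
      // Mimics the old behavior: a broad catch (Exception e) also matches
      // InterruptedException and rethrows it as an IOException, so the
      // interrupt never reaches the caller as an interrupt.
      static void deleteLikeBefore() throws IOException {
        try {
          Thread.sleep(60_000); // stands in for process.waitFor()
        } catch (Exception e) { // also catches InterruptedException
          throw new IOException("Failed to delete", e);
        }
      }

      public static void main(String[] args) throws Exception {
        Thread worker = new Thread(() -> {
          try {
            deleteLikeBefore();
          } catch (IOException e) {
            // The interrupt arrives disguised as an IOException, and the
            // thread's interrupt status was already cleared by Thread.sleep.
            System.out.println("caught: " + e.getCause().getClass().getSimpleName());
            System.out.println("interrupted flag: " + Thread.currentThread().isInterrupted());
          }
        });
        worker.start();
        Thread.sleep(200); // give the worker time to reach its sleep
        worker.interrupt();
        worker.join();
        // prints: caught: InterruptedException
        // prints: interrupted flag: false
      }
    }
    ```

    The worker sees only an `IOException` and a cleared interrupt flag, which is exactly the condition the TaskReaper eventually punishes with a JVM kill.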
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `JavaUtils.deleteRecursively` and the methods it calls now declare `throws InterruptedException` in addition to `IOException`, so external callers must handle or propagate the new checked exception.
    
    ### How was this patch tested?
    
    This is fairly tricky to test; in fact, we don't exercise the code-path for `deleteRecursivelyUsingUnixNative` during testing on Apple Silicon, which made it hard for me.
    
    My approach here was to modify `deleteRecursivelyUsingUnixNative` to explicitly sleep. Then, I called that method in a background thread, interrupting it from the test thread. If all went correctly, then the background thread _should_ have received an interrupt that it did not swallow, and the test thread could assert that that happened. You can see this test in [this commit](https://github.com/apache/spark/pull/49796/commits/95577db37c9209e64034db2e22879590ae636020), but since it's a [...]
    
    ```
    [info] UtilsSuite:
    1. Starting thread
    2. Waiting for it to get to its sleep
    3. Going to spawn rm -rf
    4. Finished deleting directory
    5. Sleeping for 10 seconds
    6. Interrupting thread that is sleeping
    [info] - deleteRecursively throws InterruptedException (5 seconds, 18 milliseconds)
    7. Catching and rethrowing interrupted exception
    8. Thread exiting
    9. gotInterruptedException = true
    ```
    
    I validated that this test case I added _fails_ without my changes to fix the interrupt swallowing; you can check out that commit [here](https://github.com/apache/spark/pull/49796/commits/70f2a7b3e916a7a54b23901919159adc49ab8dda).
    
    If you are a reviewer who can think of a way to create a test case we can check in, please leave a review comment.
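    The shape of that test can be sketched roughly as follows (a self-contained approximation with hypothetical names, not the actual `UtilsSuite` code; `Thread.sleep` stands in for the sleep injected into the delete method):

    ```java
    public class InterruptPropagationSketch {
      static volatile boolean gotInterruptedException = false;

      // Stand-in for the patched delete method, which now lets
      // InterruptedException propagate to its caller.
      static void deleteRecursivelyStub() throws InterruptedException {
        Thread.sleep(10_000); // simulates the sleep injected for the test
      }

      public static void main(String[] args) throws Exception {
        Thread background = new Thread(() -> {
          try {
            deleteRecursivelyStub();
          } catch (InterruptedException e) {
            gotInterruptedException = true; // the interrupt was NOT swallowed
          }
        });
        background.start();
        Thread.sleep(200);      // let the background thread reach its sleep
        background.interrupt(); // cancel it from the "test thread"
        background.join();
        System.out.println("gotInterruptedException = " + gotInterruptedException);
        // prints: gotInterruptedException = true
      }
    }
    ```

    Even if the interrupt lands before the background thread blocks, `Thread.sleep` throws `InterruptedException` immediately, so the assertion is not timing-sensitive.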
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49796 from neilramaswamy/nr/spark-51083.
    
    Authored-by: Neil Ramaswamy <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 2c628997a1cdc9ed6bdc6b49e5669e2fc85be870)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../java/org/apache/spark/network/StreamTestHelper.java    |  3 +--
 .../org/apache/spark/network/util/DBProviderSuite.java     |  7 ++++---
 .../spark/network/shuffle/TestShuffleDataContext.java      |  2 +-
 .../main/java/org/apache/spark/network/util/JavaUtils.java | 14 ++++++++++----
 4 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java b/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
index 7d5db149d4e4..b7754e3fd211 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
@@ -19,7 +19,6 @@ package org.apache.spark.network;
 
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 
@@ -85,7 +84,7 @@ class StreamTestHelper {
     if (tempDir != null) {
       try {
         JavaUtils.deleteRecursively(tempDir);
-      } catch (IOException io) {
+      } catch (Exception io) {
         throw new RuntimeException(io);
       }
     }
diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java
index e258b9e6ff40..81bfc55264c4 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java
@@ -32,17 +32,18 @@ import static org.junit.jupiter.api.Assumptions.assumeFalse;
 public class DBProviderSuite {
 
   @Test
-  public void testRockDBCheckVersionFailed() throws IOException {
+  public void testRockDBCheckVersionFailed() throws IOException, InterruptedException {
     testCheckVersionFailed(DBBackend.ROCKSDB, "rocksdb");
   }
 
   @Test
-  public void testLevelDBCheckVersionFailed() throws IOException {
+  public void testLevelDBCheckVersionFailed() throws IOException, InterruptedException {
     assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64"));
     testCheckVersionFailed(DBBackend.LEVELDB, "leveldb");
   }
 
-  private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix) throws IOException {
+  private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix)
+      throws IOException, InterruptedException {
     String root = System.getProperty("java.io.tmpdir");
     File dbFile = JavaUtils.createDirectory(root, namePrefix);
     try {
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index eeb936773aaa..49b17824c3c7 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -63,7 +63,7 @@ public class TestShuffleDataContext {
     for (String localDir : localDirs) {
       try {
         JavaUtils.deleteRecursively(new File(localDir));
-      } catch (IOException e) {
+      } catch (Exception e) {
         logger.warn("Unable to cleanup localDir = " + localDir, e);
       }
     }
diff --git a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 3482c6addfee..94f9f02ed2c9 100644
--- a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -90,7 +90,7 @@ public class JavaUtils {
    * @param file Input file / dir to be deleted
    * @throws IOException if deletion is unsuccessful
    */
-  public static void deleteRecursively(File file) throws IOException {
+  public static void deleteRecursively(File file) throws IOException, InterruptedException {
     deleteRecursively(file, null);
   }
 
@@ -103,7 +103,8 @@ public class JavaUtils {
    *               are deleted.
    * @throws IOException if deletion is unsuccessful
    */
-  public static void deleteRecursively(File file, FilenameFilter filter) throws IOException {
+  public static void deleteRecursively(File file, FilenameFilter filter)
+      throws IOException, InterruptedException {
     if (file == null) { return; }
 
     // On Unix systems, use operating system command to run faster
@@ -125,7 +126,7 @@ public class JavaUtils {
 
   private static void deleteRecursivelyUsingJavaIO(
       File file,
-      FilenameFilter filter) throws IOException {
+      FilenameFilter filter) throws IOException, InterruptedException {
     BasicFileAttributes fileAttributes = readFileAttributes(file);
     // SPARK-50716: If the file attributes are null, that is, the file attributes cannot be read,
     // or if the file does not exist and is not a broken symbolic link, then return directly.
@@ -168,7 +169,8 @@ public class JavaUtils {
     }
   }
 
-  private static void deleteRecursivelyUsingUnixNative(File file) throws IOException {
+  private static void deleteRecursivelyUsingUnixNative(File file)
+      throws InterruptedException, IOException {
     ProcessBuilder builder = new ProcessBuilder("rm", "-rf", file.getAbsolutePath());
     Process process = null;
     int exitCode = -1;
@@ -181,6 +183,10 @@ public class JavaUtils {
       process = builder.start();
 
       exitCode = process.waitFor();
+    } catch (InterruptedException e) {
+      // SPARK-51083: Specifically rethrow InterruptedException if it occurs, since swallowing
+      // it will lead to tasks missing cancellation.
+      throw e;
     } catch (Exception e) {
       throw new IOException("Failed to delete: " + file.getAbsolutePath(), e);
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
