This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 25503ec  [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted
25503ec is described below

commit 25503ecdde99db244172602c70be7450204e4c3f
Author: Xingbo Jiang <[email protected]>
AuthorDate: Fri Aug 13 19:25:20 2021 +0900

    [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted
    
    ### What changes were proposed in this pull request?
    
    When a task thread is interrupted, the underlying output stream referenced by
    `DiskBlockObjectWriter.mcs` may have already been closed, so we get an IOException when
    flushing the buffered data. This breaks the assumption that `revertPartialWritesAndClose()`
    should not throw exceptions.
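    
    A self-contained illustration (not Spark code; the file and object names below are made up
    for the demo): flushing a `BufferedOutputStream` whose underlying stream has already been
    closed fails with an IOException, which is what `manualClose()` runs into after an interrupt
    closes the underlying file stream:
    
    ```scala
    import java.io.{BufferedOutputStream, FileOutputStream, IOException}
    import java.nio.file.Files
    
    object FlushAfterCloseDemo {
      def main(args: Array[String]): Unit = {
        val file = Files.createTempFile("temp_shuffle_demo", ".tmp").toFile
        val fos = new FileOutputStream(file)
        val bos = new BufferedOutputStream(fos)
        bos.write(Array.fill[Byte](16)(1.toByte)) // the data only sits in the buffer so far
        fos.close()                               // underlying stream closed behind our back
        try {
          bos.close()                             // close() flushes the buffer -> IOException
        } catch {
          case e: IOException => println(s"flush on closed stream failed: ${e.getMessage}")
        }
        file.delete()
      }
    }
    ```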
    
    To fix the issue, we can catch the IOException in `ManualCloseOutputStream.manualClose()`.
    
    ### Why are the changes needed?
    
    Previously the IOException was not captured, so `revertPartialWritesAndClose()` threw an
    exception. When that happened, `BypassMergeSortShuffleWriter.stop()` would stop deleting the
    temp_shuffle files tracked by `partitionWriters`, which led to temp_shuffle file leaks.
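    
    As a paraphrased sketch of the cleanup loop in `BypassMergeSortShuffleWriter.stop()` (the
    real class is Java; the Scala below is simplified and the surrounding error handling is
    omitted), an exception escaping `revertPartialWritesAndClose()` aborts the loop, so the
    temp_shuffle files of the writers not yet visited are left on disk:
    
    ```scala
    // Simplified sketch, not the actual Java implementation: each partition writer reverts
    // its partial writes and its temp_shuffle file is deleted. If revertPartialWritesAndClose()
    // throws for one writer, the remaining files are never deleted and leak.
    partitionWriters.foreach { writer =>
      val file = writer.revertPartialWritesAndClose() // assumed to never throw
      if (!file.delete()) {
        logError(s"Error while deleting file ${file.getAbsolutePath}")
      }
    }
    ```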
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is an internal bug fix.
    
    ### How was this patch tested?
    
    Tested by running a longevity stress test. After the fix, there are no more leaked
    temp_shuffle files.
    
    Closes #33731 from jiangxb1987/temp_shuffle.
    
    Authored-by: Xingbo Jiang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit ec5f3a17e33f7afe03e48f8b7690a8b18ae0c058)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/storage/DiskBlockObjectWriter.scala    | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index e55c0927..38b845e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
+import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
 
 import org.apache.spark.internal.Logging
@@ -62,7 +62,16 @@ private[spark] class DiskBlockObjectWriter(
     }
 
     def manualClose(): Unit = {
-      super.close()
+      try {
+        super.close()
+      } catch {
+        // The output stream may have been closed when the task thread is interrupted, then we
+        // get IOException when flushing the buffered data. We should catch and log the exception
+        // to ensure the revertPartialWritesAndClose() function doesn't throw an exception.
+        case e: IOException =>
+          logError("Exception occurred while manually closing the output stream to file "
+            + file + ", " + e.getMessage)
+      }
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
