This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 082bdb7f33e2 [SPARK-49980][CORE][SQL] Fix potential file stream leaks 
caused by interruption in canceled tasks
082bdb7f33e2 is described below

commit 082bdb7f33e21e575f40bf3be2145d0ad118abee
Author: Davin Tjong <[email protected]>
AuthorDate: Thu Oct 31 16:08:19 2024 +0900

    [SPARK-49980][CORE][SQL] Fix potential file stream leaks caused by 
interruption in canceled tasks
    
    ### What changes were proposed in this pull request?
    
    Fix potential file stream leaks caused by interruption in canceled tasks.
    
    ### Why are the changes needed?
    
    There is a common pattern of:
    
    ```
    val intermediateReader = ... constructor ...
    intermediateReader.initialize(...) <- this may throw an error, e.g. if the 
task is interrupted.
    return WrapperIterator(intermediateReader)
    ```
    
    If an error is thrown in `initialize`, `intermediateReader` is never closed, 
and the caller holds no reference to it and so cannot close it either, causing 
the open stream to leak.
    
    This is already handled in many analogous places, but not everywhere. This 
PR fixes some of the remaining ones.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #48483 from davintjong-db/file-stream-leak.
    
    Authored-by: Davin Tjong <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/util/SparkErrorUtils.scala    | 16 ++++++++++++
 .../scala/org/apache/spark/rdd/NewHadoopRDD.scala  | 10 +++++---
 .../datasources/HadoopFileLinesReader.scala        | 18 ++++++++------
 .../datasources/HadoopFileWholeTextReader.scala    | 12 ++++++---
 .../v2/parquet/ParquetPartitionReaderFactory.scala | 29 ++++++++++++----------
 5 files changed, 57 insertions(+), 28 deletions(-)

diff --git 
a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala 
b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
index 9f604e4bf47f..872c89e5a29a 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala
@@ -48,6 +48,22 @@ private[spark] trait SparkErrorUtils extends Logging {
     try f.apply(resource) finally resource.close()
   }
 
+  /**
+   * Try to initialize a resource. If an exception is thrown during 
initialization, closes the
+   * resource before propagating the error. Otherwise, the caller is 
responsible for closing
+   * the resource. This means that [[T]] should provide some way to close the 
resource.
+   */
+  def tryInitializeResource[R <: Closeable, T](createResource: => 
R)(initialize: R => T): T = {
+    val resource = createResource
+    try {
+      initialize(resource)
+    } catch {
+      case e: Throwable =>
+        resource.close()
+        throw e
+    }
+  }
+
   /**
    * Execute a block of code, then a finally block, but if exceptions happen in
    * the finally block, do not suppress the original exception.
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index bf539320b598..2b6f322d1805 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -242,10 +242,12 @@ class NewHadoopRDD[K, V](
       private var finished = false
       private var reader =
         try {
-          val _reader = format.createRecordReader(
-            split.serializableHadoopSplit.value, hadoopAttemptContext)
-          _reader.initialize(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
-          _reader
+          Utils.tryInitializeResource(
+            format.createRecordReader(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
+          ) { reader =>
+            reader.initialize(split.serializableHadoopSplit.value, 
hadoopAttemptContext)
+            reader
+          }
         } catch {
           case e: FileNotFoundException if ignoreMissingFiles =>
             logWarning(log"Skipped missing file: ${MDC(PATH, 
split.serializableHadoopSplit)}", e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index 5ec17290c37d..bf6da8765e51 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -25,6 +25,8 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
+import org.apache.spark.util.Utils
+
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which 
are all of the lines
  * in that file.
@@ -54,14 +56,16 @@ class HadoopFileLinesReader(
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
 
-    val reader = lineSeparator match {
-      case Some(sep) => new LineRecordReader(sep)
-      // If the line separator is `None`, it covers `\r`, `\r\n` and `\n`.
-      case _ => new LineRecordReader()
+    Utils.tryInitializeResource(
+      lineSeparator match {
+        case Some(sep) => new LineRecordReader(sep)
+        // If the line separator is `None`, it covers `\r`, `\r\n` and `\n`.
+        case _ => new LineRecordReader()
+      }
+    ) { reader =>
+      reader.initialize(fileSplit, hadoopAttemptContext)
+      new RecordReaderIterator(reader)
     }
-
-    reader.initialize(fileSplit, hadoopAttemptContext)
-    new RecordReaderIterator(reader)
   }
 
   override def hasNext: Boolean = _iterator.hasNext
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
index 17649f62d84a..f49c66f9198f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark.input.WholeTextFileRecordReader
+import org.apache.spark.util.Utils
 
 /**
  * An adaptor from a [[PartitionedFile]] to an [[Iterator]] of [[Text]], which 
is all of the lines
@@ -42,10 +43,13 @@ class HadoopFileWholeTextReader(file: PartitionedFile, 
conf: Configuration)
       Array.empty[String])
     val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
     val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-    val reader = new WholeTextFileRecordReader(fileSplit, 
hadoopAttemptContext, 0)
-    reader.setConf(hadoopAttemptContext.getConfiguration)
-    reader.initialize(fileSplit, hadoopAttemptContext)
-    new RecordReaderIterator(reader)
+    Utils.tryInitializeResource(
+      new WholeTextFileRecordReader(fileSplit, hadoopAttemptContext, 0)
+    ) { reader =>
+      reader.setConf(hadoopAttemptContext.getConfiguration)
+      reader.initialize(fileSplit, hadoopAttemptContext)
+      new RecordReaderIterator(reader)
+    }
   }
 
   override def hasNext: Boolean = _iterator.hasNext
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 5774df95ac07..d3643f7426db 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * A factory used to create Parquet readers.
@@ -261,19 +261,22 @@ case class ParquetPartitionReaderFactory(
     val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
       footerFileMetaData.getKeyValueMetaData.get,
       int96RebaseModeInRead)
-    val reader = buildReaderFunc(
-      file.partitionValues,
-      pushed,
-      convertTz,
-      datetimeRebaseSpec,
-      int96RebaseSpec)
-    reader match {
-      case vectorizedReader: VectorizedParquetRecordReader =>
-        vectorizedReader.initialize(split, hadoopAttemptContext, 
Option.apply(fileFooter))
-      case _ =>
-        reader.initialize(split, hadoopAttemptContext)
+    Utils.tryInitializeResource(
+      buildReaderFunc(
+        file.partitionValues,
+        pushed,
+        convertTz,
+        datetimeRebaseSpec,
+        int96RebaseSpec)
+    ) { reader =>
+      reader match {
+        case vectorizedReader: VectorizedParquetRecordReader =>
+          vectorizedReader.initialize(split, hadoopAttemptContext, 
Option.apply(fileFooter))
+        case _ =>
+          reader.initialize(split, hadoopAttemptContext)
+      }
+      reader
     }
-    reader
   }
 
   private def createRowBaseReader(file: PartitionedFile): RecordReader[Void, 
InternalRow] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to