Repository: spark
Updated Branches:
  refs/heads/master d6f5e172b -> 357babde5


[SPARK-23399][SQL] Register a task completion listener first for 
OrcColumnarBatchReader

## What changes were proposed in this pull request?

This PR aims to resolve an open file leakage issue reported at 
[SPARK-23390](https://issues.apache.org/jira/browse/SPARK-23390) by moving the 
listener registration position. Currently, the sequence is like the following.

1. Create `batchReader`
2. `batchReader.initialize` opens a ORC file.
3. `batchReader.initBatch` may take a long time to alloc memory in some 
environment and cause errors.
4. `Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
iter.close()))`

This PR moves 4 before 2 and 3. To sum up, the new sequence is 1 -> 4 -> 2 -> 3.

## How was this patch tested?

Manual. The following test case makes OOM intentionally to cause leaked 
filesystem connection in the current code base. With this patch, leakage 
doesn't occurs.

```scala
  // This should be tested manually because it raises OOM intentionally
  // in order to cause `Leaked filesystem connection`.
  test("SPARK-23399 Register a task completion listener first for 
OrcColumnarBatchReader") {
    withSQLConf(SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE.key -> 
s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("orc").save(new Path(basePath, 
"first").toString)
        Seq(1).toDF("a").write.format("orc").save(new Path(basePath, 
"second").toString)
        val df = spark.read.orc(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
```

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #20590 from dongjoon-hyun/SPARK-23399.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/357babde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/357babde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/357babde

Branch: refs/heads/master
Commit: 357babde5a8eb9710de7016d7ae82dee21fa4ef3
Parents: d6f5e17
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Wed Feb 14 10:55:24 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Feb 14 10:55:24 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/orc/OrcFileFormat.scala  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/357babde/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index dbf3bc6..1de2ca2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -188,6 +188,12 @@ class OrcFileFormat
         if (enableVectorizedReader) {
           val batchReader = new OrcColumnarBatchReader(
             enableOffHeapColumnVector && taskContext.isDefined, copyToSpark, 
capacity)
+          // SPARK-23399 Register a task completion listener first to call 
`close()` in all cases.
+          // There is a possibility that `initialize` and `initBatch` hit some 
errors (like OOM)
+          // after opening a file.
+          val iter = new RecordReaderIterator(batchReader)
+          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
iter.close()))
+
           batchReader.initialize(fileSplit, taskAttemptContext)
           batchReader.initBatch(
             reader.getSchema,
@@ -196,8 +202,6 @@ class OrcFileFormat
             partitionSchema,
             file.partitionValues)
 
-          val iter = new RecordReaderIterator(batchReader)
-          Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => 
iter.close()))
           iter.asInstanceOf[Iterator[InternalRow]]
         } else {
           val orcRecordReader = new OrcInputFormat[OrcStruct]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to