Repository: spark
Updated Branches:
  refs/heads/master e3b7bb413 -> 5d572fc7c


[SPARK-25126][SQL] Avoid creating Reader for all orc files

## What changes were proposed in this pull request?

[SPARK-25126](https://issues.apache.org/jira/browse/SPARK-25126)
reports that loading a large number of ORC files consumes a lot of
memory in both 2.0 and 2.3. The issue is caused by creating a Reader
for every ORC file in order to infer the schema.

In OrcFileOperator.readSchema, a Reader is created for every file
even though only the first valid one is used. This consumes a
significant amount of memory when `paths` contains a lot of files.
In 2.3, a different code path (OrcUtils.readSchema) is used to infer
the schema of ORC files. This commit changes both functions to create
Readers lazily.
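
The fix works because `Seq#flatMap` is eager, while mapping over an
`Iterator` defers each call and `collectFirst` stops pulling elements at
the first match. Here is a minimal, self-contained sketch of the pattern,
with a hypothetical `openReader` standing in for the expensive ORC Reader
creation (not the actual Spark API):

```scala
object LazySchemaInference {
  // Hypothetical stand-in for opening an ORC Reader: expensive to call,
  // and returns None for files whose footer cannot be read.
  def openReader(path: String): Option[String] = {
    println(s"opening reader for $path") // side effect makes eagerness visible
    if (path.endsWith(".orc")) Some(s"schema-of-$path") else None
  }

  def main(args: Array[String]): Unit = {
    val paths = Seq("part-0.orc", "part-1.orc", "part-2.orc")

    // Old behavior: flatMap on a Seq opens a Reader for every path,
    // even though headOption keeps only the first result.
    val eager = paths.flatMap(openReader).headOption

    // New behavior: the Iterator defers each openReader call, and
    // collectFirst stops after the first Some(_), so at most one
    // Reader is opened when the first file is valid.
    val lazily = paths.toIterator.map(openReader).collectFirst {
      case Some(schema) => schema
    }

    assert(eager == lazily) // same result, far fewer Readers created
  }
}
```

Running the sketch prints "opening reader" three times for the eager
version but only once for the iterator version.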

## How was this patch tested?

Passes Jenkins with a newly added test case by dongjoon-hyun.

Closes #22157 from raofu/SPARK-25126.

Lead-authored-by: Rao Fu <r...@coupang.com>
Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
Co-authored-by: Rao Fu <raof...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d572fc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d572fc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d572fc7

Branch: refs/heads/master
Commit: 5d572fc7c35f76e27b2ab400674923eb8ba91745
Parents: e3b7bb4
Author: Rao Fu <r...@coupang.com>
Authored: Thu Aug 23 22:00:20 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Thu Aug 23 22:00:20 2018 +0800

----------------------------------------------------------------------
 .../execution/datasources/orc/OrcUtils.scala    |  7 ++--
 .../datasources/orc/OrcQuerySuite.scala         | 39 +++++++++++++++++++-
 .../spark/sql/hive/orc/OrcFileOperator.scala    | 11 +++---
 3 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d572fc7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index b404cfa..ac062fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5d572fc7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index f58c331..e9dccbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
       }
     }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
       val m2 = intercept[SparkException] {
         testIgnoreCorruptFilesWithoutSchemaInfer()
       }.getMessage
       assert(m2.contains("Malformed ORC file"))
+      val m3 = intercept[SparkException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m3.contains("Could not read footer for file"))
+      val m4 = intercept[SparkException] {
+        testAllCorruptFilesWithoutSchemaInfer()
+      }.getMessage
+      assert(m4.contains("Malformed ORC file"))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5d572fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 80e44ca..713b70f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -92,11 +92,12 @@ private[hive] object OrcFileOperator extends Logging {
       : Option[StructType] = {
     // Take the first file where we can open a valid reader if we can find one.  Otherwise just
     // return None to indicate we can't infer the schema.
-    paths.flatMap(getFileReader(_, conf, ignoreCorruptFiles)).headOption.map { reader =>
-      val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
-      val schema = readerInspector.getTypeName
-      logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
+    paths.toIterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(reader) =>
+        val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
+        val schema = readerInspector.getTypeName
+        logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
     }
   }
 

