Repository: spark
Updated Branches:
  refs/heads/master e3b7bb413 -> 5d572fc7c
[SPARK-25126][SQL] Avoid creating Reader for all orc files

## What changes were proposed in this pull request?

[SPARK-25126](https://issues.apache.org/jira/browse/SPARK-25126) reports that loading a large number of orc files consumes a lot of memory in both 2.0 and 2.3. The issue is caused by creating a Reader for every orc file in order to infer the schema. In OrcFileOperator.ReadSchema, a Reader is created for every file although only the first valid one is used. This uses a significant amount of memory when `paths` contains a lot of files. In 2.3 a different code path (OrcUtils.readSchema) is used to infer the schema of orc files. This commit changes both functions to create the Reader lazily.

## How was this patch tested?

Passes Jenkins with a test case newly added by dongjoon-hyun.

Closes #22157 from raofu/SPARK-25126.

Lead-authored-by: Rao Fu <r...@coupang.com>
Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
Co-authored-by: Rao Fu <raof...@gmail.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d572fc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d572fc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d572fc7

Branch: refs/heads/master
Commit: 5d572fc7c35f76e27b2ab400674923eb8ba91745
Parents: e3b7bb4
Author: Rao Fu <r...@coupang.com>
Authored: Thu Aug 23 22:00:20 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Thu Aug 23 22:00:20 2018 +0800

----------------------------------------------------------------------
 .../execution/datasources/orc/OrcUtils.scala   |  7 ++--
 .../datasources/orc/OrcQuerySuite.scala        | 39 +++++++++++++++++++-
 .../spark/sql/hive/orc/OrcFileOperator.scala   | 11 +++---
 3 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
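The heart of the fix, visible in both diffs below, is replacing an eager `Seq#map`/`headOption` chain with an iterator consumed by `collectFirst`. As a minimal, self-contained sketch of that difference (plain Scala, not Spark code; `openAndReadSchema` is a hypothetical stand-in for creating an ORC Reader and reading its schema):

    object LazyFirstSketch {
      // Hypothetical stand-in for "create a Reader and read its schema";
      // the println makes the eager-vs-lazy difference observable.
      def openAndReadSchema(path: String): Option[String] = {
        println(s"opening $path")
        if (path.endsWith(".orc")) Some("struct<a:bigint>") else None
      }

      def main(args: Array[String]): Unit = {
        val paths = Seq("part-0.orc", "part-1.orc", "part-2.orc")

        // Old, eager shape: flatMap applies the function to every path
        // before headOption picks the first result -- three "opening" lines.
        val eagerFirst = paths.flatMap(openAndReadSchema).headOption

        // New, lazy shape: the iterator is consumed only until collectFirst
        // finds a defined result -- a single "opening part-0.orc" line.
        val lazyFirst =
          paths.toIterator.map(openAndReadSchema).collectFirst { case Some(s) => s }

        assert(eagerFirst == lazyFirst) // same answer, far fewer opens
      }
    }

Run as-is, the eager version opens all three paths while the lazy version opens one, which is exactly the saving the patch is after when thousands of files are listed.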
http://git-wip-us.apache.org/repos/asf/spark/blob/5d572fc7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index b404cfa..ac062fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -79,9 +79,10 @@ object OrcUtils extends Logging {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
-      logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+    files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(schema) =>
+        logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5d572fc7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index f58c331..e9dccbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -562,20 +562,57 @@ abstract class OrcQueryTest extends OrcTest {
     }
   }
 
+    def testAllCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
+    def testAllCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.json(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.json(new Path(basePath, "second").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString)
+        assert(df.count() == 0)
+      }
+    }
+
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
       testIgnoreCorruptFilesWithoutSchemaInfer()
+      val m1 = intercept[AnalysisException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Unable to infer schema for ORC"))
+      testAllCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
       val m1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }.getMessage
-      assert(m1.contains("Could not read footer for file"))
+      assert(m1.contains("Malformed ORC file"))
       val m2 = intercept[SparkException] {
         testIgnoreCorruptFilesWithoutSchemaInfer()
       }.getMessage
       assert(m2.contains("Malformed ORC file"))
+      val m3 = intercept[SparkException] {
+        testAllCorruptFiles()
+      }.getMessage
+      assert(m3.contains("Could not read footer for file"))
+      val m4 = intercept[SparkException] {
+        testAllCorruptFilesWithoutSchemaInfer()
+      }.getMessage
+      assert(m4.contains("Malformed ORC file"))
     }
   }
 }
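The new testAllCorruptFiles* helpers above simulate corruption by writing JSON files and reading them back as ORC, so every file in the scan is malformed from ORC's point of view. For reproducing the scenario outside the test harness, here is a standalone sketch (the local-mode session, temp directory, and paths are illustrative assumptions, not part of the patch):

    import org.apache.spark.sql.SparkSession

    object AllCorruptFilesSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("all-corrupt-orc-sketch")
          .getOrCreate()

        // Write JSON, then read the same directory back as ORC, so the
        // file is guaranteed to be malformed as far as ORC is concerned.
        val base = java.nio.file.Files.createTempDirectory("orc-corrupt").toString
        spark.range(1).toDF("a").write.json(s"$base/first")

        spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
        // With a user-supplied schema, no file needs to be opened for schema
        // inference; the corrupt file is skipped and the result is empty.
        assert(spark.read.schema("a long").orc(s"$base/first").count() == 0)

        spark.stop()
      }
    }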
http://git-wip-us.apache.org/repos/asf/spark/blob/5d572fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 80e44ca..713b70f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -92,11 +92,12 @@ private[hive] object OrcFileOperator extends Logging {
       : Option[StructType] = {
     // Take the first file where we can open a valid reader if we can find one. Otherwise just
     // return None to indicate we can't infer the schema.
-    paths.flatMap(getFileReader(_, conf, ignoreCorruptFiles)).headOption.map { reader =>
-      val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
-      val schema = readerInspector.getTypeName
-      logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
-      CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
+    paths.toIterator.map(getFileReader(_, conf, ignoreCorruptFiles)).collectFirst {
+      case Some(reader) =>
+        val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
+        val schema = readerInspector.getTypeName
+        logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
+        CatalystSqlParser.parseDataType(schema).asInstanceOf[StructType]
     }
   }
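Both rewritten call sites now share one shape: lazily map an expensive Option-returning function over the inputs and keep the first defined result. Distilled as a hypothetical helper (not part of Spark's API) to make the short-circuiting explicit:

    object FirstDefinedSketch {
      // Lazily applies `f` and returns the first defined result; elements
      // after the first success are never evaluated.
      def firstDefined[A, B](items: Seq[A])(f: A => Option[B]): Option[B] =
        items.toIterator.map(f).collectFirst { case Some(b) => b }

      def main(args: Array[String]): Unit = {
        val parse = (s: String) => scala.util.Try(s.toInt).toOption
        // "x" fails, "2" succeeds, and "3" is never even inspected.
        assert(firstDefined(Seq("x", "2", "3"))(parse).contains(2))
      }
    }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org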