This is an automated email from the ASF dual-hosted git repository. irashid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 64fe82b [SPARK-29189][SQL] Add an option to ignore block locations when listing file 64fe82b is described below commit 64fe82b519bdc854fcbef40e906ac1fb181534c2 Author: gwang3 <gwa...@ebay.com> AuthorDate: Mon Oct 7 14:52:55 2019 -0500 [SPARK-29189][SQL] Add an option to ignore block locations when listing file ### What changes were proposed in this pull request? In our PROD env, we have a pure Spark cluster — I think this is also pretty common — where computation is separated from the storage layer. In such a deployment mode, data locality is never achievable, and there are some configurations in the Spark scheduler to reduce the waiting time for data locality (e.g. "spark.locality.wait"). However, the problem is that, in the file-listing phase, the location information for all the files, with all the blocks inside each file, is fetched from the distributed file system. In a PROD environment, a table can be so huge that even fetching all this location information can take tens of seconds. To improve this scenario, Spark needs to provide an option by which data locality can be ignored entirely: all we need in the file-listing phase are the file locations, without any block location information. ### Why are the changes needed? We ran a benchmark in our PROD env; after ignoring the block locations, we got a pretty huge improvement. Table Size | Total File Number | Total Block Number | List File Duration(With Block Location) | List File Duration(Without Block Location) -- | -- | -- | -- | -- 22.6T | 30000 | 120000 | 16.841s | 1.730s 28.8 T | 42001 | 148964 | 10.099s | 2.858s 3.4 T | 20000 | 20000 | 5.833s | 4.881s ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Via unit test. Closes #25869 from wangshisan/SPARK-29189. 
Authored-by: gwang3 <gwa...@ebay.com> Signed-off-by: Imran Rashid <iras...@cloudera.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++++++ .../execution/datasources/InMemoryFileIndex.scala | 11 ++++++-- .../sql/execution/datasources/FileIndexSuite.scala | 29 ++++++++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9b2e314..3d28b5e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -832,6 +832,17 @@ object SQLConf { .intConf .createWithDefault(10000) + val IGNORE_DATA_LOCALITY = + buildConf("spark.sql.sources.ignore.datalocality") + .doc("If true, Spark will not fetch the block locations for each file on " + + "listing files. This speeds up file listing, but the scheduler cannot " + + "schedule tasks to take advantage of data locality. It can be particularly " + + "useful if data is read from a remote cluster so the scheduler could never " + + "take advantage of locality anyway.") + .internal() + .booleanConf + .createWithDefault(false) + // Whether to automatically resolve ambiguity in join conditions for self-joins. // See SPARK-6231. val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = @@ -2494,6 +2505,8 @@ class SQLConf extends Serializable with Logging { def defaultV2Catalog: Option[String] = getConf(DEFAULT_V2_CATALOG) + def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index cf7a130..ed860f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -171,6 +171,7 @@ object InMemoryFileIndex extends Logging { areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles + val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality // Short-circuits parallel listing when serial listing is likely to be faster. if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { @@ -181,6 +182,7 @@ object InMemoryFileIndex extends Logging { filter, Some(sparkSession), ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, isRootPath = areRootPaths) (path, leafFiles) } @@ -221,6 +223,7 @@ object InMemoryFileIndex extends Logging { filter, None, ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, isRootPath = areRootPaths) (path, leafFiles) }.iterator @@ -287,6 +290,7 @@ object InMemoryFileIndex extends Logging { filter: PathFilter, sessionOpt: Option[SparkSession], ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, isRootPath: Boolean): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) @@ -299,7 +303,7 @@ object InMemoryFileIndex extends Logging { // to retrieve the file status with the file block location. The reason to still fallback // to listStatus is because the default implementation would potentially throw a // FileNotFoundException which is better handled by doing the lookups manually below. 
- case _: DistributedFileSystem => + case _: DistributedFileSystem if !ignoreLocality => val remoteIter = fs.listLocatedStatus(path) new Iterator[LocatedFileStatus]() { def next(): LocatedFileStatus = remoteIter.next @@ -353,6 +357,7 @@ object InMemoryFileIndex extends Logging { filter, sessionOpt, ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, isRootPath = false) } } @@ -376,7 +381,7 @@ object InMemoryFileIndex extends Logging { // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not // be a big deal since we always use to `bulkListLeafFiles` when the number of // paths exceeds threshold. - case f => + case f if !ignoreLocality => // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), // which is very slow on some file system (RawLocalFileSystem, which is launch a // subprocess and parse the stdout). @@ -400,6 +405,8 @@ object InMemoryFileIndex extends Logging { missingFiles += f.getPath.toString None } + + case f => Some(f) } if (missingFiles.nonEmpty) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 4b086e8..a7a2349 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -416,6 +416,35 @@ class FileIndexSuite extends SharedSparkSession { } } + test("Add an option to ignore block locations when listing file") { + withTempDir { dir => + val partitionDirectory = new File(dir, "a=foo") + partitionDirectory.mkdir() + for (i <- 1 to 8) { + val file = new File(partitionDirectory, i + ".txt") + stringToFile(file, "text") + } + val path = new Path(dir.getCanonicalPath) + val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) + withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> 
"false", + "fs.file.impl" -> classOf[SpecialBlockLocationFileSystem].getName) { + val withBlockLocations = fileIndex. + listLeafFiles(Seq(new Path(partitionDirectory.getPath))) + + withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> "true") { + val withoutBlockLocations = fileIndex. + listLeafFiles(Seq(new Path(partitionDirectory.getPath))) + + assert(withBlockLocations.size == withoutBlockLocations.size) + assert(withBlockLocations.forall(b => b.isInstanceOf[LocatedFileStatus] && + b.asInstanceOf[LocatedFileStatus].getBlockLocations.nonEmpty)) + assert(withoutBlockLocations.forall(b => b.isInstanceOf[FileStatus] && + !b.isInstanceOf[LocatedFileStatus])) + assert(withoutBlockLocations.forall(withBlockLocations.contains)) + } + } + } + } } object DeletionRaceFileSystem { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org