This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 64fe82b  [SPARK-29189][SQL] Add an option to ignore block locations when listing file
64fe82b is described below

commit 64fe82b519bdc854fcbef40e906ac1fb181534c2
Author: gwang3 <gwa...@ebay.com>
AuthorDate: Mon Oct 7 14:52:55 2019 -0500

    [SPARK-29189][SQL] Add an option to ignore block locations when listing file
    
    ### What changes were proposed in this pull request?
    In our production environment we run a pure Spark cluster, a fairly common setup in which computation is separated from the storage layer. In such a deployment, data locality can never be achieved.
    Spark's scheduler already has configurations to reduce the time spent waiting for data locality (e.g. "spark.locality.wait"). The problem is that during the file-listing phase, the location information of every file, including every block inside each file, is fetched from the distributed file system. In a production environment a table can be so large that fetching all of this location information alone takes tens of seconds.
    To improve this scenario, Spark should provide an option to ignore data locality entirely: in the file-listing phase we then only need the file paths and statuses, without any block location information.
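    
    A minimal sketch of how the new option could be enabled, assuming a job where listing time dominates (the session setup and table path below are illustrative assumptions, not part of this patch):
    
        import org.apache.spark.sql.SparkSession
        
        val spark = SparkSession.builder()
          .appName("IgnoreLocalityExample")
          // Skip fetching block locations while listing files; the scheduler
          // will not be able to schedule tasks for data locality on these reads.
          .config("spark.sql.sources.ignore.datalocality", "true")
          .getOrCreate()
        
        // File listing for this read collects plain FileStatus objects only,
        // without asking the file system for block locations.
        spark.read.parquet("/path/to/large/table").count()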
    
    ### Why are the changes needed?
    We benchmarked this in our production environment; ignoring the block locations gives a substantial improvement:
    
    Table Size | Total File Number | Total Block Number | List File Duration (With Block Location) | List File Duration (Without Block Location)
    -- | -- | -- | -- | --
    22.6 T | 30000 | 120000 | 16.841s | 1.730s
    28.8 T | 42001 | 148964 | 10.099s | 2.858s
    3.4 T | 20000 | 20000 | 5.833s | 4.881s
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Via a unit test added to FileIndexSuite.
    
    Closes #25869 from wangshisan/SPARK-29189.
    
    Authored-by: gwang3 <gwa...@ebay.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 13 ++++++++++
 .../execution/datasources/InMemoryFileIndex.scala  | 11 ++++++--
 .../sql/execution/datasources/FileIndexSuite.scala | 29 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9b2e314..3d28b5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -832,6 +832,17 @@ object SQLConf {
       .intConf
       .createWithDefault(10000)
 
+  val IGNORE_DATA_LOCALITY =
+    buildConf("spark.sql.sources.ignore.datalocality")
+      .doc("If true, Spark will not fetch the block locations for each file on " +
+        "listing files. This speeds up file listing, but the scheduler cannot " +
+        "schedule tasks to take advantage of data locality. It can be particularly " +
+        "useful if data is read from a remote cluster so the scheduler could never " +
+        "take advantage of locality anyway.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -2494,6 +2505,8 @@ class SQLConf extends Serializable with Logging {
 
   def defaultV2Catalog: Option[String] = getConf(DEFAULT_V2_CATALOG)
 
+  def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index cf7a130..ed860f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -171,6 +171,7 @@ object InMemoryFileIndex extends Logging {
       areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
 
     val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
+    val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
@@ -181,6 +182,7 @@ object InMemoryFileIndex extends Logging {
           filter,
           Some(sparkSession),
           ignoreMissingFiles = ignoreMissingFiles,
+          ignoreLocality = ignoreLocality,
           isRootPath = areRootPaths)
         (path, leafFiles)
       }
@@ -221,6 +223,7 @@ object InMemoryFileIndex extends Logging {
               filter,
               None,
               ignoreMissingFiles = ignoreMissingFiles,
+              ignoreLocality = ignoreLocality,
               isRootPath = areRootPaths)
             (path, leafFiles)
           }.iterator
@@ -287,6 +290,7 @@ object InMemoryFileIndex extends Logging {
       filter: PathFilter,
       sessionOpt: Option[SparkSession],
       ignoreMissingFiles: Boolean,
+      ignoreLocality: Boolean,
       isRootPath: Boolean): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
@@ -299,7 +303,7 @@ object InMemoryFileIndex extends Logging {
         // to retrieve the file status with the file block location. The 
reason to still fallback
         // to listStatus is because the default implementation would 
potentially throw a
         // FileNotFoundException which is better handled by doing the lookups 
manually below.
-        case _: DistributedFileSystem =>
+        case _: DistributedFileSystem if !ignoreLocality =>
           val remoteIter = fs.listLocatedStatus(path)
           new Iterator[LocatedFileStatus]() {
             def next(): LocatedFileStatus = remoteIter.next
@@ -353,6 +357,7 @@ object InMemoryFileIndex extends Logging {
               filter,
               sessionOpt,
               ignoreMissingFiles = ignoreMissingFiles,
+              ignoreLocality = ignoreLocality,
               isRootPath = false)
           }
       }
@@ -376,7 +381,7 @@ object InMemoryFileIndex extends Logging {
       // - Here we are calling `getFileBlockLocations` in a sequential manner, 
but it should not
       //   be a big deal since we always use to `bulkListLeafFiles` when the 
number of
       //   paths exceeds threshold.
-      case f =>
+      case f if !ignoreLocality =>
         // The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
         // which is very slow on some file system (RawLocalFileSystem, which is launch a
         // subprocess and parse the stdout).
@@ -400,6 +405,8 @@ object InMemoryFileIndex extends Logging {
             missingFiles += f.getPath.toString
             None
         }
+
+      case f => Some(f)
     }
 
     if (missingFiles.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 4b086e8..a7a2349 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -416,6 +416,35 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
+  test("Add an option to ignore block locations when listing file") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      for (i <- 1 to 8) {
+        val file = new File(partitionDirectory, i + ".txt")
+        stringToFile(file, "text")
+      }
+      val path = new Path(dir.getCanonicalPath)
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+      withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> "false",
+         "fs.file.impl" -> classOf[SpecialBlockLocationFileSystem].getName) {
+        val withBlockLocations = fileIndex.
+          listLeafFiles(Seq(new Path(partitionDirectory.getPath)))
+
+        withSQLConf(SQLConf.IGNORE_DATA_LOCALITY.key -> "true") {
+          val withoutBlockLocations = fileIndex.
+            listLeafFiles(Seq(new Path(partitionDirectory.getPath)))
+
+          assert(withBlockLocations.size == withoutBlockLocations.size)
+          assert(withBlockLocations.forall(b => b.isInstanceOf[LocatedFileStatus] &&
+            b.asInstanceOf[LocatedFileStatus].getBlockLocations.nonEmpty))
+          assert(withoutBlockLocations.forall(b => b.isInstanceOf[FileStatus] &&
+            !b.isInstanceOf[LocatedFileStatus]))
+          assert(withoutBlockLocations.forall(withBlockLocations.contains))
+        }
+      }
+    }
+  }
 }
 
 object DeletionRaceFileSystem {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
