This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d243fb88803 [SPARK-43248][SQL] Unnecessary serialize/deserialize of 
Path on parallel gather partition stats
d243fb88803 is described below

commit d243fb888039ab3407e88310401d57b054638ab6
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Apr 26 20:40:22 2023 -0700

    [SPARK-43248][SQL] Unnecessary serialize/deserialize of Path on parallel 
gather partition stats
    
    ### What changes were proposed in this pull request?
    
    Remove unnecessary serialize/deserialize of `Path` on parallel gather 
partition stats.
    
    ### Why are the changes needed?
    
    Simplify code, since `Path` is serializable in Hadoop3.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #40920 from pan3793/SPARK-43248.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Chao Sun <[email protected]>
---
 .../apache/spark/sql/execution/command/ddl.scala   | 29 +++++++++++-----------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fdd4f10c793..5eae3259729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -710,7 +710,7 @@ case class RepairTableCommand(
       val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
         gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, 
threshold)
       } else {
-        Map.empty[String, PartitionStatistics]
+        Map.empty[Path, PartitionStatistics]
       }
       logInfo(s"Finished to gather the fast stats for all $total partitions.")
 
@@ -785,32 +785,33 @@ case class RepairTableCommand(
       partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
       fs: FileSystem,
       pathFilter: PathFilter,
-      threshold: Int): Map[String, PartitionStatistics] = {
-    if (partitionSpecsAndLocs.length > threshold) {
+      threshold: Int): Map[Path, PartitionStatistics] = {
+    val partitionNum = partitionSpecsAndLocs.length
+    if (partitionNum > threshold) {
       val hadoopConf = spark.sessionState.newHadoopConf()
       val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-      val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+      val locations = partitionSpecsAndLocs.map(_._2)
 
       // Set the number of parallelism to prevent following file listing from 
generating many tasks
       // in case of large #defaultParallelism.
-      val numParallelism = Math.min(serializedPaths.length,
+      val numParallelism = Math.min(partitionNum,
         Math.min(spark.sparkContext.defaultParallelism, 10000))
       // gather the fast stats for all the partitions otherwise Hive metastore 
will list all the
       // files for all the new partitions in sequential way, which is super 
slow.
       logInfo(s"Gather the fast stats in parallel using $numParallelism 
tasks.")
-      spark.sparkContext.parallelize(serializedPaths, numParallelism)
-        .mapPartitions { paths =>
+      spark.sparkContext.parallelize(locations, numParallelism)
+        .mapPartitions { locationsEachPartition =>
           val pathFilter = getPathFilter(serializableConfiguration.value)
-          paths.map(new Path(_)).map{ path =>
-            val fs = path.getFileSystem(serializableConfiguration.value)
-            val statuses = fs.listStatus(path, pathFilter)
-            (path.toString, PartitionStatistics(statuses.length, 
statuses.map(_.getLen).sum))
+          locationsEachPartition.map { location =>
+            val fs = location.getFileSystem(serializableConfiguration.value)
+            val statuses = fs.listStatus(location, pathFilter)
+            (location, PartitionStatistics(statuses.length, 
statuses.map(_.getLen).sum))
           }
         }.collectAsMap().toMap
     } else {
       partitionSpecsAndLocs.map { case (_, location) =>
         val statuses = fs.listStatus(location, pathFilter)
-        (location.toString, PartitionStatistics(statuses.length, 
statuses.map(_.getLen).sum))
+        (location, PartitionStatistics(statuses.length, 
statuses.map(_.getLen).sum))
       }.toMap
     }
   }
@@ -819,7 +820,7 @@ case class RepairTableCommand(
       spark: SparkSession,
       table: CatalogTable,
       partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
-      partitionStats: Map[String, PartitionStatistics]): Unit = {
+      partitionStats: Map[Path, PartitionStatistics]): Unit = {
     val total = partitionSpecsAndLocs.length
     var done = 0L
     // Hive metastore may not have enough memory to handle millions of 
partitions in single RPC,
@@ -829,7 +830,7 @@ case class RepairTableCommand(
     partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
       val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
       val parts = batch.map { case (spec, location) =>
-        val params = partitionStats.get(location.toString).map {
+        val params = partitionStats.get(location).map {
           case PartitionStatistics(numFiles, totalSize) =>
             // This two fast stat could prevent Hive metastore to list the 
files again.
             Map(NUM_FILES -> numFiles.toString,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to