This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d243fb88803 [SPARK-43248][SQL] Unnecessary serialize/deserialize of Path on parallel gather partition stats
d243fb88803 is described below
commit d243fb888039ab3407e88310401d57b054638ab6
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Apr 26 20:40:22 2023 -0700
[SPARK-43248][SQL] Unnecessary serialize/deserialize of Path on parallel gather partition stats
### What changes were proposed in this pull request?
Remove the unnecessary serialization/deserialization of `Path` when gathering partition stats in parallel.
### Why are the changes needed?
Simplify the code: since Hadoop 3, `Path` is `Serializable`, so partition locations no longer need to be converted to strings before being shipped to executors and converted back.
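For context, here is a minimal standalone sketch (not part of this patch) of the property the change relies on: on Hadoop 3, a `Path` survives a plain Java serialization round trip, whereas on older Hadoop (before HADOOP-13519, shipped in 2.9/3.0) the first `writeObject` call would throw `NotSerializableException`. The path string below is made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.fs.Path

object PathSerializationCheck {
  def main(args: Array[String]): Unit = {
    val original = new Path("hdfs://nn:8020/warehouse/t/part=1")

    // Serialize with plain Java serialization; on Hadoop before 2.9 this
    // would throw NotSerializableException because Path was not Serializable.
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(original)
    out.close()

    // Deserialize and compare; Path#equals compares the underlying URI.
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = in.readObject().asInstanceOf[Path]
    assert(restored == original)
    println(s"Round-tripped: $restored")
  }
}
```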
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes existing CI via GitHub Actions (GA).
Closes #40920 from pan3793/SPARK-43248.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../apache/spark/sql/execution/command/ddl.scala | 29 +++++++++++-----------
1 file changed, 15 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fdd4f10c793..5eae3259729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -710,7 +710,7 @@ case class RepairTableCommand(
val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
} else {
- Map.empty[String, PartitionStatistics]
+ Map.empty[Path, PartitionStatistics]
}
logInfo(s"Finished to gather the fast stats for all $total partitions.")
@@ -785,32 +785,33 @@ case class RepairTableCommand(
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
fs: FileSystem,
pathFilter: PathFilter,
- threshold: Int): Map[String, PartitionStatistics] = {
- if (partitionSpecsAndLocs.length > threshold) {
+ threshold: Int): Map[Path, PartitionStatistics] = {
+ val partitionNum = partitionSpecsAndLocs.length
+ if (partitionNum > threshold) {
val hadoopConf = spark.sessionState.newHadoopConf()
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
- val serializedPaths = partitionSpecsAndLocs.map(_._2.toString).toArray
+ val locations = partitionSpecsAndLocs.map(_._2)
// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
- val numParallelism = Math.min(serializedPaths.length,
+ val numParallelism = Math.min(partitionNum,
Math.min(spark.sparkContext.defaultParallelism, 10000))
// gather the fast stats for all the partitions otherwise Hive metastore will list all the
// files for all the new partitions in sequential way, which is super slow.
logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
- spark.sparkContext.parallelize(serializedPaths, numParallelism)
- .mapPartitions { paths =>
+ spark.sparkContext.parallelize(locations, numParallelism)
+ .mapPartitions { locationsEachPartition =>
val pathFilter = getPathFilter(serializableConfiguration.value)
- paths.map(new Path(_)).map{ path =>
- val fs = path.getFileSystem(serializableConfiguration.value)
- val statuses = fs.listStatus(path, pathFilter)
- (path.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+ locationsEachPartition.map { location =>
+ val fs = location.getFileSystem(serializableConfiguration.value)
+ val statuses = fs.listStatus(location, pathFilter)
+ (location, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
}
}.collectAsMap().toMap
} else {
partitionSpecsAndLocs.map { case (_, location) =>
val statuses = fs.listStatus(location, pathFilter)
- (location.toString, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+ (location, PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
}.toMap
}
}
@@ -819,7 +820,7 @@ case class RepairTableCommand(
spark: SparkSession,
table: CatalogTable,
partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
- partitionStats: Map[String, PartitionStatistics]): Unit = {
+ partitionStats: Map[Path, PartitionStatistics]): Unit = {
val total = partitionSpecsAndLocs.length
var done = 0L
// Hive metastore may not have enough memory to handle millions of partitions in single RPC,
@@ -829,7 +830,7 @@ case class RepairTableCommand(
partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
val parts = batch.map { case (spec, location) =>
- val params = partitionStats.get(location.toString).map {
+ val params = partitionStats.get(location).map {
case PartitionStatistics(numFiles, totalSize) =>
// This two fast stat could prevent Hive metastore to list the files again.
Map(NUM_FILES -> numFiles.toString,
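Putting the hunks together, the resulting pattern looks roughly like the sketch below; this is illustrative code, not code from the patch. `Path` values are parallelized and collected directly, with no `.toString` before `parallelize` and no `new Path(_)` inside the task. `SerializableHadoopConf` here is a hypothetical stand-in for Spark's internal `SerializableConfiguration`, which is `private[spark]`; the partition locations are likewise made up.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for Spark's private[spark] SerializableConfiguration:
// a Hadoop Configuration is not Serializable itself, so it is written and
// read via its own Writable methods.
class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

object ParallelPartitionStatsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical partition locations; in RepairTableCommand these come
    // from the discovered partition specs.
    val locations: Seq[Path] = Seq(new Path("/tmp/t/p=1"), new Path("/tmp/t/p=2"))
    val confHolder = new SerializableHadoopConf(sc.hadoopConfiguration)

    // Cap the number of tasks, mirroring the numParallelism logic in the diff.
    val numParallelism = math.min(locations.length, math.min(sc.defaultParallelism, 10000))

    // Paths are shipped to executors as-is, which is what Hadoop 3's
    // serializable Path makes possible.
    val stats = sc.parallelize(locations, numParallelism)
      .map { location =>
        val fs = location.getFileSystem(confHolder.value)
        val statuses = fs.listStatus(location)
        (location, (statuses.length, statuses.map(_.getLen).sum))
      }
      .collectAsMap()

    stats.foreach { case (path, (numFiles, totalSize)) =>
      println(s"$path: $numFiles files, $totalSize bytes")
    }
    spark.stop()
  }
}
```

Beyond dropping two conversions per partition, keying the stats map by `Path` keeps the parallel branch symmetric with the sequential branch, which already had `Path` locations in hand.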
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]