This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d4edee24f6e [HUDI-9334] Optimize Parallelism of show_invalid_parquet (#13206)
d4edee24f6e is described below

commit d4edee24f6e5b06ee416d8024369782a1218aff8
Author: fhan <[email protected]>
AuthorDate: Mon Jun 16 19:10:27 2025 +0800

    [HUDI-9334] Optimize Parallelism of show_invalid_parquet (#13206)
    
    * [HUDI-9334] optimize parallelism of show_invalid_parquet
    
    * solve type mismatch
    
    * introduce customParallelism
    
    * rm invalid imports
    
    * switch parallelism to optional
    
    * fix scalastyle
    
    * fix scalastyle
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
---
 .../procedures/ShowInvalidParquetProcedure.scala   | 62 ++++++++++++----------
 .../TestShowInvalidParquetProcedure.scala          | 24 +++++----
 2 files changed, 47 insertions(+), 39 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index 51c9ca8269c..e052f5b1fc5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -29,7 +29,6 @@ import collection.JavaConverters._
 import org.apache.hadoop.fs.Path
 import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
 
@@ -38,10 +37,11 @@ import java.util.function.Supplier
 class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "path", DataTypes.StringType),
-    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
-    ProcedureParameter.optional(2, "needDelete", DataTypes.BooleanType, false),
-    ProcedureParameter.optional(3, "partitions", DataTypes.StringType, ""),
-    ProcedureParameter.optional(4, "instants", DataTypes.StringType, "")
+    ProcedureParameter.optional(1, "parallelism", DataTypes.IntegerType, 100),
+    ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 100),
+    ProcedureParameter.optional(3, "needDelete", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(4, "partitions", DataTypes.StringType, ""),
+    ProcedureParameter.optional(5, "instants", DataTypes.StringType, "")
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -56,31 +56,34 @@ class ShowInvalidParquetProcedure extends BaseProcedure 
with ProcedureBuilder {
     super.checkArgs(PARAMETERS, args)
 
     val srcPath = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String]
-    val limit = getArgValueOrDefault(args, PARAMETERS(1))
-    val needDelete = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Boolean]
-    val partitions = getArgValueOrDefault(args, 
PARAMETERS(3)).map(_.toString).getOrElse("")
-    val instants = getArgValueOrDefault(args, 
PARAMETERS(4)).map(_.toString).getOrElse("")
+    val parallelism = getArgValueOrDefault(args, 
PARAMETERS(1)).get.asInstanceOf[Int]
+    val limit = getArgValueOrDefault(args, PARAMETERS(2))
+    val needDelete = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val partitions = getArgValueOrDefault(args, 
PARAMETERS(4)).map(_.toString).getOrElse("")
+    val instants = getArgValueOrDefault(args, 
PARAMETERS(5)).map(_.toString).getOrElse("")
+
     val storageConf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
     val storage = new HoodieHadoopStorage(srcPath, storageConf)
     val metadataConfig = HoodieMetadataConfig.newBuilder.enable(false).build
     val metadata = HoodieTableMetadata.create(new 
HoodieSparkEngineContext(jsc), storage, metadataConfig, srcPath)
     val partitionPaths: java.util.List[String] = 
metadata.getPartitionPathWithPathPrefixes(partitions.split(",").toList.asJava)
-    val partitionPathsSize = if (partitionPaths.size() == 0) 1 else 
partitionPaths.size()
     val instantsList = if (StringUtils.isNullOrEmpty(instants)) 
Array.empty[String] else instants.split(",")
+    val fileStatus = partitionPaths.asScala.flatMap(part => {
+      val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
+      HadoopFSUtils.getAllDataFilesInPartition(fs, 
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+    }).toList
 
-    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, 
partitionPathsSize)
-    val parquetRdd = javaRdd.rdd.map(part => {
-        val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
-        HadoopFSUtils.getAllDataFilesInPartition(fs, 
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, 
part)).filter(fileStatus => {
-          var isFilter = true
-          if (!instantsList.isEmpty) {
-            val parquetCommitTime = 
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
-            isFilter = instantsList.contains(parquetCommitTime)
-          }
-          isFilter
-        })
-    }).flatMap(_.toList)
-      .filter(status => {
+    if (fileStatus.isEmpty) {
+      Seq.empty
+    } else {
+      val parquetRdd = jsc.parallelize(fileStatus, Math.min(fileStatus.size, 
parallelism)).filter(fileStatus => {
+        if (instantsList.nonEmpty) {
+          val parquetCommitTime = 
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
+          instantsList.contains(parquetCommitTime)
+        } else {
+          true
+        }
+      }).filter(status => {
         val filePath = status.getPath
         var isInvalid = false
         if (filePath.toString.endsWith(".parquet")) {
@@ -99,12 +102,13 @@ class ShowInvalidParquetProcedure extends BaseProcedure 
with ProcedureBuilder {
           }
         }
         isInvalid
-      })
-      .map(status => Row(status.getPath.toString))
-    if (limit.isDefined) {
-      parquetRdd.take(limit.get.asInstanceOf[Int])
-    } else {
-      parquetRdd.collect()
+      }).map(status => Row(status.getPath.toString))
+
+      if (limit.isDefined) {
+        parquetRdd.take(limit.get.asInstanceOf[Int])
+      } else {
+        parquetRdd.collect()
+      }
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
index 71945f7c945..d074ab56fcc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
@@ -31,6 +31,7 @@ class TestShowInvalidParquetProcedure extends 
HoodieSparkProcedureTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val customParallelism = 3
       // create table
       spark.sql(
         s"""
@@ -68,13 +69,13 @@ class TestShowInvalidParquetProcedure extends 
HoodieSparkProcedureTestBase {
 
       // collect result for table
       var result = spark.sql(
-        s"""call show_invalid_parquet(path => 
'$basePath')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism)""".stripMargin).collect()
       assertResult(2) {
         result.length
       }
 
       result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', limit => 
1)""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, limit => 1)""".stripMargin).collect()
       assertResult(1) {
         result.length
       }
@@ -85,6 +86,7 @@ class TestShowInvalidParquetProcedure extends 
HoodieSparkProcedureTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val customParallelism = 3
       // create table
       spark.sql(
         s"""
@@ -118,7 +120,7 @@ class TestShowInvalidParquetProcedure extends 
HoodieSparkProcedureTestBase {
 
       // collect result for table
       val result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', needDelete => 
true)""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, needDelete => true)""".stripMargin).collect()
       assertResult(0) {
         result.length
       }
@@ -129,6 +131,7 @@ class TestShowInvalidParquetProcedure extends 
HoodieSparkProcedureTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val customParallelism = 3
       // create table
       spark.sql(
         s"""
@@ -192,42 +195,42 @@ class TestShowInvalidParquetProcedure extends 
HoodieSparkProcedureTestBase {
 
       // collect result for table
       var result = spark.sql(
-        s"""call show_invalid_parquet(path => 
'$basePath')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism)""".stripMargin).collect()
       assertResult(4) {
         result.length
       }
 
       result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', partitions => 
'year=2022')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, partitions => 'year=2022')""".stripMargin).collect()
       assertResult(4) {
         result.length
       }
 
       result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', partitions => 
'year=2022/month=07')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, partitions => 
'year=2022/month=07')""".stripMargin).collect()
       assertResult(2) {
         result.length
       }
 
       result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', partitions => 
'year=2022/month=08/day=30,year=2022/month=08/day=31')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, partitions => 
'year=2022/month=08/day=30,year=2022/month=08/day=31')""".stripMargin).collect()
       assertResult(2) {
         result.length
       }
 
       result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', partitions => 
'year=2023')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, partitions => 'year=2023')""".stripMargin).collect()
       assertResult(0) {
         result.length
       }
 
       result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', instants => 
'$instantTime')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, instants => '$instantTime')""".stripMargin).collect()
       assertResult(4) {
         result.length
       }
       result = spark.sql(
-        s"""call show_invalid_parquet(path => '$basePath', instants => 
'$instantTime', partitions => 'year=2022/month=08')""".stripMargin).collect()
+        s"""call show_invalid_parquet(path => '$basePath', parallelism => 
$customParallelism, instants => '$instantTime', partitions => 
'year=2022/month=08')""".stripMargin).collect()
       assertResult(2) {
         result.length
       }
@@ -235,3 +238,4 @@ class TestShowInvalidParquetProcedure extends 
HoodieSparkProcedureTestBase {
     }
   }
 }
+

Reply via email to