TheR1sing3un commented on code in PR #13206:
URL: https://github.com/apache/hudi/pull/13206#discussion_r2055187233


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala:
##########
@@ -65,42 +64,39 @@ class ShowInvalidParquetProcedure extends BaseProcedure 
with ProcedureBuilder {
     val metadataConfig = HoodieMetadataConfig.newBuilder.enable(false).build
     val metadata = HoodieTableMetadata.create(new 
HoodieSparkEngineContext(jsc), storage, metadataConfig, srcPath)
     val partitionPaths: java.util.List[String] = 
metadata.getPartitionPathWithPathPrefixes(partitions.split(",").toList.asJava)
-    val partitionPathsSize = if (partitionPaths.size() == 0) 1 else 
partitionPaths.size()
     val instantsList = if (StringUtils.isNullOrEmpty(instants)) 
Array.empty[String] else instants.split(",")
+    val fileStatus = partitionPaths.asScala.flatMap(part => {
+      val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
+      HadoopFSUtils.getAllDataFilesInPartition(fs, 
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+    }).toList
 
-    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, 
partitionPathsSize)
-    val parquetRdd = javaRdd.rdd.map(part => {
-        val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
-        HadoopFSUtils.getAllDataFilesInPartition(fs, 
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, 
part)).filter(fileStatus => {
-          var isFilter = true
-          if (!instantsList.isEmpty) {
-            val parquetCommitTime = 
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
-            isFilter = instantsList.contains(parquetCommitTime)
-          }
-          isFilter
-        })
-    }).flatMap(_.toList)
-      .filter(status => {
-        val filePath = status.getPath
-        var isInvalid = false
-        if (filePath.toString.endsWith(".parquet")) {
-          try ParquetFileReader.readFooter(storageConf.unwrap(), filePath, 
SKIP_ROW_GROUPS).getFileMetaData catch {
-            case e: Exception =>
-              isInvalid = e.getMessage.contains("is not a Parquet file")
-              if (isInvalid && needDelete) {
-                val fs = HadoopFSUtils.getFs(new Path(srcPath), 
storageConf.unwrap())
-                try {
-                  isInvalid = !fs.delete(filePath, false)
-                } catch {
-                  case ex: Exception =>
-                    isInvalid = true
-                }
+    val parquetRdd = jsc.parallelize(fileStatus, Math.max(fileStatus.size, 
1)).filter(fileStatus => {

Review Comment:
   How about allowing a custom parallelism to be passed here and then 
aggregating the file status into this parallelism? Using the number of 
partitions as the parallelism leads to too low concurrency and a long running 
time for a single task. But will directly using the number of files as the 
concurrency degree result in too many tasks? In some scenarios, tens of 
thousands of files are possible, but tens of thousands of concurrent degrees 
will put a lot of pressure on the task scheduling of spark. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to