fhan688 commented on code in PR #13206:
URL: https://github.com/apache/hudi/pull/13206#discussion_r2055797912


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala:
##########
@@ -65,42 +64,39 @@ class ShowInvalidParquetProcedure extends BaseProcedure 
with ProcedureBuilder {
     val metadataConfig = HoodieMetadataConfig.newBuilder.enable(false).build
     val metadata = HoodieTableMetadata.create(new 
HoodieSparkEngineContext(jsc), storage, metadataConfig, srcPath)
     val partitionPaths: java.util.List[String] = 
metadata.getPartitionPathWithPathPrefixes(partitions.split(",").toList.asJava)
-    val partitionPathsSize = if (partitionPaths.size() == 0) 1 else 
partitionPaths.size()
     val instantsList = if (StringUtils.isNullOrEmpty(instants)) 
Array.empty[String] else instants.split(",")
+    val fileStatus = partitionPaths.asScala.flatMap(part => {
+      val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
+      HadoopFSUtils.getAllDataFilesInPartition(fs, 
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+    }).toList
 
-    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, 
partitionPathsSize)
-    val parquetRdd = javaRdd.rdd.map(part => {
-        val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
-        HadoopFSUtils.getAllDataFilesInPartition(fs, 
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, 
part)).filter(fileStatus => {
-          var isFilter = true
-          if (!instantsList.isEmpty) {
-            val parquetCommitTime = 
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
-            isFilter = instantsList.contains(parquetCommitTime)
-          }
-          isFilter
-        })
-    }).flatMap(_.toList)
-      .filter(status => {
-        val filePath = status.getPath
-        var isInvalid = false
-        if (filePath.toString.endsWith(".parquet")) {
-          try ParquetFileReader.readFooter(storageConf.unwrap(), filePath, 
SKIP_ROW_GROUPS).getFileMetaData catch {
-            case e: Exception =>
-              isInvalid = e.getMessage.contains("is not a Parquet file")
-              if (isInvalid && needDelete) {
-                val fs = HadoopFSUtils.getFs(new Path(srcPath), 
storageConf.unwrap())
-                try {
-                  isInvalid = !fs.delete(filePath, false)
-                } catch {
-                  case ex: Exception =>
-                    isInvalid = true
-                }
+    val parquetRdd = jsc.parallelize(fileStatus, Math.max(fileStatus.size, 
1)).filter(fileStatus => {

Review Comment:
   Thanks for the advice — I'll introduce a configurable parallelism setting to avoid creating too many tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to