fhan688 commented on code in PR #13206:
URL: https://github.com/apache/hudi/pull/13206#discussion_r2055797912
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala:
##########
@@ -65,42 +64,39 @@ class ShowInvalidParquetProcedure extends BaseProcedure
with ProcedureBuilder {
val metadataConfig = HoodieMetadataConfig.newBuilder.enable(false).build
val metadata = HoodieTableMetadata.create(new
HoodieSparkEngineContext(jsc), storage, metadataConfig, srcPath)
val partitionPaths: java.util.List[String] =
metadata.getPartitionPathWithPathPrefixes(partitions.split(",").toList.asJava)
- val partitionPathsSize = if (partitionPaths.size() == 0) 1 else
partitionPaths.size()
val instantsList = if (StringUtils.isNullOrEmpty(instants))
Array.empty[String] else instants.split(",")
+ val fileStatus = partitionPaths.asScala.flatMap(part => {
+ val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
+ HadoopFSUtils.getAllDataFilesInPartition(fs,
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+ }).toList
- val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths,
partitionPathsSize)
- val parquetRdd = javaRdd.rdd.map(part => {
- val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
- HadoopFSUtils.getAllDataFilesInPartition(fs,
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath,
part)).filter(fileStatus => {
- var isFilter = true
- if (!instantsList.isEmpty) {
- val parquetCommitTime =
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
- isFilter = instantsList.contains(parquetCommitTime)
- }
- isFilter
- })
- }).flatMap(_.toList)
- .filter(status => {
- val filePath = status.getPath
- var isInvalid = false
- if (filePath.toString.endsWith(".parquet")) {
- try ParquetFileReader.readFooter(storageConf.unwrap(), filePath,
SKIP_ROW_GROUPS).getFileMetaData catch {
- case e: Exception =>
- isInvalid = e.getMessage.contains("is not a Parquet file")
- if (isInvalid && needDelete) {
- val fs = HadoopFSUtils.getFs(new Path(srcPath),
storageConf.unwrap())
- try {
- isInvalid = !fs.delete(filePath, false)
- } catch {
- case ex: Exception =>
- isInvalid = true
- }
+ val parquetRdd = jsc.parallelize(fileStatus, Math.max(fileStatus.size,
1)).filter(fileStatus => {
Review Comment:
Thanks for the advice — I'll introduce a configurable parallelism setting to
avoid spawning too many tasks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]