TheR1sing3un commented on code in PR #13206:
URL: https://github.com/apache/hudi/pull/13206#discussion_r2055187233
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala:
##########
@@ -65,42 +64,39 @@ class ShowInvalidParquetProcedure extends BaseProcedure
with ProcedureBuilder {
val metadataConfig = HoodieMetadataConfig.newBuilder.enable(false).build
val metadata = HoodieTableMetadata.create(new
HoodieSparkEngineContext(jsc), storage, metadataConfig, srcPath)
val partitionPaths: java.util.List[String] =
metadata.getPartitionPathWithPathPrefixes(partitions.split(",").toList.asJava)
- val partitionPathsSize = if (partitionPaths.size() == 0) 1 else
partitionPaths.size()
val instantsList = if (StringUtils.isNullOrEmpty(instants))
Array.empty[String] else instants.split(",")
+ val fileStatus = partitionPaths.asScala.flatMap(part => {
+ val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
+ HadoopFSUtils.getAllDataFilesInPartition(fs,
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+ }).toList
- val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths,
partitionPathsSize)
- val parquetRdd = javaRdd.rdd.map(part => {
- val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
- HadoopFSUtils.getAllDataFilesInPartition(fs,
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath,
part)).filter(fileStatus => {
- var isFilter = true
- if (!instantsList.isEmpty) {
- val parquetCommitTime =
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
- isFilter = instantsList.contains(parquetCommitTime)
- }
- isFilter
- })
- }).flatMap(_.toList)
- .filter(status => {
- val filePath = status.getPath
- var isInvalid = false
- if (filePath.toString.endsWith(".parquet")) {
- try ParquetFileReader.readFooter(storageConf.unwrap(), filePath,
SKIP_ROW_GROUPS).getFileMetaData catch {
- case e: Exception =>
- isInvalid = e.getMessage.contains("is not a Parquet file")
- if (isInvalid && needDelete) {
- val fs = HadoopFSUtils.getFs(new Path(srcPath),
storageConf.unwrap())
- try {
- isInvalid = !fs.delete(filePath, false)
- } catch {
- case ex: Exception =>
- isInvalid = true
- }
+ val parquetRdd = jsc.parallelize(fileStatus, Math.max(fileStatus.size,
1)).filter(fileStatus => {
Review Comment:
How about allowing a custom parallelism to be passed in here, and then
aggregating the file statuses into that parallelism? Using the number of
partitions as the parallelism leads to too little concurrency and a long
running time for each task. On the other hand, wouldn't directly using the
number of files as the degree of parallelism result in too many tasks? In
some scenarios there can be tens of thousands of files, and tens of
thousands of concurrent tasks would put a lot of pressure on Spark's task
scheduling.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]