This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d4edee24f6e [HUDI-9334] Optimize Parallelism of show_invalid_parquet (#13206)
d4edee24f6e is described below
commit d4edee24f6e5b06ee416d8024369782a1218aff8
Author: fhan <[email protected]>
AuthorDate: Mon Jun 16 19:10:27 2025 +0800
[HUDI-9334] Optimize Parallelism of show_invalid_parquet (#13206)
* [HUDI-9334] optimize parallelism of show_invalid_parquet
* fix type mismatch
* introduce customParallelism
* remove unused imports
* make the parallelism parameter optional
* fix scalastyle
* fix scalastyle
---------
Co-authored-by: fhan <[email protected]>
---
.../procedures/ShowInvalidParquetProcedure.scala | 62 ++++++++++++----------
.../TestShowInvalidParquetProcedure.scala | 24 +++++----
2 files changed, 47 insertions(+), 39 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
index 51c9ca8269c..e052f5b1fc5 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -29,7 +29,6 @@ import collection.JavaConverters._
import org.apache.hadoop.fs.Path
import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader
-import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
@@ -38,10 +37,11 @@ import java.util.function.Supplier
class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.required(0, "path", DataTypes.StringType),
- ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 100),
- ProcedureParameter.optional(2, "needDelete", DataTypes.BooleanType, false),
- ProcedureParameter.optional(3, "partitions", DataTypes.StringType, ""),
- ProcedureParameter.optional(4, "instants", DataTypes.StringType, "")
+ ProcedureParameter.optional(1, "parallelism", DataTypes.IntegerType, 100),
+ ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 100),
+ ProcedureParameter.optional(3, "needDelete", DataTypes.BooleanType, false),
+ ProcedureParameter.optional(4, "partitions", DataTypes.StringType, ""),
+ ProcedureParameter.optional(5, "instants", DataTypes.StringType, "")
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -56,31 +56,34 @@ class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
super.checkArgs(PARAMETERS, args)
val srcPath = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String]
- val limit = getArgValueOrDefault(args, PARAMETERS(1))
- val needDelete = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Boolean]
- val partitions = getArgValueOrDefault(args,
PARAMETERS(3)).map(_.toString).getOrElse("")
- val instants = getArgValueOrDefault(args,
PARAMETERS(4)).map(_.toString).getOrElse("")
+ val parallelism = getArgValueOrDefault(args,
PARAMETERS(1)).get.asInstanceOf[Int]
+ val limit = getArgValueOrDefault(args, PARAMETERS(2))
+ val needDelete = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val partitions = getArgValueOrDefault(args,
PARAMETERS(4)).map(_.toString).getOrElse("")
+ val instants = getArgValueOrDefault(args,
PARAMETERS(5)).map(_.toString).getOrElse("")
+
val storageConf =
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
val storage = new HoodieHadoopStorage(srcPath, storageConf)
val metadataConfig = HoodieMetadataConfig.newBuilder.enable(false).build
val metadata = HoodieTableMetadata.create(new
HoodieSparkEngineContext(jsc), storage, metadataConfig, srcPath)
val partitionPaths: java.util.List[String] =
metadata.getPartitionPathWithPathPrefixes(partitions.split(",").toList.asJava)
- val partitionPathsSize = if (partitionPaths.size() == 0) 1 else
partitionPaths.size()
val instantsList = if (StringUtils.isNullOrEmpty(instants))
Array.empty[String] else instants.split(",")
+ val fileStatus = partitionPaths.asScala.flatMap(part => {
+ val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
+ HadoopFSUtils.getAllDataFilesInPartition(fs,
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath, part))
+ }).toList
- val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths,
partitionPathsSize)
- val parquetRdd = javaRdd.rdd.map(part => {
- val fs = HadoopFSUtils.getFs(new Path(srcPath), storageConf.unwrap())
- HadoopFSUtils.getAllDataFilesInPartition(fs,
HadoopFSUtils.constructAbsolutePathInHadoopPath(srcPath,
part)).filter(fileStatus => {
- var isFilter = true
- if (!instantsList.isEmpty) {
- val parquetCommitTime =
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
- isFilter = instantsList.contains(parquetCommitTime)
- }
- isFilter
- })
- }).flatMap(_.toList)
- .filter(status => {
+ if (fileStatus.isEmpty) {
+ Seq.empty
+ } else {
+ val parquetRdd = jsc.parallelize(fileStatus, Math.min(fileStatus.size,
parallelism)).filter(fileStatus => {
+ if (instantsList.nonEmpty) {
+ val parquetCommitTime =
FSUtils.getCommitTimeWithFullPath(fileStatus.getPath.toString)
+ instantsList.contains(parquetCommitTime)
+ } else {
+ true
+ }
+ }).filter(status => {
val filePath = status.getPath
var isInvalid = false
if (filePath.toString.endsWith(".parquet")) {
@@ -99,12 +102,13 @@ class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
}
}
isInvalid
- })
- .map(status => Row(status.getPath.toString))
- if (limit.isDefined) {
- parquetRdd.take(limit.get.asInstanceOf[Int])
- } else {
- parquetRdd.collect()
+ }).map(status => Row(status.getPath.toString))
+
+ if (limit.isDefined) {
+ parquetRdd.take(limit.get.asInstanceOf[Int])
+ } else {
+ parquetRdd.collect()
+ }
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
index 71945f7c945..d074ab56fcc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
@@ -31,6 +31,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ val customParallelism = 3
// create table
spark.sql(
s"""
@@ -68,13 +69,13 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
// collect result for table
var result = spark.sql(
- s"""call show_invalid_parquet(path =>
'$basePath')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism)""".stripMargin).collect()
assertResult(2) {
result.length
}
result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', limit =>
1)""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, limit => 1)""".stripMargin).collect()
assertResult(1) {
result.length
}
@@ -85,6 +86,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ val customParallelism = 3
// create table
spark.sql(
s"""
@@ -118,7 +120,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
// collect result for table
val result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', needDelete =>
true)""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, needDelete => true)""".stripMargin).collect()
assertResult(0) {
result.length
}
@@ -129,6 +131,7 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ val customParallelism = 3
// create table
spark.sql(
s"""
@@ -192,42 +195,42 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
// collect result for table
var result = spark.sql(
- s"""call show_invalid_parquet(path =>
'$basePath')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism)""".stripMargin).collect()
assertResult(4) {
result.length
}
result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', partitions =>
'year=2022')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, partitions => 'year=2022')""".stripMargin).collect()
assertResult(4) {
result.length
}
result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', partitions =>
'year=2022/month=07')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, partitions =>
'year=2022/month=07')""".stripMargin).collect()
assertResult(2) {
result.length
}
result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', partitions =>
'year=2022/month=08/day=30,year=2022/month=08/day=31')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, partitions =>
'year=2022/month=08/day=30,year=2022/month=08/day=31')""".stripMargin).collect()
assertResult(2) {
result.length
}
result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', partitions =>
'year=2023')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, partitions => 'year=2023')""".stripMargin).collect()
assertResult(0) {
result.length
}
result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', instants =>
'$instantTime')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, instants => '$instantTime')""".stripMargin).collect()
assertResult(4) {
result.length
}
result = spark.sql(
- s"""call show_invalid_parquet(path => '$basePath', instants =>
'$instantTime', partitions => 'year=2022/month=08')""".stripMargin).collect()
+ s"""call show_invalid_parquet(path => '$basePath', parallelism =>
$customParallelism, instants => '$instantTime', partitions =>
'year=2022/month=08')""".stripMargin).collect()
assertResult(2) {
result.length
}
@@ -235,3 +238,4 @@ class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
}
}
}
+