This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0bbfc0754b0 [HUDI-7185] Fix call show_fsview_all failure error due to not specify partition path (#10257)
0bbfc0754b0 is described below
commit 0bbfc0754b051490450b9484b69e2bc708ec475b
Author: empcl <[email protected]>
AuthorDate: Thu Dec 7 12:45:25 2023 +0800
[HUDI-7185] Fix call show_fsview_all failure error due to not specify partition path (#10257)
Co-authored-by: chenlei677 <[email protected]>
---
.../java/org/apache/hudi/common/fs/FSUtils.java | 14 ++++
.../procedures/ShowFileSystemViewProcedure.scala | 11 ++-
.../sql/hudi/procedure/TestFsViewProcedure.scala | 93 ++++++++++++++++++++++
3 files changed, 115 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 770c811a2e8..4e9c7d205cc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -832,6 +832,20 @@ public class FSUtils {
return result;
}
+ public static List<FileStatus> getAllDataFileStatus(FileSystem fs, Path path) throws IOException {
+ List<FileStatus> statuses = new ArrayList<>();
+ for (FileStatus status : fs.listStatus(path)) {
+ if (!status.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) {
+ if (status.isDirectory()) {
+ statuses.addAll(getAllDataFileStatus(fs, status.getPath()));
+ } else {
+ statuses.add(status);
+ }
+ }
+ }
+ return statuses;
+ }
+
public static Map<String, Boolean> deleteFilesParallelize(
HoodieTableMetaClient metaClient,
List<String> paths,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
index 4a7ad91b80c..2f98dc4dd9d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileSystemViewProcedure.scala
@@ -18,12 +18,13 @@
package org.apache.spark.sql.hudi.command.procedures
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.fs.{FSUtils, HoodieWrapperFileSystem}
import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{CompletionTimeQueryView, HoodieDefaultTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util
+import org.apache.hudi.common.util.StringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
@@ -92,8 +93,12 @@ class ShowFileSystemViewProcedure(showLatest: Boolean) extends BaseProcedure wit
val basePath = getBasePath(table)
val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
val fs = metaClient.getFs
- val globPath = String.format("%s/%s/*", basePath, globRegex)
- val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
+ val statuses = if (globRegex == PARAMETERS_ALL.apply(6).default) {
+ FSUtils.getAllDataFileStatus(fs, new Path(basePath))
+ } else {
+ val globPath = String.format("%s/%s/*", basePath, globRegex)
+ FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
+ }
var timeline: HoodieTimeline = if (excludeCompaction) {
metaClient.getActiveTimeline.getCommitsTimeline
} else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
index 64da833b9dc..9de1f1b0ee8 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestFsViewProcedure.scala
@@ -51,9 +51,102 @@ class TestFsViewProcedure extends HoodieSparkProcedureTestBase {
assertResult(2) {
result.length
}
+
+ // not specify partition
+ val result1 = spark.sql(
+ s"""call show_fsview_all(table => '$tableName')""".stripMargin).collect()
+ assertResult(2){
+ result1.length
+ }
+ }
+ }
+
+ test("Test Call show_fsview_all Procedure For NonPartition") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+ // Check required fields
+ checkExceptionContain(s"""call show_fsview_all(limit => 10)""")(
+ s"Argument: table is required")
+
+ // collect result for table
+ val result = spark.sql(
+ s"""call show_fsview_all(table => '$tableName', limit => 10)""".stripMargin).collect()
+ assertResult(2) {
+ result.length
+ }
}
}
+ test("Test Call show_fsview_all Procedure For Three-Level Partition") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | f1 string,
+ | f2 string,
+ | ts long
+ |) using hudi
+ | partitioned by(f1, f2, ts)
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+ // insert data to table
+ spark.sql(s"insert into $tableName select 1, 'a1', 10, 'f11', 'f21',1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20, 'f12', 'f22', 1500")
+
+ // Check required fields
+ checkExceptionContain(s"""call show_fsview_all(limit => 10)""")(
+ s"Argument: table is required")
+
+ // not specify partition
+ val result = spark.sql(
+ s"""call show_fsview_all(table => '$tableName', limit => 10)""".stripMargin).collect()
+ assertResult(2) {
+ result.length
+ }
+
+ val result1 = spark.sql(
+ s"""call show_fsview_all(table => '$tableName', path_regex => '*/*/*/')""".stripMargin).collect()
+ assertResult(2){
+ result1.length
+ }
+
+ val result2 = spark.sql(
+ s"""call show_fsview_all(table => '$tableName', path_regex => 'f1=f11/*/*/')""".stripMargin).collect()
+ assertResult(1) {
+ result2.length
+ }
+ }
+ }
+
+
test("Test Call show_fsview_latest Procedure") {
withTempDir { tmp =>
val tableName = generateTableName