This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6de923cfdfd [HUDI-5318] Fix partition pruning for clustering scheduling (#7366)
6de923cfdfd is described below
commit 6de923cfdfdfcc4d265e3af5e12749295c29bb1c
Author: StreamingFlames <[email protected]>
AuthorDate: Wed Dec 14 09:10:50 2022 +0800
[HUDI-5318] Fix partition pruning for clustering scheduling (#7366)
Co-authored-by: Nicholas Jiang <[email protected]>
---
.../PartitionAwareClusteringPlanStrategy.java | 24 ++++----
.../TestPartitionAwareClusteringPlanStrategy.java | 2 +-
.../hudi/procedure/TestClusteringProcedure.scala | 66 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 14 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 7042585f59b..e12d6d27aa2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -75,11 +75,18 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
LOG.info("Scheduling clustering for " + metaClient.getBasePath());
HoodieWriteConfig config = getWriteConfig();
- List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());
- // get matched partitions if set
- partitionPaths = getMatchedPartitions(config, partitionPaths);
- // filter the partition paths if needed to reduce list status
+ String partitionSelected = config.getClusteringPartitionSelected();
+ List<String> partitionPaths;
+
+ if (StringUtils.isNullOrEmpty(partitionSelected)) {
+ // get matched partitions if set
+ partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
+ // filter the partition paths if needed to reduce list status
+ } else {
+ partitionPaths = Arrays.asList(partitionSelected.split(","));
+ }
+
partitionPaths = filterPartitionPaths(partitionPaths);
if (partitionPaths.isEmpty()) {
@@ -118,15 +125,6 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
.build());
}
- public List<String> getMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
- String partitionSelected = config.getClusteringPartitionSelected();
- if (!StringUtils.isNullOrEmpty(partitionSelected)) {
- return Arrays.asList(partitionSelected.split(","));
- } else {
- return getRegexPatternMatchedPartitions(config, partitionPaths);
- }
- }
-
public List<String> getRegexPatternMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
String pattern = config.getClusteringPartitionFilterRegexPattern();
if (!StringUtils.isNullOrEmpty(pattern)) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index 440bc956153..a053a961105 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -71,7 +71,7 @@ public class TestPartitionAwareClusteringPlanStrategy {
fakeTimeBasedPartitionsPath.add("20210719");
fakeTimeBasedPartitionsPath.add("20210721");
- List list = strategyTestRegexPattern.getMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
+ List list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
assertEquals(2, list.size());
assertTrue(list.contains("20210721"));
assertTrue(list.contains("20210723"));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index cc61db4a03d..fa82e419f7b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
@@ -602,6 +603,71 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
}
}
+ test("Test Call run_clustering with partition selected config") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = 'cow',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(ts)
+ | location '$basePath'
+ """.stripMargin)
+
+ // Test clustering with PARTITION_SELECTED config set, choose only a part of all partitions to schedule
+ {
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
+ spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
+ // Do
+ val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+ .collect()
+ .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+ assertResult(1)(result.length)
+ assertResult("ts=1010")(result(0)(3))
+
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1010),
+ Seq(2, "a2", 10.0, 1010),
+ Seq(3, "a3", 10.0, 1011)
+ )
+ }
+
+ // Test clustering with PARTITION_SELECTED config set, choose all partitions to schedule
+ {
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
+ spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
+ spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
+ spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
+ val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+ .collect()
+ .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+ assertResult(1)(result.length)
+ assertResult("ts=1010,ts=1011,ts=1012")(result(0)(3))
+
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1010),
+ Seq(2, "a2", 10.0, 1010),
+ Seq(3, "a3", 10.0, 1011),
+ Seq(4, "a4", 10.0, 1010),
+ Seq(5, "a5", 10.0, 1011),
+ Seq(6, "a6", 10.0, 1012)
+ )
+ }
+ }
+ }
+
def avgRecord(commitTimeline: HoodieTimeline): Long = {
var totalByteSize = 0L
var totalRecordsCount = 0L