This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6de923cfdfd [HUDI-5318] Fix partition pruning for clustering scheduling (#7366)
6de923cfdfd is described below

commit 6de923cfdfdfcc4d265e3af5e12749295c29bb1c
Author: StreamingFlames <18889897...@163.com>
AuthorDate: Wed Dec 14 09:10:50 2022 +0800

    [HUDI-5318] Fix partition pruning for clustering scheduling (#7366)
    
    Co-authored-by: Nicholas Jiang <programg...@163.com>
---
 .../PartitionAwareClusteringPlanStrategy.java      | 24 ++++----
 .../TestPartitionAwareClusteringPlanStrategy.java  |  2 +-
 .../hudi/procedure/TestClusteringProcedure.scala   | 66 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 7042585f59b..e12d6d27aa2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -75,11 +75,18 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
     HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
     LOG.info("Scheduling clustering for " + metaClient.getBasePath());
     HoodieWriteConfig config = getWriteConfig();
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath());
 
-    // get matched partitions if set
-    partitionPaths = getMatchedPartitions(config, partitionPaths);
-    // filter the partition paths if needed to reduce list status
+    String partitionSelected = config.getClusteringPartitionSelected();
+    List<String> partitionPaths;
+
+    if (StringUtils.isNullOrEmpty(partitionSelected)) {
+      // get matched partitions if set
+      partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
+      // filter the partition paths if needed to reduce list status
+    } else {
+      partitionPaths = Arrays.asList(partitionSelected.split(","));
+    }
+
     partitionPaths = filterPartitionPaths(partitionPaths);
 
     if (partitionPaths.isEmpty()) {
@@ -118,15 +125,6 @@ public abstract class PartitionAwareClusteringPlanStrategy<T extends HoodieRecor
         .build());
   }
 
-  public List<String> getMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
-    String partitionSelected = config.getClusteringPartitionSelected();
-    if (!StringUtils.isNullOrEmpty(partitionSelected)) {
-      return Arrays.asList(partitionSelected.split(","));
-    } else {
-      return getRegexPatternMatchedPartitions(config, partitionPaths);
-    }
-  }
-
   public List<String> getRegexPatternMatchedPartitions(HoodieWriteConfig config, List<String> partitionPaths) {
     String pattern = config.getClusteringPartitionFilterRegexPattern();
     if (!StringUtils.isNullOrEmpty(pattern)) {
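
The net effect of this hunk, sketched standalone for clarity: an explicit comma-separated partition selection now short-circuits the full partition listing, whereas the old getMatchedPartitions ran only after FSUtils.getAllPartitionPaths had already listed every partition. A minimal Scala sketch of the new control flow (the helper below is illustrative, not Hudi API; it assumes only that the selection string comes from config.getClusteringPartitionSelected()):

    // Illustrative sketch, not Hudi API: mirrors the if/else added above.
    // When an explicit selection is configured, the expensive full listing
    // (FSUtils.getAllPartitionPaths in the real code) never runs.
    def resolvePartitions(partitionSelected: String,
                          listAllPartitions: () => Seq[String]): Seq[String] = {
      if (partitionSelected == null || partitionSelected.isEmpty) {
        // No explicit selection: list all partitions, then regex-filter
        // (filtering omitted in this sketch).
        listAllPartitions()
      } else {
        // Explicit selection: parse the comma-separated list; no listing.
        partitionSelected.split(",").toSeq
      }
    }

    // The listing thunk is never invoked when partitions are selected:
    assert(resolvePartitions("ts=1010,ts=1011",
      () => sys.error("listing must be skipped")) == Seq("ts=1010", "ts=1011"))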
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index 440bc956153..a053a961105 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -71,7 +71,7 @@ public class TestPartitionAwareClusteringPlanStrategy {
     fakeTimeBasedPartitionsPath.add("20210719");
     fakeTimeBasedPartitionsPath.add("20210721");
 
-    List list = strategyTestRegexPattern.getMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
+    List list = strategyTestRegexPattern.getRegexPatternMatchedPartitions(hoodieWriteConfig, fakeTimeBasedPartitionsPath);
     assertEquals(2, list.size());
     assertTrue(list.contains("20210721"));
     assertTrue(list.contains("20210723"));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index cc61db4a03d..fa82e419f7b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant, HoodieTimeline}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.config.HoodieClusteringConfig
 import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceHelpers, HoodieFileIndex}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
@@ -602,6 +603,71 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call run_clustering with partition selected config") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$basePath'
+     """.stripMargin)
+
+      // Test clustering with PARTITION_SELECTED config set, choose only a part of all partitions to schedule
+      {
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1010)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1010)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1011)")
+        spark.sql(s"set 
${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010")
+        // Do
+        val result = spark.sql(s"call run_clustering(table => '$tableName', 
show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(result.length)
+        assertResult("ts=1010")(result(0)(3))
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010),
+          Seq(2, "a2", 10.0, 1010),
+          Seq(3, "a3", 10.0, 1011)
+        )
+      }
+
+      // Test clustering with PARTITION_SELECTED config set, choose all partitions to schedule
+      {
+        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1010)")
+        spark.sql(s"insert into $tableName values(5, 'a5', 10, 1011)")
+        spark.sql(s"insert into $tableName values(6, 'a6', 10, 1012)")
+        spark.sql(s"set 
${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011,ts=1012")
+        val result = spark.sql(s"call run_clustering(table => '$tableName', show_involved_partition => true)")
+          .collect()
+          .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2), row.getString(3)))
+        assertResult(1)(result.length)
+        assertResult("ts=1010,ts=1011,ts=1012")(result(0)(3))
+
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1010),
+          Seq(2, "a2", 10.0, 1010),
+          Seq(3, "a3", 10.0, 1011),
+          Seq(4, "a4", 10.0, 1010),
+          Seq(5, "a5", 10.0, 1011),
+          Seq(6, "a6", 10.0, 1012)
+        )
+      }
+    }
+  }
+
   def avgRecord(commitTimeline: HoodieTimeline): Long = {
     var totalByteSize = 0L
     var totalRecordsCount = 0L

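For reference, end-user usage mirroring the new test, as a hedged sketch assuming a Spark session and a Hudi table like the one the test creates (table name and partition values are placeholders; the config is referenced through HoodieClusteringConfig.PARTITION_SELECTED.key() exactly as the test does):

    import org.apache.hudi.config.HoodieClusteringConfig

    // Restrict clustering scheduling to the listed partitions, then run it
    // via the run_clustering procedure, showing the involved partitions.
    spark.sql(s"set ${HoodieClusteringConfig.PARTITION_SELECTED.key()}=ts=1010,ts=1011")
    spark.sql("call run_clustering(table => 'my_table', show_involved_partition => true)").show(false)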