This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7320ecd7d5 [HUDI-8214] Support specifying partitions with regex for 
compaction (#11958)
b7320ecd7d5 is described below

commit b7320ecd7d531e41f10d110592c31133c57b02e6
Author: TheR1sing3un <[email protected]>
AuthorDate: Fri Sep 20 11:59:07 2024 +0800

    [HUDI-8214] Support specifying partitions with regex for compaction (#11958)
---
 .../apache/hudi/config/HoodieCompactionConfig.java | 12 ++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../BaseHoodieCompactionPlanGenerator.java         |  4 ++
 .../PartitionRegexBasedCompactionStrategy.java     | 35 ++++++++++++
 .../table/action/compact/TestHoodieCompactor.java  | 65 ++++++++++++++++++++++
 .../strategy/TestHoodieCompactionStrategy.java     | 56 +++++++++++++++++++
 6 files changed, 176 insertions(+)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index f50603343d9..497945f7e9b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -155,6 +155,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Used by 
org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the 
number of "
           + "latest partitions to compact during a compaction run.");
 
+  public static final ConfigProperty<String> 
COMPACTION_SPECIFY_PARTITION_PATH_REGEX = ConfigProperty
+      .key("hoodie.compaction.partition.path.regex")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("Used to specify the partition path regex for 
compaction. "
+          + "Only partitions that match the regex will be compacted. Only be 
used when configure PartitionRegexBasedCompactionStrategy.");
+
   /**
    * Configs related to specific table types.
    */
@@ -461,6 +468,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withCompactionSpecifyPartitionPathRegex(String 
partitionPathRegex) {
+      compactionConfig.setValue(COMPACTION_SPECIFY_PARTITION_PATH_REGEX, 
partitionPathRegex);
+      return this;
+    }
+
     public HoodieCompactionConfig build() {
       compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
       compactionConfig.setDefaults(HoodieReaderConfig.class.getName());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 15c16078b0f..19b3aa09082 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1736,6 +1736,10 @@ public class HoodieWriteConfig extends HoodieConfig {
         .valueOf(getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY));
   }
 
+  public String getCompactionSpecifyPartitionPathRegex() {
+    return 
getString(HoodieCompactionConfig.COMPACTION_SPECIFY_PARTITION_PATH_REGEX);
+  }
+
   /**
    * Clustering properties.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index e5ac5af9f64..f7390cdd56f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -85,8 +85,12 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     List<String> partitionPaths = FSUtils.getAllPartitionPaths(
         engineContext, metaClient.getStorage(), 
writeConfig.getMetadataConfig(), metaClient.getBasePath());
 
+    int allPartitionSize = partitionPaths.size();
+
     // filter the partition paths if needed to reduce list status
     partitionPaths = filterPartitionPathsByStrategy(writeConfig, 
partitionPaths);
+    LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
+        writeConfig.getCompactionStrategy().getClass().getSimpleName(), 
partitionPaths.size(), allPartitionSize);
     if (partitionPaths.isEmpty()) {
       // In case no partitions could be picked, return no compaction plan
       return null;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
new file mode 100644
index 00000000000..eb3b42ee7ca
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class PartitionRegexBasedCompactionStrategy extends CompactionStrategy {
+
+  @Override
+  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
+    String regex = writeConfig.getCompactionSpecifyPartitionPathRegex();
+    Pattern pattern = Pattern.compile(regex);
+    return 
allPartitionPaths.stream().filter(pattern.asPredicate()).collect(Collectors.toList());
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 167a00babe6..7cbdf4bbf7f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -46,6 +46,7 @@ import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+import 
org.apache.hudi.table.action.compact.strategy.PartitionRegexBasedCompactionStrategy;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
 import com.codahale.metrics.Counter;
@@ -53,11 +54,17 @@ import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -278,6 +285,64 @@ public class TestHoodieCompactor extends 
HoodieSparkClientTestHarness {
     }
   }
 
+  private static Stream<Arguments> regexTestParameters() {
+    Object[][] data = new Object[][] {
+        {
+          ".*", Arrays.asList("2015/03/16", "2015/03/17", "2016/03/15")
+        },
+        {
+          "2017/.*/.*", Collections.emptyList()
+        },
+        {
+          "2015/03/.*", Arrays.asList("2015/03/16", "2015/03/17")
+        },
+        {
+          "2016/.*/.*", Arrays.asList("2016/03/15")
+        }
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  /**
+   * Verifies that {@code PartitionRegexBasedCompactionStrategy} only compacts partitions whose
+   * path matches {@code regex}: inserts are written at commit 100, updated once at commit 101,
+   * and compaction is then scheduled and executed at instant 102.
+   *
+   * @param regex                      partition-path regex to configure
+   * @param expectedCompactedPartition partitions expected in the compaction write stats; empty
+   *                                   means no compaction plan should be scheduled
+   */
+  @ParameterizedTest
+  @MethodSource("regexTestParameters")
+  public void testCompactionSpecifyPartition(String regex, List<String> 
expectedCompactedPartition) throws Exception {
+    // Presumably withMaxNumDeltaCommitsBeforeCompaction(1) makes a single delta commit enough
+    // for compaction to be scheduled — confirm.
+    HoodieCompactionConfig.Builder builder = 
HoodieCompactionConfig.newBuilder()
+        .withCompactionStrategy(new 
PartitionRegexBasedCompactionStrategy()).withMaxNumDeltaCommitsBeforeCompaction(1);
+    builder.withCompactionSpecifyPartitionPathRegex(regex);
+    HoodieWriteConfig config = getConfigBuilder()
+        .withCompactionConfig(builder.build())
+        .withMetricsConfig(getMetricsConfig()).build();
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      String newCommitTime = "100";
+      writeClient.startCommitWithTime(newCommitTime);
+
+      // NOTE(review): the expected partition lists assume these 10 inserts land in every dataGen
+      // partition; if partitions are assigned randomly, one could receive no records and the
+      // size assertion below would be flaky — confirm against the test data generator.
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+      JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+      writeClient.insert(recordsRDD, newCommitTime).collect();
+
+      // update once (commit 101); the following assertion presumably checks that the update
+      // produced log files for compaction to merge
+      newCommitTime = "101";
+      updateRecords(config, newCommitTime, records);
+      assertLogFilesNumEqualsTo(config, 1);
+
+      // schedule compaction
+      boolean scheduled = writeClient.scheduleCompactionAtInstant("102", 
Option.empty());
+      // With no matching partition no compaction plan is expected, so scheduling must report false.
+      if (expectedCompactedPartition.isEmpty()) {
+        assertFalse(scheduled);
+        return;
+      }
+
+      HoodieWriteMetadata result = compact(writeClient, "102");
+
+      // The stat count must equal the number of expected partitions (presumably one file group
+      // per partition — confirm), and each expected partition must appear among the stats.
+      assertTrue(result.getWriteStats().isPresent());
+      List<HoodieWriteStat> stats = (List<HoodieWriteStat>) 
result.getWriteStats().get();
+      assertEquals(expectedCompactedPartition.size(), stats.size());
+      expectedCompactedPartition.forEach(expectedPartition -> {
+        assertTrue(stats.stream().anyMatch(stat -> 
stat.getPartitionPath().contentEquals(expectedPartition)));
+      });
+    }
+  }
+
   @Override
   protected HoodieTableType getTableType() {
     return HoodieTableType.MERGE_ON_READ;
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index 83f8101ad70..a67f51face4 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -335,6 +335,62 @@ public class TestHoodieCompactionStrategy {
 
   }
 
+  @Test
+  public void testPartitionRegexBasedCompactionStrategy() {
+    List<String> partitions = Arrays.asList(
+        "2020/01/01",
+        "2020/01/02",
+        "2020/01/03",
+        "2020/02/01",
+        "2021/01/01"
+    );
+
+    HoodieWriteConfig writeConfig = updateRegex(".*");
+    List<String> filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    assertEquals(5, filteredPartitions.size());
+
+    writeConfig = updateRegex("2020/01/01");
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    assertEquals(1, filteredPartitions.size());
+    assertEquals("2020/01/01", filteredPartitions.get(0));
+
+    writeConfig = updateRegex("2020/01/0[1-2]");
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    assertEquals(2, filteredPartitions.size());
+    assertEquals("2020/01/01", filteredPartitions.get(0));
+    assertEquals("2020/01/02", filteredPartitions.get(1));
+    writeConfig = updateRegex("2020/01/0[1-2]|2020/02/01");
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    assertEquals(3, filteredPartitions.size());
+    assertEquals("2020/01/01", filteredPartitions.get(0));
+    assertEquals("2020/01/02", filteredPartitions.get(1));
+    assertEquals("2020/02/01", filteredPartitions.get(2));
+
+
+    writeConfig = updateRegex("2020/.*/01");
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    assertEquals(2, filteredPartitions.size());
+    assertEquals("2020/01/01", filteredPartitions.get(0));
+    assertEquals("2020/02/01", filteredPartitions.get(1));
+
+    writeConfig = updateRegex(".*/01/.*");
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    assertEquals(4, filteredPartitions.size());
+    assertEquals("2020/01/01", filteredPartitions.get(0));
+    assertEquals("2020/01/02", filteredPartitions.get(1));
+    assertEquals("2020/01/03", filteredPartitions.get(2));
+    assertEquals("2021/01/01", filteredPartitions.get(3));
+
+  }
+
+  private HoodieWriteConfig updateRegex(String regex) {
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+        HoodieCompactionConfig.newBuilder()
+            .withCompactionStrategy(new 
PartitionRegexBasedCompactionStrategy())
+            .withCompactionSpecifyPartitionPathRegex(regex).build()).build();
+    return writeConfig;
+  }
+
   private List<HoodieCompactionOperation> 
createCompactionOperations(HoodieWriteConfig config,
                                                                      Map<Long, 
List<Long>> sizesMap) {
     Map<Long, String> keyToPartitionMap = sizesMap.keySet().stream()

Reply via email to