This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b7320ecd7d5 [HUDI-8214] Support specifying partitions with regex for
compaction (#11958)
b7320ecd7d5 is described below
commit b7320ecd7d531e41f10d110592c31133c57b02e6
Author: TheR1sing3un <[email protected]>
AuthorDate: Fri Sep 20 11:59:07 2024 +0800
[HUDI-8214] Support specifying partitions with regex for compaction (#11958)
---
.../apache/hudi/config/HoodieCompactionConfig.java | 12 ++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 ++
.../BaseHoodieCompactionPlanGenerator.java | 4 ++
.../PartitionRegexBasedCompactionStrategy.java | 35 ++++++++++++
.../table/action/compact/TestHoodieCompactor.java | 65 ++++++++++++++++++++++
.../strategy/TestHoodieCompactionStrategy.java | 56 +++++++++++++++++++
6 files changed, 176 insertions(+)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index f50603343d9..497945f7e9b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -155,6 +155,13 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Used by
org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the
number of "
+ "latest partitions to compact during a compaction run.");
+ public static final ConfigProperty<String>
COMPACTION_SPECIFY_PARTITION_PATH_REGEX = ConfigProperty
+ .key("hoodie.compaction.partition.path.regex")
+ .noDefaultValue()
+ .markAdvanced()
+ .withDocumentation("Used to specify the partition path regex for
compaction. "
+ + "Only partitions that match the regex will be compacted. Only be
used when configure PartitionRegexBasedCompactionStrategy.");
+
/**
* Configs related to specific table types.
*/
@@ -461,6 +468,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
+ public Builder withCompactionSpecifyPartitionPathRegex(String
partitionPathRegex) {
+ compactionConfig.setValue(COMPACTION_SPECIFY_PARTITION_PATH_REGEX,
partitionPathRegex);
+ return this;
+ }
+
public HoodieCompactionConfig build() {
compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
compactionConfig.setDefaults(HoodieReaderConfig.class.getName());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 15c16078b0f..19b3aa09082 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1736,6 +1736,10 @@ public class HoodieWriteConfig extends HoodieConfig {
.valueOf(getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY));
}
+ public String getCompactionSpecifyPartitionPathRegex() {
+ return
getString(HoodieCompactionConfig.COMPACTION_SPECIFY_PARTITION_PATH_REGEX);
+ }
+
/**
* Clustering properties.
*/
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index e5ac5af9f64..f7390cdd56f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -85,8 +85,12 @@ public abstract class BaseHoodieCompactionPlanGenerator<T extends HoodieRecordPa
List<String> partitionPaths = FSUtils.getAllPartitionPaths(
engineContext, metaClient.getStorage(),
writeConfig.getMetadataConfig(), metaClient.getBasePath());
+ int allPartitionSize = partitionPaths.size();
+
// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPathsByStrategy(writeConfig,
partitionPaths);
+ LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
+ writeConfig.getCompactionStrategy().getClass().getSimpleName(),
partitionPaths.size(), allPartitionSize);
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
new file mode 100644
index 00000000000..eb3b42ee7ca
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class PartitionRegexBasedCompactionStrategy extends CompactionStrategy {
+
+ @Override
+ public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig,
List<String> allPartitionPaths) {
+ String regex = writeConfig.getCompactionSpecifyPartitionPathRegex();
+ Pattern pattern = Pattern.compile(regex);
+ return
allPartitionPaths.stream().filter(pattern.asPredicate()).collect(Collectors.toList());
+ }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 167a00babe6..7cbdf4bbf7f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -46,6 +46,7 @@ import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import
org.apache.hudi.table.action.compact.strategy.PartitionRegexBasedCompactionStrategy;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import com.codahale.metrics.Counter;
@@ -53,11 +54,17 @@ import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -278,6 +285,64 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness {
}
}
+ private static Stream<Arguments> regexTestParameters() {
+ Object[][] data = new Object[][] {
+ {
+ ".*", Arrays.asList("2015/03/16", "2015/03/17", "2016/03/15")
+ },
+ {
+ "2017/.*/.*", Collections.emptyList()
+ },
+ {
+ "2015/03/.*", Arrays.asList("2015/03/16", "2015/03/17")
+ },
+ {
+ "2016/.*/.*", Arrays.asList("2016/03/15")
+ }
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("regexTestParameters")
+ public void testCompactionSpecifyPartition(String regex, List<String>
expectedCompactedPartition) throws Exception {
+ HoodieCompactionConfig.Builder builder =
HoodieCompactionConfig.newBuilder()
+ .withCompactionStrategy(new
PartitionRegexBasedCompactionStrategy()).withMaxNumDeltaCommitsBeforeCompaction(1);
+ builder.withCompactionSpecifyPartitionPathRegex(regex);
+ HoodieWriteConfig config = getConfigBuilder()
+ .withCompactionConfig(builder.build())
+ .withMetricsConfig(getMetricsConfig()).build();
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+ String newCommitTime = "100";
+ writeClient.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+ JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
+ writeClient.insert(recordsRDD, newCommitTime).collect();
+
+ // update 1 time
+ newCommitTime = "101";
+ updateRecords(config, newCommitTime, records);
+ assertLogFilesNumEqualsTo(config, 1);
+
+ // schedule compaction
+ boolean scheduled = writeClient.scheduleCompactionAtInstant("102",
Option.empty());
+ if (expectedCompactedPartition.isEmpty()) {
+ assertFalse(scheduled);
+ return;
+ }
+
+ HoodieWriteMetadata result = compact(writeClient, "102");
+
+ assertTrue(result.getWriteStats().isPresent());
+ List<HoodieWriteStat> stats = (List<HoodieWriteStat>)
result.getWriteStats().get();
+ assertEquals(expectedCompactedPartition.size(), stats.size());
+ expectedCompactedPartition.forEach(expectedPartition -> {
+ assertTrue(stats.stream().anyMatch(stat ->
stat.getPartitionPath().contentEquals(expectedPartition)));
+ });
+ }
+ }
+
@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index 83f8101ad70..a67f51face4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -335,6 +335,62 @@ public class TestHoodieCompactionStrategy {
}
+ @Test
+ public void testPartitionRegexBasedCompactionStrategy() {
+ List<String> partitions = Arrays.asList(
+ "2020/01/01",
+ "2020/01/02",
+ "2020/01/03",
+ "2020/02/01",
+ "2021/01/01"
+ );
+
+ HoodieWriteConfig writeConfig = updateRegex(".*");
+ List<String> filteredPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
partitions);
+ assertEquals(5, filteredPartitions.size());
+
+ writeConfig = updateRegex("2020/01/01");
+ filteredPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
partitions);
+ assertEquals(1, filteredPartitions.size());
+ assertEquals("2020/01/01", filteredPartitions.get(0));
+
+ writeConfig = updateRegex("2020/01/0[1-2]");
+ filteredPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
partitions);
+ assertEquals(2, filteredPartitions.size());
+ assertEquals("2020/01/01", filteredPartitions.get(0));
+ assertEquals("2020/01/02", filteredPartitions.get(1));
+ writeConfig = updateRegex("2020/01/0[1-2]|2020/02/01");
+ filteredPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
partitions);
+ assertEquals(3, filteredPartitions.size());
+ assertEquals("2020/01/01", filteredPartitions.get(0));
+ assertEquals("2020/01/02", filteredPartitions.get(1));
+ assertEquals("2020/02/01", filteredPartitions.get(2));
+
+
+ writeConfig = updateRegex("2020/.*/01");
+ filteredPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
partitions);
+ assertEquals(2, filteredPartitions.size());
+ assertEquals("2020/01/01", filteredPartitions.get(0));
+ assertEquals("2020/02/01", filteredPartitions.get(1));
+
+ writeConfig = updateRegex(".*/01/.*");
+ filteredPartitions =
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig,
partitions);
+ assertEquals(4, filteredPartitions.size());
+ assertEquals("2020/01/01", filteredPartitions.get(0));
+ assertEquals("2020/01/02", filteredPartitions.get(1));
+ assertEquals("2020/01/03", filteredPartitions.get(2));
+ assertEquals("2021/01/01", filteredPartitions.get(3));
+
+ }
+
+ private HoodieWriteConfig updateRegex(String regex) {
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+ HoodieCompactionConfig.newBuilder()
+ .withCompactionStrategy(new
PartitionRegexBasedCompactionStrategy())
+ .withCompactionSpecifyPartitionPathRegex(regex).build()).build();
+ return writeConfig;
+ }
+
private List<HoodieCompactionOperation>
createCompactionOperations(HoodieWriteConfig config,
Map<Long,
List<Long>> sizesMap) {
Map<Long, String> keyToPartitionMap = sizesMap.keySet().stream()