This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch release-0.10.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 7d2c2489025fe5b183367b4f4dfbf64804d8c33b Author: Shawy Geng <[email protected]> AuthorDate: Thu Dec 2 09:41:04 2021 +0800 [HUDI-2881] Compact the file group with larger log files to reduce write amplification (#4152) (cherry picked from commit 5284730175df4637eee43b179c774606b07a10a9) --- .../java/org/apache/hudi/config/HoodieCompactionConfig.java | 11 +++++++++++ .../main/java/org/apache/hudi/config/HoodieWriteConfig.java | 4 ++++ .../compact/strategy/LogFileSizeBasedCompactionStrategy.java | 9 +++++++-- .../action/compact/strategy/TestHoodieCompactionStrategy.java | 9 +++++---- 4 files changed, 27 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index e11d060..fbe31b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -171,6 +171,12 @@ public class HoodieCompactionConfig extends HoodieConfig { .withDocumentation("Amount of MBs to spend during compaction run for the LogFileSizeBasedCompactionStrategy. 
" + "This value helps bound ingestion latency while compaction is run inline mode."); + public static final ConfigProperty<Long> COMPACTION_LOG_FILE_SIZE_THRESHOLD = ConfigProperty + .key("hoodie.compaction.logfile.size.threshold") + .defaultValue(0L) + .withDocumentation("Only if the log file size is greater than the threshold in bytes," + + " the file group will be compacted."); + public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty .key("hoodie.compaction.strategy") .defaultValue(LogFileSizeBasedCompactionStrategy.class.getName()) @@ -598,6 +604,11 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold) { + compactionConfig.setValue(COMPACTION_LOG_FILE_SIZE_THRESHOLD, String.valueOf(logFileSizeThreshold)); + return this; + } + public Builder withCommitsArchivalBatchSize(int batchSize) { compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9ae22e8..b49108f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1133,6 +1133,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getLong(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB); } + public Long getCompactionLogFileSizeThreshold() { + return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD); + } + public Boolean getCompactionLazyBlockReadEnabled() { return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE); } diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java index fe4b474..c165141 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java @@ -27,7 +27,8 @@ import java.util.List; import java.util.stream.Collectors; /** - * LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size and limits the + * LogFileSizeBasedCompactionStrategy orders the compactions based on the total log files size, + * keeps only the file groups whose total log file size is at least the threshold, and limits the * compactions within a configured IO bound. * * @see BoundedIOCompactionStrategy @@ -39,8 +40,12 @@ public class LogFileSizeBasedCompactionStrategy extends BoundedIOCompactionStrat @Override public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) { + // Keep only the file groups whose total log file size is at least the threshold in bytes.
// Order the operations based on the reverse size of the logs and limit them by the IO - return super.orderAndFilter(writeConfig, operations.stream().sorted(this).collect(Collectors.toList()), + long threshold = writeConfig.getCompactionLogFileSizeThreshold(); + return super.orderAndFilter(writeConfig, operations.stream() + .filter(e -> e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) >= threshold) + .sorted(this).collect(Collectors.toList()), pendingCompactionPlans); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java index 823eac4..dee1fad 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java @@ -99,19 +99,20 @@ public class TestHoodieCompactionStrategy { sizesMap.put(90 * MB, Collections.singletonList(1024 * MB)); LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy(); HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig( - HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build()) + HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205) + .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build()) .build(); List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap); List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>()); assertTrue(returned.size() < operations.size(), "LogFileSizeBasedCompactionStrategy should have resulted in fewer compactions"); - 
assertEquals(1, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 1 compaction"); + assertEquals(2, returned.size(), "LogFileSizeBasedCompactionStrategy should have resulted in 2 compaction"); // Total size of all the log files Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB)) .map(Double::longValue).reduce(Long::sum).orElse(0L); - assertEquals(1204, (long) returnedSize, - "Should chose the first 2 compactions which should result in a total IO of 690 MB"); + assertEquals(1594, (long) returnedSize, + "Should chose the first 2 compactions which should result in a total IO of 1594 MB"); } @Test
