This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7d2c2489025fe5b183367b4f4dfbf64804d8c33b
Author: Shawy Geng <[email protected]>
AuthorDate: Thu Dec 2 09:41:04 2021 +0800

    [HUDI-2881] Compact the file group with larger log files to reduce write 
amplification (#4152)
    
    (cherry picked from commit 5284730175df4637eee43b179c774606b07a10a9)
---
 .../java/org/apache/hudi/config/HoodieCompactionConfig.java   | 11 +++++++++++
 .../main/java/org/apache/hudi/config/HoodieWriteConfig.java   |  4 ++++
 .../compact/strategy/LogFileSizeBasedCompactionStrategy.java  |  9 +++++++--
 .../action/compact/strategy/TestHoodieCompactionStrategy.java |  9 +++++----
 4 files changed, 27 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index e11d060..fbe31b0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -171,6 +171,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Amount of MBs to spend during compaction run for the 
LogFileSizeBasedCompactionStrategy. "
           + "This value helps bound ingestion latency while compaction is run 
inline mode.");
 
+  public static final ConfigProperty<Long> COMPACTION_LOG_FILE_SIZE_THRESHOLD 
= ConfigProperty
+      .key("hoodie.compaction.logfile.size.threshold")
+      .defaultValue(0L)
+      .withDocumentation("Only if the log file size is greater than or equal 
to the threshold in bytes,"
+          + " the file group will be compacted.");
+
   public static final ConfigProperty<String> COMPACTION_STRATEGY = 
ConfigProperty
       .key("hoodie.compaction.strategy")
       .defaultValue(LogFileSizeBasedCompactionStrategy.class.getName())
@@ -598,6 +604,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withLogFileSizeThresholdBasedCompaction(long 
logFileSizeThreshold) {
+      compactionConfig.setValue(COMPACTION_LOG_FILE_SIZE_THRESHOLD, 
String.valueOf(logFileSizeThreshold));
+      return this;
+    }
+
     public Builder withCommitsArchivalBatchSize(int batchSize) {
       compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, 
String.valueOf(batchSize));
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9ae22e8..b49108f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1133,6 +1133,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getLong(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB);
   }
 
+  public Long getCompactionLogFileSizeThreshold() {
+    return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD);
+  }
+
   public Boolean getCompactionLazyBlockReadEnabled() {
     return 
getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
index fe4b474..c165141 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
@@ -27,7 +27,8 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * LogFileSizeBasedCompactionStrategy orders the compactions based on the 
total log files size and limits the
+ * LogFileSizeBasedCompactionStrategy orders the compactions based on the 
total log files size,
+ * filters the file groups whose log files size is greater than or equal to 
the threshold, and limits the
  * compactions within a configured IO bound.
  *
  * @see BoundedIOCompactionStrategy
@@ -39,8 +40,12 @@ public class LogFileSizeBasedCompactionStrategy extends 
BoundedIOCompactionStrat
   @Override
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig,
       List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans) {
+    // Filter out file groups whose log files size is below the threshold in 
bytes.
     // Order the operations based on the reverse size of the logs and limit 
them by the IO
-    return super.orderAndFilter(writeConfig, 
operations.stream().sorted(this).collect(Collectors.toList()),
+    long threshold = writeConfig.getCompactionLogFileSizeThreshold();
+    return super.orderAndFilter(writeConfig, operations.stream()
+            .filter(e -> e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) 
>= threshold)
+            .sorted(this).collect(Collectors.toList()),
         pendingCompactionPlans);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index 823eac4..dee1fad 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -99,19 +99,20 @@ public class TestHoodieCompactionStrategy {
     sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
     LogFileSizeBasedCompactionStrategy strategy = new 
LogFileSizeBasedCompactionStrategy();
     HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
-        
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
+        
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
+            .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 
1024).build())
         .build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
     List<HoodieCompactionOperation> returned = 
strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
 
     assertTrue(returned.size() < operations.size(),
         "LogFileSizeBasedCompactionStrategy should have resulted in fewer 
compactions");
-    assertEquals(1, returned.size(), "LogFileSizeBasedCompactionStrategy 
should have resulted in 1 compaction");
+    assertEquals(2, returned.size(), "LogFileSizeBasedCompactionStrategy 
should have resulted in 2 compactions");
     // Total size of all the log files
     Long returnedSize = returned.stream().map(s -> 
s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
         .map(Double::longValue).reduce(Long::sum).orElse(0L);
-    assertEquals(1204, (long) returnedSize,
-        "Should chose the first 2 compactions which should result in a total 
IO of 690 MB");
+    assertEquals(1594, (long) returnedSize,
+        "Should choose the first 2 compactions which should result in a total 
IO of 1594 MB");
   }
 
   @Test

Reply via email to