This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9431aab  [HUDI-1381] Schedule compaction based on time elapsed (#2260)
9431aab is described below

commit 9431aabfab47a3b679bd6ccfa8b5fa584260fc9e
Author: Karl_Wang <[email protected]>
AuthorDate: Wed Feb 17 23:44:53 2021 +0800

    [HUDI-1381] Schedule compaction based on time elapsed (#2260)
    
    - introduce configs to control how compaction is triggered
    - Compaction can be triggered based on time elapsed, number of delta commits, 
or a combination of both
    - Default behaviour remains the same.
---
 .../apache/hudi/config/HoodieCompactionConfig.java |  20 +++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   9 +
 .../action/compact/CompactionTriggerStrategy.java  |  30 ++++
 .../SparkScheduleCompactionActionExecutor.java     | 103 +++++++++---
 .../table/action/compact/TestInlineCompaction.java | 186 +++++++++++++++++++--
 .../table/timeline/HoodieActiveTimeline.java       |  10 +-
 .../hudi/exception/HoodieCompactException.java     |  30 ++++
 7 files changed, 348 insertions(+), 40 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 08f3774..934d91a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.DefaultHoodieConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import 
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
 
@@ -46,6 +47,9 @@ public class HoodieCompactionConfig extends 
DefaultHoodieConfig {
   public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
   // Run a compaction every N delta commits
   public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = 
"hoodie.compact.inline.max.delta.commits";
+  // Run a compaction when time elapsed > N seconds since last compaction
+  public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = 
"hoodie.compact.inline.max.delta.seconds";
+  public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = 
"hoodie.compact.inline.trigger.strategy";
   public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = 
"hoodie.cleaner.fileversions.retained";
   public static final String CLEANER_COMMITS_RETAINED_PROP = 
"hoodie.cleaner.commits.retained";
   public static final String CLEANER_INCREMENTAL_MODE = 
"hoodie.cleaner.incremental.mode";
@@ -109,6 +113,8 @@ public class HoodieCompactionConfig extends 
DefaultHoodieConfig {
   private static final String DEFAULT_INLINE_COMPACT = "false";
   private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
   private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
+  private static final String DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS = 
String.valueOf(60 * 60);
+  private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY = 
CompactionTriggerStrategy.NUM_COMMITS.name();
   private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
   private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
@@ -164,6 +170,11 @@ public class HoodieCompactionConfig extends 
DefaultHoodieConfig {
       return this;
     }
 
+    public Builder 
withInlineCompactionTriggerStrategy(CompactionTriggerStrategy 
compactionTriggerStrategy) {
+      props.setProperty(INLINE_COMPACT_TRIGGER_STRATEGY_PROP, 
compactionTriggerStrategy.name());
+      return this;
+    }
+
     public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
       props.setProperty(CLEANER_POLICY_PROP, policy.name());
       return this;
@@ -235,6 +246,11 @@ public class HoodieCompactionConfig extends 
DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withMaxDeltaSecondsBeforeCompaction(int 
maxDeltaSecondsBeforeCompaction) {
+      props.setProperty(INLINE_COMPACT_TIME_DELTA_SECONDS_PROP, 
String.valueOf(maxDeltaSecondsBeforeCompaction));
+      return this;
+    }
+
     public Builder withCompactionLazyBlockReadEnabled(Boolean 
compactionLazyBlockReadEnabled) {
       props.setProperty(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, 
String.valueOf(compactionLazyBlockReadEnabled));
       return this;
@@ -271,6 +287,10 @@ public class HoodieCompactionConfig extends 
DefaultHoodieConfig {
           DEFAULT_INLINE_COMPACT);
       setDefaultOnCondition(props, 
!props.containsKey(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP),
           INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 
DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS);
+      setDefaultOnCondition(props, 
!props.containsKey(INLINE_COMPACT_TIME_DELTA_SECONDS_PROP),
+          INLINE_COMPACT_TIME_DELTA_SECONDS_PROP, 
DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS);
+      setDefaultOnCondition(props, 
!props.containsKey(INLINE_COMPACT_TRIGGER_STRATEGY_PROP),
+          INLINE_COMPACT_TRIGGER_STRATEGY_PROP, 
DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY);
       setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP), 
CLEANER_POLICY_PROP,
           DEFAULT_CLEANER_POLICY);
       setDefaultOnCondition(props, 
!props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP),
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cadb2d1..e3c1ef6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -34,6 +34,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.hadoop.hbase.io.compress.Compression;
@@ -405,10 +406,18 @@ public class HoodieWriteConfig extends 
DefaultHoodieConfig {
     return 
Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_PROP));
   }
 
+  public CompactionTriggerStrategy getInlineCompactTriggerStrategy() {
+    return 
CompactionTriggerStrategy.valueOf(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY_PROP));
+  }
+
   public int getInlineCompactDeltaCommitMax() {
     return 
Integer.parseInt(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP));
   }
 
+  public int getInlineCompactDeltaSecondsMax() {
+    return 
Integer.parseInt(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS_PROP));
+  }
+
   public CompactionStrategy getCompactionStrategy() {
     return 
ReflectionUtils.loadClass(props.getProperty(HoodieCompactionConfig.COMPACTION_STRATEGY_PROP));
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
new file mode 100644
index 0000000..6a4e634
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+public enum CompactionTriggerStrategy {
+    // trigger compaction when reach N delta commits
+    NUM_COMMITS,
+    // trigger compaction when time elapsed > N seconds since last compaction
+    TIME_ELAPSED,
+    // trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied
+    NUM_AND_TIME,
+    // trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied
+    NUM_OR_TIME
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
index 34db0a7..9c44499 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -37,6 +39,7 @@ import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.io.IOException;
+import java.text.ParseException;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -58,36 +61,92 @@ public class SparkScheduleCompactionActionExecutor<T 
extends HoodieRecordPayload
   @Override
   protected HoodieCompactionPlan scheduleCompaction() {
     LOG.info("Checking if compaction needs to be run on " + 
config.getBasePath());
+    // judge if we need to compact according to num delta commits and time 
elapsed
+    boolean compactable = 
needCompact(config.getInlineCompactTriggerStrategy());
+    if (compactable) {
+      LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
+      HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+      try {
+        SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
+        Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
+            .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
+            .collect(Collectors.toSet());
+        // exclude files in pending clustering from compaction.
+        
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+        return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
+      } catch (IOException e) {
+        throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      }
+    }
+
+    return new HoodieCompactionPlan();
+  }
+
+  public Pair<Integer, String> 
getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
     Option<HoodieInstant> lastCompaction = 
table.getActiveTimeline().getCommitTimeline()
         .filterCompletedInstants().lastInstant();
-    String lastCompactionTs = "0";
+    HoodieTimeline deltaCommits = 
table.getActiveTimeline().getDeltaCommitTimeline();
+
+    String latestInstantTs;
+    int deltaCommitsSinceLastCompaction = 0;
     if (lastCompaction.isPresent()) {
-      lastCompactionTs = lastCompaction.get().getTimestamp();
+      latestInstantTs = lastCompaction.get().getTimestamp();
+      deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfter(latestInstantTs, 
Integer.MAX_VALUE).countInstants();
+    } else {
+      latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
+      deltaCommitsSinceLastCompaction = 
deltaCommits.findInstantsAfterOrEquals(latestInstantTs, 
Integer.MAX_VALUE).countInstants();
     }
+    return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+  }
 
-    int deltaCommitsSinceLastCompaction = 
table.getActiveTimeline().getDeltaCommitTimeline()
-        .findInstantsAfter(lastCompactionTs, 
Integer.MAX_VALUE).countInstants();
-    if (config.getInlineCompactDeltaCommitMax() > 
deltaCommitsSinceLastCompaction) {
-      LOG.info("Not scheduling compaction as only " + 
deltaCommitsSinceLastCompaction
-          + " delta commits was found since last compaction " + 
lastCompactionTs + ". Waiting for "
-          + config.getInlineCompactDeltaCommitMax());
-      return new HoodieCompactionPlan();
+  public boolean needCompact(CompactionTriggerStrategy 
compactionTriggerStrategy) {
+    boolean compactable;
+    // get deltaCommitsSinceLastCompaction and lastCompactionTs
+    Pair<Integer, String> latestDeltaCommitInfo = 
getLatestDeltaCommitInfo(compactionTriggerStrategy);
+    int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
+    int inlineCompactDeltaSecondsMax = 
config.getInlineCompactDeltaSecondsMax();
+    switch (compactionTriggerStrategy) {
+      case NUM_COMMITS:
+        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfo.getLeft();
+        if (compactable) {
+          LOG.info(String.format("The delta commits >= %s, trigger compaction 
scheduler.", inlineCompactDeltaCommitMax));
+        }
+        break;
+      case TIME_ELAPSED:
+        compactable = inlineCompactDeltaSecondsMax <= 
parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfo.getRight());
+        if (compactable) {
+          LOG.info(String.format("The elapsed time >=%ss, trigger compaction 
scheduler.", inlineCompactDeltaSecondsMax));
+        }
+        break;
+      case NUM_OR_TIME:
+        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfo.getLeft()
+            || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfo.getRight());
+        if (compactable) {
+          LOG.info(String.format("The delta commits >= %s or elapsed_time 
>=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+              inlineCompactDeltaSecondsMax));
+        }
+        break;
+      case NUM_AND_TIME:
+        compactable = inlineCompactDeltaCommitMax <= 
latestDeltaCommitInfo.getLeft()
+            && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - 
parsedToSeconds(latestDeltaCommitInfo.getRight());
+        if (compactable) {
+          LOG.info(String.format("The delta commits >= %s and elapsed_time 
>=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+              inlineCompactDeltaSecondsMax));
+        }
+        break;
+      default:
+        throw new HoodieCompactionException("Unsupported compaction trigger 
strategy: " + config.getInlineCompactTriggerStrategy());
     }
+    return compactable;
+  }
 
-    LOG.info("Generating compaction plan for merge on read table " + 
config.getBasePath());
-    HoodieSparkMergeOnReadTableCompactor compactor = new 
HoodieSparkMergeOnReadTableCompactor();
+  public Long parsedToSeconds(String time) {
+    long timestamp;
     try {
-      SyncableFileSystemView fileSystemView = (SyncableFileSystemView) 
table.getSliceView();
-      Set<HoodieFileGroupId> fgInPendingCompactionAndClustering = 
fileSystemView.getPendingCompactionOperations()
-          .map(instantTimeOpPair -> 
instantTimeOpPair.getValue().getFileGroupId())
-          .collect(Collectors.toSet());
-      // exclude files in pending clustering from compaction.
-      
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-      return compactor.generateCompactionPlan(context, table, config, 
instantTime, fgInPendingCompactionAndClustering);
-
-    } catch (IOException e) {
-      throw new HoodieCompactionException("Could not schedule compaction " + 
config.getBasePath(), e);
+      timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime() 
/ 1000;
+    } catch (ParseException e) {
+      throw new HoodieCompactionException(e.getMessage(), e);
     }
+    return timestamp;
   }
-
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 066a965..80542ed 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.junit.jupiter.api.Test;
@@ -39,21 +38,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestInlineCompaction extends CompactionTestBase {
 
-  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
+  private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits, 
int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
     return getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            
.withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
+            .withInlineCompaction(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
+            .withMaxDeltaSecondsBeforeCompaction(maxDeltaTime)
+            .withInlineCompactionTriggerStrategy(inlineCompactionType).build())
         .build();
   }
 
   @Test
   public void testCompactionIsNotScheduledEarly() throws Exception {
     // Given: make two commits
-    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, 
CompactionTriggerStrategy.NUM_COMMITS);
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
-      List<HoodieRecord> records = dataGen.generateInserts("000", 100);
+      List<HoodieRecord> records = 
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 100);
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
-      runNextDeltaCommits(writeClient, readClient, Arrays.asList("000", 
"001"), records, cfg, true, new ArrayList<>());
+      List<String> instants = IntStream.range(0, 2).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, 
true, new ArrayList<>());
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
 
      // Then: ensure no compaction is executed since there are only 2 delta 
commits
@@ -62,9 +65,9 @@ public class TestInlineCompaction extends CompactionTestBase {
   }
 
   @Test
-  public void testSuccessfulCompaction() throws Exception {
+  public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
     // Given: make three commits
-    HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60, 
CompactionTriggerStrategy.NUM_COMMITS);
     List<String> instants = IntStream.range(0, 2).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
 
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
@@ -85,32 +88,181 @@ public class TestInlineCompaction extends 
CompactionTestBase {
   }
 
   @Test
-  public void testCompactionRetryOnFailure() throws Exception {
+  public void testSuccessfulCompactionBasedOnTime() throws Exception {
+    // Given: make one commit
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10, 
CompactionTriggerStrategy.TIME_ELAPSED);
+
+    try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      String instantTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime), 
records, cfg, true, new ArrayList<>());
+
+      // after 10s, that will trigger compaction
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
+      createNextDeltaCommit(finalInstant, 
dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, 
false);
+
+      // Then: ensure the file slices are compacted as per policy
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(3, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      assertEquals(HoodieTimeline.COMMIT_ACTION, 
metaClient.getActiveTimeline().lastInstant().get().getAction());
+    }
+  }
+
+  @Test
+  public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
+    // Given: make three commits
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, 
CompactionTriggerStrategy.NUM_OR_TIME);
+    try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = 
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      List<String> instants = IntStream.range(0, 2).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, 
true, new ArrayList<>());
+      // Then: trigger the compaction because 3 delta commits have been reached.
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
+      createNextDeltaCommit(finalInstant, 
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(4, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      // 4th commit, that will trigger compaction because the elapsed-time 
threshold is reached
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+      createNextDeltaCommit(finalInstant, 
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(6, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    }
+  }
+
+  @Test
+  public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
+    // Given: make three commits
+    HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20, 
CompactionTriggerStrategy.NUM_AND_TIME);
+    try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = 
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      List<String> instants = IntStream.range(0, 3).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, 
true, new ArrayList<>());
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
+
+      // Then: ensure no compaction is executed since there are only 3 delta 
commits
+      assertEquals(3, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      // 4th commit, that will trigger compaction
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+      createNextDeltaCommit(finalInstant, 
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      assertEquals(5, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    }
+  }
+
+  @Test
+  public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception 
{
     // Given: two commits, schedule compaction and its failed/in-flight
     HoodieWriteConfig cfg = getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .build();
-    List<String> instants = CollectionUtils.createImmutableList("000", "001");
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+    String instantTime2;
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
       List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 
100);
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, 
true, new ArrayList<>());
-      // Schedule compaction 002, make it in-flight (simulates inline 
compaction failing)
-      scheduleCompaction("002", writeClient, cfg);
-      moveCompactionFromRequestedToInflight("002", cfg);
+      // Schedule compaction instant2, make it in-flight (simulates inline 
compaction failing)
+      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+      scheduleCompaction(instantTime2, writeClient, cfg);
+      moveCompactionFromRequestedToInflight(instantTime2, cfg);
+    }
+
+    // When: a third commit happens
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, 
CompactionTriggerStrategy.NUM_COMMITS);
+    String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
+    try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) 
{
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
+      createNextDeltaCommit(instantTime3, 
dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, 
false);
+    }
+
+    // Then: 1 delta commit is done, the failed compaction is retried
+    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    assertEquals(4, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    assertEquals(instantTime2, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+  }
+
+  @Test
+  public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
+    // Given: two commits, schedule compaction and its failed/in-flight
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxDeltaSecondsBeforeCompaction(5)
+            
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.TIME_ELAPSED).build())
+        .build();
+    String instantTime;
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+    try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 
100);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, 
true, new ArrayList<>());
+      // Schedule compaction instantTime, make it in-flight (simulates inline 
compaction failing)
+      instantTime = HoodieActiveTimeline.createNewInstantTime(10000);
+      scheduleCompaction(instantTime, writeClient, cfg);
+      moveCompactionFromRequestedToInflight(instantTime, cfg);
+    }
+
+    // When: commit happens after 10s
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, 
CompactionTriggerStrategy.TIME_ELAPSED);
+    String instantTime2;
+    try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) 
{
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
+      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+      createNextDeltaCommit(instantTime2, 
dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, 
false);
+    }
+
+    // Then: 1 delta commit is done, the failed compaction is retried
+    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    assertEquals(4, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+    assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+  }
+
+  @Test
+  public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception 
{
+    // Given: two commits, schedule compaction and its failed/in-flight
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxDeltaSecondsBeforeCompaction(1)
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_AND_TIME).build())
+        .build();
+    String instantTime;
+    List<String> instants = IntStream.range(0, 2).mapToObj(i -> 
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+    try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+      List<HoodieRecord> records = dataGen.generateInserts(instants.get(0), 
10);
+      HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+      runNextDeltaCommits(writeClient, readClient, instants, records, cfg, 
true, new ArrayList<>());
+      // Schedule compaction instantTime, make it in-flight (simulates inline 
compaction failing)
+      instantTime = HoodieActiveTimeline.createNewInstantTime();
+      scheduleCompaction(instantTime, writeClient, cfg);
+      moveCompactionFromRequestedToInflight(instantTime, cfg);
     }
 
     // When: a third commit happens
-    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
+    HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, 
CompactionTriggerStrategy.NUM_OR_TIME);
+    String instantTime2;
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) 
{
       HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
-      createNextDeltaCommit("003", dataGen.generateUpdates("003", 100), 
writeClient, metaClient, inlineCfg, false);
+      instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+      createNextDeltaCommit(instantTime2, 
dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, 
false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
     metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
     assertEquals(4, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
-    assertEquals("002", 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+    assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 865f0dc..4a03cd4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -81,10 +81,18 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
   * Ensures each instant time is at least 1 second apart since we create instant times at second granularity
    */
   public static String createNewInstantTime() {
+    return createNewInstantTime(0);
+  }
+
+  /**
+   * Returns the next instant time, N milliseconds ahead of now, in the {@link #COMMIT_FORMATTER} format.
+   * Ensures each instant time is at least 1 second apart since we create instant times at second granularity.
+   */
+  public static String createNewInstantTime(long milliseconds) {
     return lastInstantTime.updateAndGet((oldVal) -> {
       String newCommitTime;
       do {
-        newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new 
Date());
+        newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new 
Date(System.currentTimeMillis() + milliseconds));
       } while (HoodieTimeline.compareTimestamps(newCommitTime, 
LESSER_THAN_OR_EQUALS, oldVal));
       return newCommitTime;
     });
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
new file mode 100644
index 0000000..0d51706
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieCompactException extends HoodieException {
+
+  public HoodieCompactException(String msg) {
+    super(msg);
+  }
+
+  public HoodieCompactException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}

Reply via email to