This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9431aab [HUDI-1381] Schedule compaction based on time elapsed (#2260)
9431aab is described below
commit 9431aabfab47a3b679bd6ccfa8b5fa584260fc9e
Author: Karl_Wang <[email protected]>
AuthorDate: Wed Feb 17 23:44:53 2021 +0800
[HUDI-1381] Schedule compaction based on time elapsed (#2260)
- introduce configs to control how compaction is triggered
- Compaction can be triggered using time, number of delta commits and/or
combinations
- Default behaviour remains the same.
---
.../apache/hudi/config/HoodieCompactionConfig.java | 20 +++
.../org/apache/hudi/config/HoodieWriteConfig.java | 9 +
.../action/compact/CompactionTriggerStrategy.java | 30 ++++
.../SparkScheduleCompactionActionExecutor.java | 103 +++++++++---
.../table/action/compact/TestInlineCompaction.java | 186 +++++++++++++++++++--
.../table/timeline/HoodieActiveTimeline.java | 10 +-
.../hudi/exception/HoodieCompactException.java | 30 ++++
7 files changed, 348 insertions(+), 40 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 08f3774..934d91a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -46,6 +47,9 @@ public class HoodieCompactionConfig extends
DefaultHoodieConfig {
public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
// Run a compaction every N delta commits
public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP =
"hoodie.compact.inline.max.delta.commits";
+ // Run a compaction when time elapsed > N seconds since last compaction
+ public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP =
"hoodie.compact.inline.max.delta.seconds";
+ public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP =
"hoodie.compact.inline.trigger.strategy";
public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP =
"hoodie.cleaner.fileversions.retained";
public static final String CLEANER_COMMITS_RETAINED_PROP =
"hoodie.cleaner.commits.retained";
public static final String CLEANER_INCREMENTAL_MODE =
"hoodie.cleaner.incremental.mode";
@@ -109,6 +113,8 @@ public class HoodieCompactionConfig extends
DefaultHoodieConfig {
private static final String DEFAULT_INLINE_COMPACT = "false";
private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
+ private static final String DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS =
String.valueOf(60 * 60);
+ private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY =
CompactionTriggerStrategy.NUM_COMMITS.name();
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
@@ -164,6 +170,11 @@ public class HoodieCompactionConfig extends
DefaultHoodieConfig {
return this;
}
+ public Builder
withInlineCompactionTriggerStrategy(CompactionTriggerStrategy
compactionTriggerStrategy) {
+ props.setProperty(INLINE_COMPACT_TRIGGER_STRATEGY_PROP,
compactionTriggerStrategy.name());
+ return this;
+ }
+
public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
props.setProperty(CLEANER_POLICY_PROP, policy.name());
return this;
@@ -235,6 +246,11 @@ public class HoodieCompactionConfig extends
DefaultHoodieConfig {
return this;
}
+ public Builder withMaxDeltaSecondsBeforeCompaction(int
maxDeltaSecondsBeforeCompaction) {
+ props.setProperty(INLINE_COMPACT_TIME_DELTA_SECONDS_PROP,
String.valueOf(maxDeltaSecondsBeforeCompaction));
+ return this;
+ }
+
public Builder withCompactionLazyBlockReadEnabled(Boolean
compactionLazyBlockReadEnabled) {
props.setProperty(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
String.valueOf(compactionLazyBlockReadEnabled));
return this;
@@ -271,6 +287,10 @@ public class HoodieCompactionConfig extends
DefaultHoodieConfig {
DEFAULT_INLINE_COMPACT);
setDefaultOnCondition(props,
!props.containsKey(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP),
INLINE_COMPACT_NUM_DELTA_COMMITS_PROP,
DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS);
+ setDefaultOnCondition(props,
!props.containsKey(INLINE_COMPACT_TIME_DELTA_SECONDS_PROP),
+ INLINE_COMPACT_TIME_DELTA_SECONDS_PROP,
DEFAULT_INLINE_COMPACT_TIME_DELTA_SECONDS);
+ setDefaultOnCondition(props,
!props.containsKey(INLINE_COMPACT_TRIGGER_STRATEGY_PROP),
+ INLINE_COMPACT_TRIGGER_STRATEGY_PROP,
DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY);
setDefaultOnCondition(props, !props.containsKey(CLEANER_POLICY_PROP),
CLEANER_POLICY_PROP,
DEFAULT_CLEANER_POLICY);
setDefaultOnCondition(props,
!props.containsKey(CLEANER_FILE_VERSIONS_RETAINED_PROP),
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cadb2d1..e3c1ef6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -34,6 +34,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -405,10 +406,18 @@ public class HoodieWriteConfig extends
DefaultHoodieConfig {
return
Boolean.parseBoolean(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_PROP));
}
+ public CompactionTriggerStrategy getInlineCompactTriggerStrategy() {
+ return
CompactionTriggerStrategy.valueOf(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY_PROP));
+ }
+
public int getInlineCompactDeltaCommitMax() {
return
Integer.parseInt(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP));
}
+ public int getInlineCompactDeltaSecondsMax() {
+ return
Integer.parseInt(props.getProperty(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS_PROP));
+ }
+
public CompactionStrategy getCompactionStrategy() {
return
ReflectionUtils.loadClass(props.getProperty(HoodieCompactionConfig.COMPACTION_STRATEGY_PROP));
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
new file mode 100644
index 0000000..6a4e634
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionTriggerStrategy.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact;
+
+public enum CompactionTriggerStrategy {
+  // trigger compaction when N delta commits are reached
+ NUM_COMMITS,
+ // trigger compaction when time elapsed > N seconds since last compaction
+ TIME_ELAPSED,
+ // trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied
+ NUM_AND_TIME,
+ // trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied
+ NUM_OR_TIME
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
index 34db0a7..9c44499 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
@@ -25,7 +25,9 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -37,6 +39,7 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
+import java.text.ParseException;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -58,36 +61,92 @@ public class SparkScheduleCompactionActionExecutor<T
extends HoodieRecordPayload
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " +
config.getBasePath());
+ // judge if we need to compact according to num delta commits and time
elapsed
+ boolean compactable =
needCompact(config.getInlineCompactTriggerStrategy());
+ if (compactable) {
+ LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
+ HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ try {
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
+ Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
+ .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet());
+ // exclude files in pending clustering from compaction.
+
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+ return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ }
+ }
+
+ return new HoodieCompactionPlan();
+ }
+
+ public Pair<Integer, String>
getLatestDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction =
table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
- String lastCompactionTs = "0";
+ HoodieTimeline deltaCommits =
table.getActiveTimeline().getDeltaCommitTimeline();
+
+ String latestInstantTs;
+ int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
- lastCompactionTs = lastCompaction.get().getTimestamp();
+ latestInstantTs = lastCompaction.get().getTimestamp();
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfter(latestInstantTs,
Integer.MAX_VALUE).countInstants();
+ } else {
+ latestInstantTs = deltaCommits.firstInstant().get().getTimestamp();
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfterOrEquals(latestInstantTs,
Integer.MAX_VALUE).countInstants();
}
+ return Pair.of(deltaCommitsSinceLastCompaction, latestInstantTs);
+ }
- int deltaCommitsSinceLastCompaction =
table.getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
- if (config.getInlineCompactDeltaCommitMax() >
deltaCommitsSinceLastCompaction) {
- LOG.info("Not scheduling compaction as only " +
deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " +
lastCompactionTs + ". Waiting for "
- + config.getInlineCompactDeltaCommitMax());
- return new HoodieCompactionPlan();
+ public boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
+ boolean compactable;
+ // get deltaCommitsSinceLastCompaction and lastCompactionTs
+ Pair<Integer, String> latestDeltaCommitInfo =
getLatestDeltaCommitInfo(compactionTriggerStrategy);
+ int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
+ int inlineCompactDeltaSecondsMax =
config.getInlineCompactDeltaSecondsMax();
+ switch (compactionTriggerStrategy) {
+ case NUM_COMMITS:
+ compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfo.getLeft();
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s, trigger compaction
scheduler.", inlineCompactDeltaCommitMax));
+ }
+ break;
+ case TIME_ELAPSED:
+ compactable = inlineCompactDeltaSecondsMax <=
parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfo.getRight());
+ if (compactable) {
+ LOG.info(String.format("The elapsed time >=%ss, trigger compaction
scheduler.", inlineCompactDeltaSecondsMax));
+ }
+ break;
+ case NUM_OR_TIME:
+ compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfo.getLeft()
+ || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfo.getRight());
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s or elapsed_time
>=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+ inlineCompactDeltaSecondsMax));
+ }
+ break;
+ case NUM_AND_TIME:
+ compactable = inlineCompactDeltaCommitMax <=
latestDeltaCommitInfo.getLeft()
+ && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) -
parsedToSeconds(latestDeltaCommitInfo.getRight());
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s and elapsed_time
>=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+ inlineCompactDeltaSecondsMax));
+ }
+ break;
+ default:
+ throw new HoodieCompactionException("Unsupported compaction trigger
strategy: " + config.getInlineCompactTriggerStrategy());
}
+ return compactable;
+ }
- LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
- HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ public Long parsedToSeconds(String time) {
+ long timestamp;
try {
- SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
- Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
- .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
- .collect(Collectors.toSet());
- // exclude files in pending clustering from compaction.
-
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
- return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
-
- } catch (IOException e) {
- throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ timestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(time).getTime()
/ 1000;
+ } catch (ParseException e) {
+ throw new HoodieCompactionException(e.getMessage(), e);
}
+ return timestamp;
}
-
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 066a965..80542ed 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.Test;
@@ -39,21 +38,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestInlineCompaction extends CompactionTestBase {
- private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits) {
+ private HoodieWriteConfig getConfigForInlineCompaction(int maxDeltaCommits,
int maxDeltaTime, CompactionTriggerStrategy inlineCompactionType) {
return getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
-
.withInlineCompaction(true).withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
+ .withInlineCompaction(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
+ .withMaxDeltaSecondsBeforeCompaction(maxDeltaTime)
+ .withInlineCompactionTriggerStrategy(inlineCompactionType).build())
.build();
}
@Test
public void testCompactionIsNotScheduledEarly() throws Exception {
// Given: make two commits
- HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60,
CompactionTriggerStrategy.NUM_COMMITS);
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
- List<HoodieRecord> records = dataGen.generateInserts("000", 100);
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 100);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
- runNextDeltaCommits(writeClient, readClient, Arrays.asList("000",
"001"), records, cfg, true, new ArrayList<>());
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
// Then: ensure no compaction is executed since there are only 2 delta
commits
@@ -62,9 +65,9 @@ public class TestInlineCompaction extends CompactionTestBase {
}
@Test
- public void testSuccessfulCompaction() throws Exception {
+ public void testSuccessfulCompactionBasedOnNumCommits() throws Exception {
// Given: make three commits
- HoodieWriteConfig cfg = getConfigForInlineCompaction(3);
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 60,
CompactionTriggerStrategy.NUM_COMMITS);
List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
@@ -85,32 +88,181 @@ public class TestInlineCompaction extends
CompactionTestBase {
}
@Test
- public void testCompactionRetryOnFailure() throws Exception {
+ public void testSuccessfulCompactionBasedOnTime() throws Exception {
+ // Given: make one commit
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10,
CompactionTriggerStrategy.TIME_ELAPSED);
+
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime),
records, cfg, true, new ArrayList<>());
+
+ // after 10s, that will trigger compaction
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg,
false);
+
+ // Then: ensure the file slices are compacted as per policy
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(HoodieTimeline.COMMIT_ACTION,
metaClient.getActiveTimeline().lastInstant().get().getAction());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_OR_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+      // Then: trigger the compaction because we reach 3 commits.
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+      // 4th commit, that will trigger compaction because the required time
elapsed
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(6,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_AND_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 3).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+
+      // Then: ensure no compaction is executed since there are only 3 delta
commits
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ // 4th commit, that will trigger compaction
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(5,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ }
+ }
+
+ @Test
+ public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception
{
// Given: two commits, schedule compaction and it's failed/in-flight
HoodieWriteConfig cfg = getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
-
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
.build();
- List<String> instants = CollectionUtils.createImmutableList("000", "001");
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ String instantTime2;
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0),
100);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
- // Schedule compaction 002, make it in-flight (simulates inline
compaction failing)
- scheduleCompaction("002", writeClient, cfg);
- moveCompactionFromRequestedToInflight("002", cfg);
+ // Schedule compaction instant2, make it in-flight (simulates inline
compaction failing)
+ instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+ scheduleCompaction(instantTime2, writeClient, cfg);
+ moveCompactionFromRequestedToInflight(instantTime2, cfg);
+ }
+
+ // When: a third commit happens
+ HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60,
CompactionTriggerStrategy.NUM_COMMITS);
+ String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg))
{
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(instantTime3,
dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg,
false);
+ }
+
+ // Then: 1 delta commit is done, the failed compaction is retried
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(instantTime2,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+ }
+
+ @Test
+ public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
+    // Given: two commits, schedule compaction and it's failed/in-flight
+ HoodieWriteConfig cfg = getConfigBuilder(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxDeltaSecondsBeforeCompaction(5)
+
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.TIME_ELAPSED).build())
+ .build();
+ String instantTime;
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records = dataGen.generateInserts(instants.get(0),
100);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ // Schedule compaction instantTime, make it in-flight (simulates inline
compaction failing)
+ instantTime = HoodieActiveTimeline.createNewInstantTime(10000);
+ scheduleCompaction(instantTime, writeClient, cfg);
+ moveCompactionFromRequestedToInflight(instantTime, cfg);
+ }
+
+ // When: commit happens after 10s
+ HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10,
CompactionTriggerStrategy.TIME_ELAPSED);
+ String instantTime2;
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg))
{
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+ createNextDeltaCommit(instantTime2,
dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg,
false);
+ }
+
+ // Then: 1 delta commit is done, the failed compaction is retried
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(instantTime,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+ }
+
+ @Test
+ public void testCompactionRetryOnFailureBasedOnNumAndTime() throws Exception
{
+    // Given: two commits, schedule compaction and it's failed/in-flight
+ HoodieWriteConfig cfg = getConfigBuilder(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxDeltaSecondsBeforeCompaction(1)
+ .withMaxNumDeltaCommitsBeforeCompaction(1)
+
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.NUM_AND_TIME).build())
+ .build();
+ String instantTime;
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records = dataGen.generateInserts(instants.get(0),
10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ // Schedule compaction instantTime, make it in-flight (simulates inline
compaction failing)
+ instantTime = HoodieActiveTimeline.createNewInstantTime();
+ scheduleCompaction(instantTime, writeClient, cfg);
+ moveCompactionFromRequestedToInflight(instantTime, cfg);
}
// When: a third commit happens
- HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2);
+ HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_OR_TIME);
+ String instantTime2;
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg))
{
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
- createNextDeltaCommit("003", dataGen.generateUpdates("003", 100),
writeClient, metaClient, inlineCfg, false);
+ instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+ createNextDeltaCommit(instantTime2,
dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg,
false);
}
// Then: 1 delta commit is done, the failed compaction is retried
metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
- assertEquals("002",
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+ assertEquals(instantTime,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 865f0dc..4a03cd4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -81,10 +81,18 @@ public class HoodieActiveTimeline extends
HoodieDefaultTimeline {
* Ensures each instant time is at least 1 second apart since we create
instant times at second granularity
*/
public static String createNewInstantTime() {
+ return createNewInstantTime(0);
+ }
+
+ /**
+ * Returns next instant time that adds N milliseconds in the {@link
#COMMIT_FORMATTER} format.
+   * Ensures each instant time is at least 1 second apart since we create
instant times at second granularity
+ */
+ public static String createNewInstantTime(long milliseconds) {
return lastInstantTime.updateAndGet((oldVal) -> {
String newCommitTime;
do {
- newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new
Date());
+ newCommitTime = HoodieActiveTimeline.COMMIT_FORMATTER.format(new
Date(System.currentTimeMillis() + milliseconds));
} while (HoodieTimeline.compareTimestamps(newCommitTime,
LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime;
});
diff --git
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
new file mode 100644
index 0000000..0d51706
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieCompactException extends HoodieException {
+
+ public HoodieCompactException(String msg) {
+ super(msg);
+ }
+
+ public HoodieCompactException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}