yanghua commented on a change in pull request #2260:
URL: https://github.com/apache/hudi/pull/2260#discussion_r560755205
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -109,6 +112,8 @@
private static final String DEFAULT_INLINE_COMPACT = "false";
private static final String DEFAULT_INCREMENTAL_CLEANER = "true";
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = "5";
+ private static final String DEFAULT_INLINE_COMPACT_ELAPSED_TIME =
String.valueOf(60 * 60);
Review comment:
ditto
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,112 @@ public
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " +
config.getBasePath());
+ // judge if we need to compact according to num delta commits and time
elapsed
+ boolean compactable =
needCompact(config.getInlineCompactTriggerStrategy());
+ if (compactable) {
+ LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
+ HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ try {
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
+ Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
+ .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet());
+ // exclude files in pending clustering from compaction.
+
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+ return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ }
+ }
+
+ return new HoodieCompactionPlan();
+ }
+
+ public Tuple2<Integer, String>
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction =
table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
- String lastCompactionTs = "0";
+ HoodieTimeline deltaCommits =
table.getActiveTimeline().getDeltaCommitTimeline();
+
+ String lastCompactionTs;
+ int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
lastCompactionTs = lastCompaction.get().getTimestamp();
+ } else {
+ lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
}
+ if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+ if (lastCompaction.isPresent()) {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ } else {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ }
+ }
+ return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+ }
- int deltaCommitsSinceLastCompaction =
table.getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
- if (config.getInlineCompactDeltaCommitMax() >
deltaCommitsSinceLastCompaction) {
- LOG.info("Not scheduling compaction as only " +
deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " +
lastCompactionTs + ". Waiting for "
- + config.getInlineCompactDeltaCommitMax());
- return new HoodieCompactionPlan();
+ public boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
+ boolean compactable;
+ // return deltaCommitsSinceLastCompaction and lastCompactionTs
Review comment:
`return` -> `get`
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -85,32 +88,185 @@ public void testSuccessfulCompaction() throws Exception {
}
@Test
- public void testCompactionRetryOnFailure() throws Exception {
+ public void testSuccessfulCompactionBasedOnTime() throws Exception {
+ // Given: make one commit
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10,
CompactionTriggerStrategy.TIME_ELAPSED);
+
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime),
records, cfg, true, new ArrayList<>());
+
+ // after 10s, that will trigger compaction
+ Thread.sleep(10000);
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg,
false);
+
+ // Then: ensure the file slices are compacted as per policy
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(HoodieTimeline.COMMIT_ACTION,
metaClient.getActiveTimeline().lastInstant().get().getAction());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_OR_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ // Then: trigger the compaction because reach 3 commits.
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ Thread.sleep(20000);
Review comment:
ditto
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -85,32 +88,185 @@ public void testSuccessfulCompaction() throws Exception {
}
@Test
- public void testCompactionRetryOnFailure() throws Exception {
+ public void testSuccessfulCompactionBasedOnTime() throws Exception {
+ // Given: make one commit
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10,
CompactionTriggerStrategy.TIME_ELAPSED);
+
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime),
records, cfg, true, new ArrayList<>());
+
+ // after 10s, that will trigger compaction
+ Thread.sleep(10000);
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg,
false);
+
+ // Then: ensure the file slices are compacted as per policy
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(HoodieTimeline.COMMIT_ACTION,
metaClient.getActiveTimeline().lastInstant().get().getAction());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_OR_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ // Then: trigger the compaction because reach 3 commits.
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ Thread.sleep(20000);
+ // 4th commit, that will trigger compaction because reach the time
elapsed
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(6,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_AND_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 3).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+
+ // Then: ensure no compaction is executed since there are only 3 delta
commits
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ Thread.sleep(20000);
+ // 4th commit, that will trigger compaction
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(5,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ }
+ }
+
+ @Test
+ public void testCompactionRetryOnFailureBasedOnNumCommits() throws Exception
{
+ // Given: two commits, schedule compaction and its failed/in-flight
+ HoodieWriteConfig cfg = getConfigBuilder(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .build();
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ String instantTime2;
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records = dataGen.generateInserts(instants.get(0),
100);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ // Schedule compaction instant2, make it in-flight (simulates inline
compaction failing)
+ instantTime2 = HoodieActiveTimeline.createNewInstantTime();
+ scheduleCompaction(instantTime2, writeClient, cfg);
+ moveCompactionFromRequestedToInflight(instantTime2, cfg);
+ }
+
+ // When: a third commit happens
+ HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60,
CompactionTriggerStrategy.NUM);
+ String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg))
{
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(instantTime3,
dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg,
false);
+ }
+
+ // Then: 1 delta commit is done, the failed compaction is retried
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(instantTime2,
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
+ }
+
+ @Test
+ public void testCompactionRetryOnFailureBasedOnTime() throws Exception {
// Given: two commits, schedule compaction and its failed/in-flight
HoodieWriteConfig cfg = getConfigBuilder(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
-
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withInlineCompaction(false)
+ .withMaxDeltaTimeBeforeCompaction(5)
+
.withInlineCompactionTriggerStrategy(CompactionTriggerStrategy.TIME_ELAPSED).build())
.build();
- List<String> instants = CollectionUtils.createImmutableList("000", "001");
+ String instantTime;
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
List<HoodieRecord> records = dataGen.generateInserts(instants.get(0),
100);
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
- // Schedule compaction 002, make it in-flight (simulates inline
compaction failing)
- scheduleCompaction("002", writeClient, cfg);
- moveCompactionFromRequestedToInflight("002", cfg);
+ // Schedule compaction instantTime, make it in-flight (simulates inline
compaction failing)
+ Thread.sleep(10000);
Review comment:
ditto
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -46,6 +47,8 @@
public static final String INLINE_COMPACT_PROP = "hoodie.compact.inline";
// Run a compaction every N delta commits
public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP =
"hoodie.compact.inline.max.delta.commits";
+ public static final String INLINE_COMPACT_ELAPSED_TIME_PROP =
"hoodie.compact.inline.max.delta.seconds";
Review comment:
Can we unify the constant name to
`INLINE_COMPACT_TIME_DELTA_SECONDS_PROP` so that it aligns with
`INLINE_COMPACT_NUM_DELTA_COMMITS_PROP` and `withMaxDeltaTimeBeforeCompaction`?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,112 @@ public
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " +
config.getBasePath());
+ // judge if we need to compact according to num delta commits and time
elapsed
+ boolean compactable =
needCompact(config.getInlineCompactTriggerStrategy());
+ if (compactable) {
+ LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
+ HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ try {
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
+ Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
+ .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet());
+ // exclude files in pending clustering from compaction.
+
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+ return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ }
+ }
+
+ return new HoodieCompactionPlan();
+ }
+
+ public Tuple2<Integer, String>
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction =
table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
- String lastCompactionTs = "0";
+ HoodieTimeline deltaCommits =
table.getActiveTimeline().getDeltaCommitTimeline();
+
+ String lastCompactionTs;
+ int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
lastCompactionTs = lastCompaction.get().getTimestamp();
+ } else {
+ lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
}
+ if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+ if (lastCompaction.isPresent()) {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ } else {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ }
+ }
+ return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+ }
- int deltaCommitsSinceLastCompaction =
table.getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
- if (config.getInlineCompactDeltaCommitMax() >
deltaCommitsSinceLastCompaction) {
- LOG.info("Not scheduling compaction as only " +
deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " +
lastCompactionTs + ". Waiting for "
- + config.getInlineCompactDeltaCommitMax());
- return new HoodieCompactionPlan();
+ public boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
+ boolean compactable;
+ // return deltaCommitsSinceLastCompaction and lastCompactionTs
+ Tuple2<Integer, String> threshold =
getLastDeltaCommitInfo(compactionTriggerStrategy);
+ int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
+ int inlineCompactDeltaElapsedTimeMax =
config.getInlineCompactDeltaElapsedTimeMax();
+ long elapsedTime;
+ switch (compactionTriggerStrategy) {
+ case NUM:
+ compactable = inlineCompactDeltaCommitMax <= threshold._1;
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s, trigger compaction
scheduler.", inlineCompactDeltaCommitMax));
+ } else {
+ LOG.info(String.format("Not scheduling compaction because %s delta
commits needed since last compaction %s."
+ + "But only %s delta commits found.",
inlineCompactDeltaCommitMax, threshold._2, threshold._1));
+ }
+ return compactable;
+ case TIME_ELAPSED:
+ elapsedTime = parsedToSeconds(instantTime) -
parsedToSeconds(threshold._2);
+ compactable = inlineCompactDeltaElapsedTimeMax <= elapsedTime;
+ if (compactable) {
+ LOG.info(String.format("The elapsed time >=%ss, trigger compaction
scheduler.", inlineCompactDeltaElapsedTimeMax));
+ } else {
+ LOG.info(String.format("Not scheduling compaction because %s elapsed
time needed since last compaction %s."
+ + "But only %ss elapsed time found",
inlineCompactDeltaElapsedTimeMax, threshold._2, elapsedTime));
+ }
+ return compactable;
+ case NUM_OR_TIME:
+ elapsedTime = parsedToSeconds(instantTime) -
parsedToSeconds(threshold._2);
+ compactable = inlineCompactDeltaCommitMax <= threshold._1 ||
inlineCompactDeltaElapsedTimeMax <= elapsedTime;
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s or elapsed_time
>=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+ inlineCompactDeltaElapsedTimeMax));
+ } else {
+ LOG.info(String.format("Not scheduling compaction because %s delta
commits or %ss elapsed time needed since last compaction %s."
+ + "But only %s delta commits and %ss elapsed time found",
inlineCompactDeltaCommitMax, inlineCompactDeltaElapsedTimeMax, threshold._2,
+ threshold._1, elapsedTime));
+ }
+ return compactable;
+ case NUM_AND_TIME:
+ elapsedTime = parsedToSeconds(instantTime) -
parsedToSeconds(threshold._2);
+ compactable = inlineCompactDeltaCommitMax <= threshold._1 &&
inlineCompactDeltaElapsedTimeMax <= elapsedTime;
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s and elapsed_time
>=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
+ inlineCompactDeltaElapsedTimeMax));
+ } else {
+ LOG.info(String.format("Not scheduling compaction because %s delta
commits and %ss elapsed time needed since last compaction %s."
+ + "But only %s delta commits and %ss elapsed time found",
inlineCompactDeltaCommitMax, inlineCompactDeltaElapsedTimeMax, threshold._2,
+ threshold._1, elapsedTime));
+ }
+ return compactable;
+ default:
+ throw new HoodieCompactionException("Unsupported compact type: " +
config.getInlineCompactTriggerStrategy());
Review comment:
`compact type` -> `compaction trigger strategy`.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,112 @@ public
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " +
config.getBasePath());
+ // judge if we need to compact according to num delta commits and time
elapsed
+ boolean compactable =
needCompact(config.getInlineCompactTriggerStrategy());
+ if (compactable) {
+ LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
+ HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ try {
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
+ Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
+ .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet());
+ // exclude files in pending clustering from compaction.
+
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+ return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ }
+ }
+
+ return new HoodieCompactionPlan();
+ }
+
+ public Tuple2<Integer, String>
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction =
table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
- String lastCompactionTs = "0";
+ HoodieTimeline deltaCommits =
table.getActiveTimeline().getDeltaCommitTimeline();
+
+ String lastCompactionTs;
+ int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
lastCompactionTs = lastCompaction.get().getTimestamp();
+ } else {
+ lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
}
+ if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+ if (lastCompaction.isPresent()) {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ } else {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ }
+ }
+ return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+ }
- int deltaCommitsSinceLastCompaction =
table.getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
- if (config.getInlineCompactDeltaCommitMax() >
deltaCommitsSinceLastCompaction) {
- LOG.info("Not scheduling compaction as only " +
deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " +
lastCompactionTs + ". Waiting for "
- + config.getInlineCompactDeltaCommitMax());
- return new HoodieCompactionPlan();
+ public boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
+ boolean compactable;
+ // return deltaCommitsSinceLastCompaction and lastCompactionTs
+ Tuple2<Integer, String> threshold =
getLastDeltaCommitInfo(compactionTriggerStrategy);
Review comment:
Actually, it's not a threshold, right? It's the actual value. The two
config values below are the thresholds, if you want to use that term. So the name `threshold` here is misleading.
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -85,32 +88,185 @@ public void testSuccessfulCompaction() throws Exception {
}
@Test
- public void testCompactionRetryOnFailure() throws Exception {
+ public void testSuccessfulCompactionBasedOnTime() throws Exception {
+ // Given: make one commit
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10,
CompactionTriggerStrategy.TIME_ELAPSED);
+
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime),
records, cfg, true, new ArrayList<>());
+
+ // after 10s, that will trigger compaction
+ Thread.sleep(10000);
Review comment:
We should avoid using sleep, as it will add to the CI time. Can we poll the
relevant status in a loop instead?
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,112 @@ public
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " +
config.getBasePath());
+ // judge if we need to compact according to num delta commits and time
elapsed
+ boolean compactable =
needCompact(config.getInlineCompactTriggerStrategy());
+ if (compactable) {
+ LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
+ HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ try {
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
+ Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
+ .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet());
+ // exclude files in pending clustering from compaction.
+
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+ return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ }
+ }
+
+ return new HoodieCompactionPlan();
+ }
+
+ public Tuple2<Integer, String>
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction =
table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
- String lastCompactionTs = "0";
+ HoodieTimeline deltaCommits =
table.getActiveTimeline().getDeltaCommitTimeline();
+
+ String lastCompactionTs;
+ int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
lastCompactionTs = lastCompaction.get().getTimestamp();
+ } else {
+ lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
}
+ if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+ if (lastCompaction.isPresent()) {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ } else {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ }
+ }
+ return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+ }
- int deltaCommitsSinceLastCompaction =
table.getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
- if (config.getInlineCompactDeltaCommitMax() >
deltaCommitsSinceLastCompaction) {
- LOG.info("Not scheduling compaction as only " +
deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " +
lastCompactionTs + ". Waiting for "
- + config.getInlineCompactDeltaCommitMax());
- return new HoodieCompactionPlan();
+ public boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
+ boolean compactable;
+ // return deltaCommitsSinceLastCompaction and lastCompactionTs
+ Tuple2<Integer, String> threshold =
getLastDeltaCommitInfo(compactionTriggerStrategy);
+ int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
+ int inlineCompactDeltaElapsedTimeMax =
config.getInlineCompactDeltaElapsedTimeMax();
+ long elapsedTime;
+ switch (compactionTriggerStrategy) {
+ case NUM:
+ compactable = inlineCompactDeltaCommitMax <= threshold._1;
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s, trigger compaction
scheduler.", inlineCompactDeltaCommitMax));
+ } else {
+ LOG.info(String.format("Not scheduling compaction because %s delta
commits needed since last compaction %s."
Review comment:
IMO, we do not need all the `else` statements, right? It is perfectly normal
for the compaction trigger condition not to be met, so logging it adds little value.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,112 @@ public
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " +
config.getBasePath());
+ // judge if we need to compact according to num delta commits and time
elapsed
+ boolean compactable =
needCompact(config.getInlineCompactTriggerStrategy());
+ if (compactable) {
+ LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
+ HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ try {
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
+ Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
+ .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet());
+ // exclude files in pending clustering from compaction.
+
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+ return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ }
+ }
+
+ return new HoodieCompactionPlan();
+ }
+
+ public Tuple2<Integer, String>
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction =
table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
- String lastCompactionTs = "0";
+ HoodieTimeline deltaCommits =
table.getActiveTimeline().getDeltaCommitTimeline();
+
+ String lastCompactionTs;
+ int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
lastCompactionTs = lastCompaction.get().getTimestamp();
+ } else {
+ lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
}
+ if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
+ if (lastCompaction.isPresent()) {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ } else {
+ deltaCommitsSinceLastCompaction =
deltaCommits.findInstantsAfterOrEquals(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
+ }
+ }
+ return new Tuple2(deltaCommitsSinceLastCompaction, lastCompactionTs);
+ }
- int deltaCommitsSinceLastCompaction =
table.getActiveTimeline().getDeltaCommitTimeline()
- .findInstantsAfter(lastCompactionTs,
Integer.MAX_VALUE).countInstants();
- if (config.getInlineCompactDeltaCommitMax() >
deltaCommitsSinceLastCompaction) {
- LOG.info("Not scheduling compaction as only " +
deltaCommitsSinceLastCompaction
- + " delta commits was found since last compaction " +
lastCompactionTs + ". Waiting for "
- + config.getInlineCompactDeltaCommitMax());
- return new HoodieCompactionPlan();
+ public boolean needCompact(CompactionTriggerStrategy
compactionTriggerStrategy) {
+ boolean compactable;
+ // return deltaCommitsSinceLastCompaction and lastCompactionTs
+ Tuple2<Integer, String> threshold =
getLastDeltaCommitInfo(compactionTriggerStrategy);
+ int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
+ int inlineCompactDeltaElapsedTimeMax =
config.getInlineCompactDeltaElapsedTimeMax();
+ long elapsedTime;
+ switch (compactionTriggerStrategy) {
+ case NUM:
+ compactable = inlineCompactDeltaCommitMax <= threshold._1;
+ if (compactable) {
+ LOG.info(String.format("The delta commits >= %s, trigger compaction
scheduler.", inlineCompactDeltaCommitMax));
+ } else {
+ LOG.info(String.format("Not scheduling compaction because %s delta
commits needed since last compaction %s."
+ + "But only %s delta commits found.",
inlineCompactDeltaCommitMax, threshold._2, threshold._1));
+ }
+ return compactable;
Review comment:
Since you have defined a `compactable `, let's use `break` here and
return it in the end.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/SparkScheduleCompactionActionExecutor.java
##########
@@ -58,36 +62,112 @@ public
SparkScheduleCompactionActionExecutor(HoodieEngineContext context,
@Override
protected HoodieCompactionPlan scheduleCompaction() {
LOG.info("Checking if compaction needs to be run on " +
config.getBasePath());
+ // judge if we need to compact according to num delta commits and time
elapsed
+ boolean compactable =
needCompact(config.getInlineCompactTriggerStrategy());
+ if (compactable) {
+ LOG.info("Generating compaction plan for merge on read table " +
config.getBasePath());
+ HoodieSparkMergeOnReadTableCompactor compactor = new
HoodieSparkMergeOnReadTableCompactor();
+ try {
+ SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
table.getSliceView();
+ Set<HoodieFileGroupId> fgInPendingCompactionAndClustering =
fileSystemView.getPendingCompactionOperations()
+ .map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
+ .collect(Collectors.toSet());
+ // exclude files in pending clustering from compaction.
+
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
+ return compactor.generateCompactionPlan(context, table, config,
instantTime, fgInPendingCompactionAndClustering);
+ } catch (IOException e) {
+ throw new HoodieCompactionException("Could not schedule compaction " +
config.getBasePath(), e);
+ }
+ }
+
+ return new HoodieCompactionPlan();
+ }
+
+ public Tuple2<Integer, String>
getLastDeltaCommitInfo(CompactionTriggerStrategy compactionTriggerStrategy) {
Option<HoodieInstant> lastCompaction =
table.getActiveTimeline().getCommitTimeline()
.filterCompletedInstants().lastInstant();
- String lastCompactionTs = "0";
+ HoodieTimeline deltaCommits =
table.getActiveTimeline().getDeltaCommitTimeline();
+
+ String lastCompactionTs;
+ int deltaCommitsSinceLastCompaction = 0;
if (lastCompaction.isPresent()) {
lastCompactionTs = lastCompaction.get().getTimestamp();
+ } else {
+ lastCompactionTs = deltaCommits.firstInstant().get().getTimestamp();
}
+ if (compactionTriggerStrategy != CompactionTriggerStrategy.TIME_ELAPSED) {
Review comment:
Do we also need to judge `NUM_OR_TIME `?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -85,32 +88,185 @@ public void testSuccessfulCompaction() throws Exception {
}
@Test
- public void testCompactionRetryOnFailure() throws Exception {
+ public void testSuccessfulCompactionBasedOnTime() throws Exception {
+ // Given: make one commit
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10,
CompactionTriggerStrategy.TIME_ELAPSED);
+
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime),
records, cfg, true, new ArrayList<>());
+
+ // after 10s, that will trigger compaction
+ Thread.sleep(10000);
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg,
false);
+
+ // Then: ensure the file slices are compacted as per policy
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(HoodieTimeline.COMMIT_ACTION,
metaClient.getActiveTimeline().lastInstant().get().getAction());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_OR_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ // Then: trigger the compaction because reach 3 commits.
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ Thread.sleep(20000);
+ // 4th commit, that will trigger compaction because reach the time
elapsed
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(6,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_AND_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 3).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+
+ // Then: ensure no compaction is executed, since there are only 3 delta
commits
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ Thread.sleep(20000);
Review comment:
ditto
##########
File path:
hudi-common/src/main/java/org/apache/hudi/exception/HoodieCompactException.java
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieCompactException extends HoodieException {
Review comment:
Useless exception class?
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
##########
@@ -85,32 +88,185 @@ public void testSuccessfulCompaction() throws Exception {
}
@Test
- public void testCompactionRetryOnFailure() throws Exception {
+ public void testSuccessfulCompactionBasedOnTime() throws Exception {
+ // Given: make one commit
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(5, 10,
CompactionTriggerStrategy.TIME_ELAPSED);
+
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<HoodieRecord> records = dataGen.generateInserts(instantTime, 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ runNextDeltaCommits(writeClient, readClient, Arrays.asList(instantTime),
records, cfg, true, new ArrayList<>());
+
+ // after 10s, that will trigger compaction
+ Thread.sleep(10000);
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg,
false);
+
+ // Then: ensure the file slices are compacted as per policy
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ assertEquals(HoodieTimeline.COMMIT_ACTION,
metaClient.getActiveTimeline().lastInstant().get().getAction());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumOrTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_OR_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 2).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ // Then: trigger the compaction because reach 3 commits.
+ String finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(4,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ Thread.sleep(20000);
+ // 4th commit, that will trigger compaction because reach the time
elapsed
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ finalInstant = HoodieActiveTimeline.createNewInstantTime();
+ createNextDeltaCommit(finalInstant,
dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
+
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ assertEquals(6,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ }
+ }
+
+ @Test
+ public void testSuccessfulCompactionBasedOnNumAndTime() throws Exception {
+ // Given: make three commits
+ HoodieWriteConfig cfg = getConfigForInlineCompaction(3, 20,
CompactionTriggerStrategy.NUM_AND_TIME);
+ try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(cfg)) {
+ List<HoodieRecord> records =
dataGen.generateInserts(HoodieActiveTimeline.createNewInstantTime(), 10);
+ HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
+ List<String> instants = IntStream.range(0, 3).mapToObj(i ->
HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
+ runNextDeltaCommits(writeClient, readClient, instants, records, cfg,
true, new ArrayList<>());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
+
+ // Then: ensure no compaction is executed, since there are only 3 delta
commits
+ assertEquals(3,
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
+ Thread.sleep(20000);
Review comment:
ditto
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]