This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 506447f [HUDI-850] Avoid unnecessary listings in incremental cleaning
mode (#1576)
506447f is described below
commit 506447fd4fde4cd922f7aa8f4e17a7f06666dc97
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Fri May 1 21:37:21 2020 -0700
[HUDI-850] Avoid unnecessary listings in incremental cleaning mode (#1576)
---
.../hudi/table/action/clean/CleanPlanner.java | 87 ++++++++++++++++------
.../java/org/apache/hudi/table/TestCleaner.java | 43 ++++++-----
.../hudi/utilities/TestHoodieSnapshotExporter.java | 3 -
3 files changed, 85 insertions(+), 48 deletions(-)
diff --git
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 9f14e6c..815b41d 100644
---
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -39,7 +39,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
-
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.table.HoodieTable;
import org.apache.log4j.LogManager;
@@ -48,6 +47,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -111,36 +111,77 @@ public class CleanPlanner<T extends
HoodieRecordPayload<T>> implements Serializa
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant>
newInstantToRetain) throws IOException {
- if (config.incrementalCleanerModeEnabled() &&
newInstantToRetain.isPresent()
- && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS ==
config.getCleanerPolicy())) {
- Option<HoodieInstant> lastClean =
-
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+ switch (config.getCleanerPolicy()) {
+ case KEEP_LATEST_COMMITS:
+ return getPartitionPathsForCleanByCommits(newInstantToRetain);
+ case KEEP_LATEST_FILE_VERSIONS:
+ return getPartitionPathsForFullCleaning();
+ default:
+ throw new IllegalStateException("Unknown Cleaner Policy");
+ }
+ }
+
+ /**
+ * Return partition paths for cleaning by commits mode.
+ * @param instantToRetain Earliest Instant to retain
+ * @return list of partitions
+ * @throws IOException
+ */
+ private List<String>
getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain)
throws IOException {
+ if (!instantToRetain.isPresent()) {
+ LOG.info("No earliest commit to retain. No need to scan partitions !!");
+ return Collections.emptyList();
+ }
+
+ if (config.incrementalCleanerModeEnabled()) {
+ Option<HoodieInstant> lastClean =
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
- LOG.warn("Incremental Cleaning mode is enabled. Looking up
partition-paths that have since changed "
- + "since last cleaned at " +
cleanMetadata.getEarliestCommitToRetain()
- + ". New Instant to retain : " + newInstantToRetain);
- return hoodieTable.getCompletedCommitsTimeline().getInstants()
- .filter(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain())
- && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
- ).flatMap(instant -> {
- try {
- HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- return
commitMetadata.getPartitionToWriteStats().keySet().stream();
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- }).distinct().collect(Collectors.toList());
+ return getPartitionPathsForIncrementalCleaning(cleanMetadata,
instantToRetain);
}
}
}
- // Otherwise go to brute force mode of scanning all partitions
- return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
- hoodieTable.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
+ return getPartitionPathsForFullCleaning();
+ }
+
+ /**
+ * Use Incremental Mode for finding partition paths.
+ * @param cleanMetadata
+ * @param newInstantToRetain
+ * @return
+ */
+ private List<String>
getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
+ Option<HoodieInstant> newInstantToRetain) {
+ LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths
that have since changed "
+ + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ + ". New Instant to retain : " + newInstantToRetain);
+ return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
+ instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
+ cleanMetadata.getEarliestCommitToRetain()) &&
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+ HoodieTimeline.LESSER_THAN,
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+ HoodieCommitMetadata.class);
+ return
commitMetadata.getPartitionToWriteStats().keySet().stream();
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }).distinct().collect(Collectors.toList());
+ }
+
+ /**
+ * Scan and list all partitions for cleaning.
+ * @return all partition paths for the dataset.
+ * @throws IOException
+ */
+ private List<String> getPartitionPathsForFullCleaning() throws IOException {
+ // Go to brute force mode of scanning all partitions
+ return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
hoodieTable.getMetaClient().getBasePath(),
+ config.shouldAssumeDatePartitioning());
}
/**
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index ad05890..296530b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -104,7 +104,8 @@ public class TestCleaner extends TestHoodieClientBase {
*/
private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg,
HoodieWriteClient client,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient,
JavaRDD<HoodieRecord>, String> insertFn,
+ HoodieCleaningPolicy cleaningPolicy) throws Exception {
/*
* do a big insert (this is basically same as insert part of upsert, just
adding it here so we can catch breakages
@@ -127,11 +128,15 @@ public class TestCleaner extends TestHoodieClientBase {
HoodieTable table = HoodieTable.create(metaClient, client.getConfig(),
jsc);
assertFalse(table.getCompletedCommitsTimeline().empty());
- String instantTime =
table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
- assertFalse(table.getCompletedCleanTimeline().empty());
- assertEquals(instantTime,
-
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
- "The clean instant should be the same as the commit instant");
+ if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
+ // We no longer write empty cleaner plans when there are not enough
commits present
+ assertTrue(table.getCompletedCleanTimeline().empty());
+ } else {
+ String instantTime =
table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
+ assertFalse(table.getCompletedCleanTimeline().empty());
+ assertEquals(instantTime,
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
+ "The clean instant should be the same as the commit instant");
+ }
HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
List<HoodieRecord> taggedRecords =
index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
@@ -201,7 +206,8 @@ public class TestCleaner extends TestHoodieClientBase {
final Function2<List<HoodieRecord>, String, Integer>
recordUpsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg,
dataGen::generateUniqueUpdates);
- insertFirstBigBatchForClientCleanerTest(cfg, client,
recordInsertGenWrappedFunction, insertFn);
+ insertFirstBigBatchForClientCleanerTest(cfg, client,
recordInsertGenWrappedFunction, insertFn,
+ HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice =
new HashMap<>();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -349,7 +355,7 @@ public class TestCleaner extends TestHoodieClientBase {
int maxCommits = 3; // keep upto 3 commits from the past
HoodieWriteConfig cfg = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
-
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
+
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1,
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
@@ -361,7 +367,8 @@ public class TestCleaner extends TestHoodieClientBase {
final Function2<List<HoodieRecord>, String, Integer>
recordUpsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg,
dataGen::generateUniqueUpdates);
- insertFirstBigBatchForClientCleanerTest(cfg, client,
recordInsertGenWrappedFunction, insertFn);
+ insertFirstBigBatchForClientCleanerTest(cfg, client,
recordInsertGenWrappedFunction, insertFn,
+ HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
// Keep doing some writes and clean inline. Make sure we have expected
number of files remaining.
HoodieTestUtils.monotonicIncreasingCommitTimestamps(8,
1).forEach(newCommitTime -> {
@@ -376,7 +383,9 @@ public class TestCleaner extends TestHoodieClientBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
- Option<HoodieInstant> earliestRetainedCommit =
activeTimeline.nthFromLastInstant(maxCommits - 1);
+ // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We
explicitly keep one commit before earliest
+ // commit
+ Option<HoodieInstant> earliestRetainedCommit =
activeTimeline.nthFromLastInstant(maxCommits);
Set<HoodieInstant> acceptableCommits =
activeTimeline.getInstants().collect(Collectors.toSet());
if (earliestRetainedCommit.isPresent()) {
acceptableCommits
@@ -751,12 +760,7 @@ public class TestCleaner extends TestHoodieClientBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config,
simulateFailureRetry);
- assertEquals(0,
- getCleanStat(hoodieCleanStatsOne,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
- .size(), "Must not clean any files");
- assertEquals(0,
- getCleanStat(hoodieCleanStatsOne,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
- .size(), "Must not clean any files");
+ assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions
and clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
file1P0C0));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
@@ -786,12 +790,7 @@ public class TestCleaner extends TestHoodieClientBase {
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config,
simulateFailureRetry);
- assertEquals(0,
- getCleanStat(hoodieCleanStatsTwo,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
- .size(), "Must not clean any files");
- assertEquals(0,
- getCleanStat(hoodieCleanStatsTwo,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
- .size(), "Must not clean any files");
+ assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions
and clean any files");
assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
file2P0C1));
assertTrue(HoodieTestUtils.doesDataFileExist(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index fda192f..087a283 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -139,9 +139,6 @@ public class TestHoodieSnapshotExporter extends
HoodieClientTestHarness {
new HoodieSnapshotExporter().export(jsc, cfg);
// Check results
- assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".clean")));
- assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".clean.inflight")));
- assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".clean.requested")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".commit")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".commit.requested")));
assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME +
".inflight")));