vinothchandar commented on a change in pull request #1576:
URL: https://github.com/apache/incubator-hudi/pull/1576#discussion_r418632353
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -111,36 +111,74 @@ public CleanPlanner(HoodieTable<T> hoodieTable,
HoodieWriteConfig config) {
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant>
newInstantToRetain) throws IOException {
- if (config.incrementalCleanerModeEnabled() &&
newInstantToRetain.isPresent()
- && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS ==
config.getCleanerPolicy())) {
- Option<HoodieInstant> lastClean =
-
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+ switch (config.getCleanerPolicy()) {
+ case KEEP_LATEST_COMMITS: return
getPartitionPathsForCleanByCommits(newInstantToRetain);
Review comment:
Could you move each `case` label and its `return` statement onto separate lines?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -111,36 +111,74 @@ public CleanPlanner(HoodieTable<T> hoodieTable,
HoodieWriteConfig config) {
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant>
newInstantToRetain) throws IOException {
- if (config.incrementalCleanerModeEnabled() &&
newInstantToRetain.isPresent()
- && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS ==
config.getCleanerPolicy())) {
- Option<HoodieInstant> lastClean =
-
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+ switch (config.getCleanerPolicy()) {
+ case KEEP_LATEST_COMMITS: return
getPartitionPathsForCleanByCommits(newInstantToRetain);
+ case KEEP_LATEST_FILE_VERSIONS: return scanAllPartitionsForCleaning();
+ default: throw new IllegalStateException("Unknown Cleaner Policy");
+ }
+ }
+
+ /**
+ * Return partition paths for cleaning by commits mode.
+ * @param instantToRetain Earliest Instant to retain
+ * @return list of partitions
+ * @throws IOException
+ */
+ private List<String>
getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain)
throws IOException {
+ if (!instantToRetain.isPresent() &&
(HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
+ LOG.info("No earliest commit to retain. No need to scan partitions !!");
+ return Collections.emptyList();
+ }
+
+ if (config.incrementalCleanerModeEnabled()) {
+ Option<HoodieInstant> lastClean =
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
- LOG.warn("Incremental Cleaning mode is enabled. Looking up
partition-paths that have since changed "
- + "since last cleaned at " +
cleanMetadata.getEarliestCommitToRetain()
- + ". New Instant to retain : " + newInstantToRetain);
- return hoodieTable.getCompletedCommitsTimeline().getInstants()
- .filter(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain())
- && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
- ).flatMap(instant -> {
- try {
- HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- return
commitMetadata.getPartitionToWriteStats().keySet().stream();
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- }).distinct().collect(Collectors.toList());
+ return
getPartitionPathsForCleaningUsingIncrementalMode(cleanMetadata,
instantToRetain);
}
}
}
- // Otherwise go to brute force mode of scanning all partitions
- return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
- hoodieTable.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
+ return scanAllPartitionsForCleaning();
+ }
+
+ /**
+ * Use Incremental Mode for finding partition paths.
+ * @param cleanMetadata
+ * @param newInstantToRetain
+ * @return
+ */
+ private List<String>
getPartitionPathsForCleaningUsingIncrementalMode(HoodieCleanMetadata
cleanMetadata,
Review comment:
Rename to `getPartitionPathsForIncrementalCleaning()`?
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -111,36 +111,74 @@ public CleanPlanner(HoodieTable<T> hoodieTable,
HoodieWriteConfig config) {
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant>
newInstantToRetain) throws IOException {
- if (config.incrementalCleanerModeEnabled() &&
newInstantToRetain.isPresent()
- && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS ==
config.getCleanerPolicy())) {
- Option<HoodieInstant> lastClean =
-
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+ switch (config.getCleanerPolicy()) {
+ case KEEP_LATEST_COMMITS: return
getPartitionPathsForCleanByCommits(newInstantToRetain);
+ case KEEP_LATEST_FILE_VERSIONS: return scanAllPartitionsForCleaning();
+ default: throw new IllegalStateException("Unknown Cleaner Policy");
+ }
+ }
+
+ /**
+ * Return partition paths for cleaning by commits mode.
+ * @param instantToRetain Earliest Instant to retain
+ * @return list of partitions
+ * @throws IOException
+ */
+ private List<String>
getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain)
throws IOException {
+ if (!instantToRetain.isPresent() &&
(HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
Review comment:
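We don't need the second check (`HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy()`) anymore, right? The switch above only calls this method for `KEEP_LATEST_COMMITS`.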
##########
File path:
hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -111,36 +111,74 @@ public CleanPlanner(HoodieTable<T> hoodieTable,
HoodieWriteConfig config) {
* @throws IOException when underlying file-system throws this exception
*/
public List<String> getPartitionPathsToClean(Option<HoodieInstant>
newInstantToRetain) throws IOException {
- if (config.incrementalCleanerModeEnabled() &&
newInstantToRetain.isPresent()
- && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS ==
config.getCleanerPolicy())) {
- Option<HoodieInstant> lastClean =
-
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+ switch (config.getCleanerPolicy()) {
+ case KEEP_LATEST_COMMITS: return
getPartitionPathsForCleanByCommits(newInstantToRetain);
+ case KEEP_LATEST_FILE_VERSIONS: return scanAllPartitionsForCleaning();
+ default: throw new IllegalStateException("Unknown Cleaner Policy");
+ }
+ }
+
+ /**
+ * Return partition paths for cleaning by commits mode.
+ * @param instantToRetain Earliest Instant to retain
+ * @return list of partitions
+ * @throws IOException
+ */
+ private List<String>
getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain)
throws IOException {
+ if (!instantToRetain.isPresent() &&
(HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
+ LOG.info("No earliest commit to retain. No need to scan partitions !!");
+ return Collections.emptyList();
+ }
+
+ if (config.incrementalCleanerModeEnabled()) {
+ Option<HoodieInstant> lastClean =
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
- LOG.warn("Incremental Cleaning mode is enabled. Looking up
partition-paths that have since changed "
- + "since last cleaned at " +
cleanMetadata.getEarliestCommitToRetain()
- + ". New Instant to retain : " + newInstantToRetain);
- return hoodieTable.getCompletedCommitsTimeline().getInstants()
- .filter(instant ->
- HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain())
- && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
- ).flatMap(instant -> {
- try {
- HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- return
commitMetadata.getPartitionToWriteStats().keySet().stream();
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- }).distinct().collect(Collectors.toList());
+ return
getPartitionPathsForCleaningUsingIncrementalMode(cleanMetadata,
instantToRetain);
}
}
}
- // Otherwise go to brute force mode of scanning all partitions
- return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
- hoodieTable.getMetaClient().getBasePath(),
config.shouldAssumeDatePartitioning());
+ return scanAllPartitionsForCleaning();
+ }
+
+ /**
+ * Use Incremental Mode for finding partition paths.
+ * @param cleanMetadata
+ * @param newInstantToRetain
+ * @return
+ */
+ private List<String>
getPartitionPathsForCleaningUsingIncrementalMode(HoodieCleanMetadata
cleanMetadata,
+ Option<HoodieInstant> newInstantToRetain) {
+ LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths
that have since changed "
+ + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ + ". New Instant to retain : " + newInstantToRetain);
+ return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
+ instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
+ cleanMetadata.getEarliestCommitToRetain()) &&
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+ HoodieTimeline.LESSER_THAN,
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
+ try {
+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+ HoodieCommitMetadata.class);
+ return
commitMetadata.getPartitionToWriteStats().keySet().stream();
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }).distinct().collect(Collectors.toList());
+ }
+
+ /**
+ * Scan and list all paritions for cleaning.
+ * @return all partitions paths for the dataset.
+ * @throws IOException
+ */
+ private List<String> scanAllPartitionsForCleaning() throws IOException {
Review comment:
Rename to `getPartitionPathsForFullCleaning()`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org