nsivabalan commented on code in PR #10651:
URL: https://github.com/apache/hudi/pull/10651#discussion_r1489875786
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -191,25 +213,71 @@ private List<String>
getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths
that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
- return
hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
+
+ List<String> incrementalPartitions =
hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain()) &&
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
- HoodieTimeline.LESSER_THAN,
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
- try {
- if
(HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
- HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
-
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
- return
Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(),
replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
- } else {
- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
- HoodieCommitMetadata.class);
- return
commitMetadata.getPartitionToWriteStats().keySet().stream();
- }
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- }).distinct().collect(Collectors.toList());
+ HoodieTimeline.LESSER_THAN,
newInstantToRetain.get().getTimestamp()))
+
.flatMap(this::getPartitionsForInstants).distinct().collect(Collectors.toList());
+
+ // If any savepoint is removed b/w previous clean and this clean planning,
lets include the partitions of interest.
+ // for metadata table and non partitioned table, we do not need this
additional processing.
+ if (hoodieTable.isMetadataTable() || !hoodieTable.isPartitioned()) {
+ return incrementalPartitions;
+ }
+
+ List<String> partitionsFromDeletedSavepoints =
getPartitionsFromDeletedSavepoint(cleanMetadata);
+ LOG.info("Including partitions part of savepointed commits which was
removed after last known clean " + partitionsFromDeletedSavepoints.toString());
+ List<String> partitionsOfInterest = new ArrayList<>(incrementalPartitions);
+ partitionsOfInterest.addAll(partitionsFromDeletedSavepoints);
+ return
incrementalPartitions.stream().distinct().collect(Collectors.toList());
Review Comment:
   Oh, my bad — that was indeed unintentional. The method should return the
   deduplicated `partitionsOfInterest` (the incremental partitions plus the
   partitions from deleted savepoints), but it currently returns
   `incrementalPartitions.stream().distinct()...`, which silently drops the
   savepoint partitions merged in just above. Will fix it to de-duplicate and
   return `partitionsOfInterest` instead.
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java:
##########
@@ -122,10 +132,48 @@ void testGetDeletePaths(HoodieWriteConfig config, String
earliestInstant, List<H
assertEquals(expected, actual);
}
+ @ParameterizedTest
+ @MethodSource("incrCleaningPartitionsTestCases")
+ void testPartitionsForIncrCleaning(HoodieWriteConfig config, String
earliestInstant,
Review Comment:
   Sure, will address this in the next revision.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]