This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f29811b1a4c [HUDI-7104] Fixing cleaner savepoint interplay to fix edge 
case with incremental cleaning (#10651)
f29811b1a4c is described below

commit f29811b1a4ca9121a5124d63ded147dba7b90b93
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Feb 15 05:16:41 2024 -0800

    [HUDI-7104] Fixing cleaner savepoint interplay to fix edge case with 
incremental cleaning (#10651)
    
    * Fixing incremental cleaning with savepoint
    
    * Addressing feedback
---
 .../table/action/clean/CleanActionExecutor.java    |   3 +-
 .../action/clean/CleanPlanActionExecutor.java      |  12 +-
 .../hudi/table/action/clean/CleanPlanner.java      | 116 ++++++++--
 .../apache/hudi/table/action/TestCleanPlanner.java | 247 ++++++++++++++++++++-
 .../hudi/utils/TestMetadataConversionUtils.java    |   4 +-
 .../functional/TestExternalPathHandling.java       |   5 +-
 .../java/org/apache/hudi/table/TestCleaner.java    |   7 +-
 .../testutils/HoodieSparkClientTestHarness.java    |   4 +-
 hudi-common/src/main/avro/HoodieCleanMetadata.avsc |  11 +-
 hudi-common/src/main/avro/HoodieCleanerPlan.avsc   |  11 +-
 .../clean/CleanPlanV1MigrationHandler.java         |   3 +-
 .../clean/CleanPlanV2MigrationHandler.java         |   3 +-
 .../org/apache/hudi/common/util/CleanerUtils.java  |   5 +-
 .../table/view/TestIncrementalFSViewSync.java      |   2 +-
 .../hudi/common/testutils/HoodieTestTable.java     |   8 +-
 .../hudi/common/util/TestClusteringUtils.java      |   6 +-
 16 files changed, 395 insertions(+), 52 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 40d91b63394..61c0eeeffb0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -219,7 +219,8 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
       HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
           inflightInstant.getTimestamp(),
           Option.of(timer.endTimer()),
-          cleanStats
+          cleanStats,
+          cleanerPlan.getExtraMetadata()
       );
       if (!skipLocking) {
         this.txnManager.beginTransaction(Option.of(inflightInstant), 
Option.empty());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index a70bfd256c0..723a95bb218 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -49,11 +49,11 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.MapUtils.nonEmpty;
+import static 
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
 
 public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, 
I, K, O, Option<HoodieCleanerPlan>> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CleanPlanActionExecutor.class);
-
   private final Option<Map<String, String>> extraMetadata;
 
   public CleanPlanActionExecutor(HoodieEngineContext context,
@@ -142,12 +142,20 @@ public class CleanPlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I
           .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), 
x.getState().name())).orElse(null),
           planner.getLastCompletedCommitTimestamp(),
           config.getCleanerPolicy().name(), Collections.emptyMap(),
-          CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, 
partitionsToDelete);
+          CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, 
partitionsToDelete, prepareExtraMetadata(planner.getSavepointedTimestamps()));
     } catch (IOException e) {
       throw new HoodieIOException("Failed to schedule clean operation", e);
     }
   }
 
+  private Map<String, String> prepareExtraMetadata(List<String> 
savepointedTimestamps) {
+    if (savepointedTimestamps.isEmpty()) {
+      return Collections.emptyMap();
+    } else {
+      return Collections.singletonMap(SAVEPOINTED_TIMESTAMPS, 
savepointedTimestamps.stream().collect(Collectors.joining(",")));
+    }
+  }
+
   /**
    * Creates a Cleaner plan if there are files to be cleaned and stores them 
in instant file.
    * Cleaner Plan contains absolute file paths.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 0dd516a88d1..19cbe0f91a7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -41,6 +41,7 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -55,6 +56,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -78,6 +80,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable 
{
   public static final Integer CLEAN_PLAN_VERSION_1 = 
CleanPlanV1MigrationHandler.VERSION;
   public static final Integer CLEAN_PLAN_VERSION_2 = 
CleanPlanV2MigrationHandler.VERSION;
   public static final Integer LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
+  public static final String SAVEPOINTED_TIMESTAMPS = "savepointed_timestamps";
 
   private final SyncableFileSystemView fileSystemView;
   private final HoodieTimeline commitTimeline;
@@ -86,6 +89,7 @@ public class CleanPlanner<T, I, K, O> implements Serializable 
{
   private final HoodieTable<T, I, K, O> hoodieTable;
   private final HoodieWriteConfig config;
   private transient HoodieEngineContext context;
+  private List<String> savepointedTimestamps;
 
   public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> 
hoodieTable, HoodieWriteConfig config) {
     this.context = context;
@@ -109,25 +113,43 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
       LOG.info("Load all partitions and files into file system view in 
advance.");
       fileSystemView.loadAllPartitions();
     }
+    // collect savepointed timestamps to assist with incremental cleaning. 
For non-partitioned and metadata tables, we may not need this.
+    this.savepointedTimestamps = hoodieTable.isMetadataTable() ? 
Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? 
hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
+        : Collections.EMPTY_LIST);
+  }
+
+  /**
+   * @return list of savepointed timestamps in active timeline as of this 
clean planning.
+   */
+  List<String> getSavepointedTimestamps() {
+    return this.savepointedTimestamps;
   }
 
   /**
    * Get the list of data file names savepointed.
    */
   public Stream<String> getSavepointedDataFiles(String savepointTime) {
-    if (!hoodieTable.getSavepointTimestamps().contains(savepointTime)) {
+    HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
+    return metadata.getPartitionMetadata().values().stream().flatMap(s -> 
s.getSavepointDataFile().stream());
+  }
+
+  private Stream<String> getPartitionsFromSavepoint(String savepointTime) {
+    HoodieSavepointMetadata metadata = getSavepointMetadata(savepointTime);
+    return metadata.getPartitionMetadata().keySet().stream();
+  }
+
+  private HoodieSavepointMetadata getSavepointMetadata(String 
savepointTimestamp) {
+    if (!hoodieTable.getSavepointTimestamps().contains(savepointTimestamp)) {
       throw new HoodieSavepointException(
-          "Could not get data files for savepoint " + savepointTime + ". No 
such savepoint.");
+          "Could not get data files for savepoint " + savepointTimestamp + ". 
No such savepoint.");
     }
-    HoodieInstant instant = new HoodieInstant(false, 
HoodieTimeline.SAVEPOINT_ACTION, savepointTime);
-    HoodieSavepointMetadata metadata;
+    HoodieInstant instant = new HoodieInstant(false, 
HoodieTimeline.SAVEPOINT_ACTION, savepointTimestamp);
     try {
-      metadata = TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
+      return TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
           hoodieTable.getActiveTimeline().getInstantDetails(instant).get());
     } catch (IOException e) {
-      throw new HoodieSavepointException("Could not get savepointed data files 
for savepoint " + savepointTime, e);
+      throw new HoodieSavepointException("Could not get savepointed data files 
for savepoint " + savepointTimestamp, e);
     }
-    return metadata.getPartitionMetadata().values().stream().flatMap(s -> 
s.getSavepointDataFile().stream());
   }
 
   /**
@@ -191,25 +213,71 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
     LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths 
that have since changed "
         + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
         + ". New Instant to retain : " + newInstantToRetain);
-    return 
hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
+
+    List<String> incrementalPartitions = 
hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
         instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN_OR_EQUALS,
             cleanMetadata.getEarliestCommitToRetain()) && 
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-            HoodieTimeline.LESSER_THAN, 
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
-              try {
-                if 
(HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
-                  HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
-                      
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
-                  return 
Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(),
 replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
-                } else {
-                  HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                      
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
-                          HoodieCommitMetadata.class);
-                  return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
-                }
-              } catch (IOException e) {
-                throw new HoodieIOException(e.getMessage(), e);
-              }
-            }).distinct().collect(Collectors.toList());
+            HoodieTimeline.LESSER_THAN, 
newInstantToRetain.get().getTimestamp()))
+        
.flatMap(this::getPartitionsForInstants).distinct().collect(Collectors.toList());
+
+    // If any savepoint is removed b/w previous clean and this clean planning, 
let's include the partitions of interest.
+    // for metadata table and non partitioned table, we do not need this 
additional processing.
+    if (hoodieTable.isMetadataTable() || !hoodieTable.isPartitioned()) {
+      return incrementalPartitions;
+    }
+
+    List<String> partitionsFromDeletedSavepoints = 
getPartitionsFromDeletedSavepoint(cleanMetadata);
+    LOG.info("Including partitions part of savepointed commits which was 
removed after last known clean " + partitionsFromDeletedSavepoints.toString());
+    List<String> partitionsOfInterest = new ArrayList<>(incrementalPartitions);
+    partitionsOfInterest.addAll(partitionsFromDeletedSavepoints);
+    return 
partitionsOfInterest.stream().distinct().collect(Collectors.toList());
+  }
+
+  private List<String> getPartitionsFromDeletedSavepoint(HoodieCleanMetadata 
cleanMetadata) {
+    List<String> savepointedTimestampsFromLastClean = 
Arrays.stream(cleanMetadata.getExtraMetadata()
+            .getOrDefault(SAVEPOINTED_TIMESTAMPS, 
StringUtils.EMPTY_STRING).split(","))
+        .filter(partition -> 
!StringUtils.isNullOrEmpty(partition)).collect(Collectors.toList());
+    if (savepointedTimestampsFromLastClean.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // check for any savepoint removed in the latest list compared to the 
previously saved list
+    List<String> removedSavepointedTimestamps = new 
ArrayList<>(savepointedTimestampsFromLastClean);
+    removedSavepointedTimestamps.removeAll(savepointedTimestamps);
+    if (removedSavepointedTimestamps.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    // fetch list of partitions from the removed savepoints and add it to 
return list
+    return removedSavepointedTimestamps.stream().flatMap(savepointCommit -> {
+      Option<HoodieInstant> instantOption = 
hoodieTable.getCompletedCommitsTimeline().filter(instant -> 
instant.getTimestamp().equals(savepointCommit)).firstInstant();
+      if (!instantOption.isPresent()) {
+        LOG.warn("Skipping to process a commit for which savepoint was removed 
as the instant moved to archived timeline already");
+      }
+      HoodieInstant instant = instantOption.get();
+      return getPartitionsForInstants(instant);
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Fetch partitions updated as part of a HoodieInstant.
+   * @param instant {@link HoodieInstant} of interest.
+   * @return partitions that were part of {@link HoodieInstant} given.
+   */
+  private Stream<String> getPartitionsForInstants(HoodieInstant instant) {
+    try {
+      if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
+        HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
+            hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
+        return 
Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(),
 replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
+      } else {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+            
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+                HoodieCommitMetadata.class);
+        return commitMetadata.getPartitionToWriteStats().keySet().stream();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
index e5a528b9382..4268cc36d47 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.table.action;
 
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -26,16 +28,20 @@ import 
org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.CleanFileInfo;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -61,6 +67,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.util.CleanerUtils.CLEAN_METADATA_VERSION_2;
+import static 
org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -72,6 +80,9 @@ public class TestCleanPlanner {
   private final HoodieTable<?, ?, ?, ?> mockHoodieTable = 
mock(HoodieTable.class);
 
   private SyncableFileSystemView mockFsView;
+  private static String PARTITION1 = "partition1";
+  private static String PARTITION2 = "partition2";
+  private static String PARTITION3 = "partition3";
 
   @BeforeEach
   void setUp() {
@@ -93,7 +104,7 @@ public class TestCleanPlanner {
   @ParameterizedTest
   @MethodSource("testCases")
   void testGetDeletePaths(HoodieWriteConfig config, String earliestInstant, 
List<HoodieFileGroup> allFileGroups, List<Pair<String, Option<byte[]>>> 
savepoints,
-                         List<HoodieFileGroup> replacedFileGroups, 
Pair<Boolean, List<CleanFileInfo>> expected) {
+                          List<HoodieFileGroup> replacedFileGroups, 
Pair<Boolean, List<CleanFileInfo>> expected) throws IOException {
 
     // setup savepoint mocks
     Set<String> savepointTimestamps = 
savepoints.stream().map(Pair::getLeft).collect(Collectors.toSet());
@@ -122,10 +133,48 @@ public class TestCleanPlanner {
     assertEquals(expected, actual);
   }
 
+  @ParameterizedTest
+  @MethodSource("incrCleaningPartitionsTestCases")
+  void testPartitionsForIncrCleaning(HoodieWriteConfig config, String 
earliestInstant,
+                                     String lastCompletedTimeInLastClean, 
String lastCleanInstant, String earliestInstantsInLastClean, List<String> 
partitionsInLastClean,
+                                     Map<String, List<String>> 
savepointsTrackedInLastClean, Map<String, List<String>> 
activeInstantsPartitions,
+                                     Map<String, List<String>> savepoints, 
List<String> expectedPartitions) throws IOException {
+    HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
+    when(mockHoodieTable.getActiveTimeline()).thenReturn(activeTimeline);
+    // setup savepoint mocks
+    Set<String> savepointTimestamps = 
savepoints.keySet().stream().collect(Collectors.toSet());
+    
when(mockHoodieTable.getSavepointTimestamps()).thenReturn(savepointTimestamps);
+    if (!savepoints.isEmpty()) {
+      for (Map.Entry<String, List<String>> entry: savepoints.entrySet()) {
+        Pair<HoodieSavepointMetadata, Option<byte[]>> 
savepointMetadataOptionPair = getSavepointMetadata(entry.getValue());
+        HoodieInstant instant = new HoodieInstant(false, 
HoodieTimeline.SAVEPOINT_ACTION, entry.getKey());
+        
when(activeTimeline.getInstantDetails(instant)).thenReturn(savepointMetadataOptionPair.getRight());
+      }
+    }
+
+    // prepare last Clean Metadata
+    Pair<HoodieCleanMetadata, Option<byte[]>> cleanMetadataOptionPair =
+        getCleanCommitMetadata(partitionsInLastClean, lastCleanInstant, 
earliestInstantsInLastClean, lastCompletedTimeInLastClean, 
savepointsTrackedInLastClean.keySet());
+    mockLastCleanCommit(mockHoodieTable, lastCleanInstant, 
earliestInstantsInLastClean, activeTimeline, cleanMetadataOptionPair);
+    mockFewActiveInstants(mockHoodieTable, activeInstantsPartitions, 
savepointsTrackedInLastClean);
+
+    // Trigger clean and validate partitions to clean.
+    CleanPlanner<?, ?, ?, ?> cleanPlanner = new CleanPlanner<>(context, 
mockHoodieTable, config);
+    HoodieInstant earliestCommitToRetain = new 
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", earliestInstant);
+    List<String> partitionsToClean = 
cleanPlanner.getPartitionPathsToClean(Option.of(earliestCommitToRetain));
+    Collections.sort(expectedPartitions);
+    Collections.sort(partitionsToClean);
+    assertEquals(expectedPartitions, partitionsToClean);
+  }
+
   static Stream<Arguments> testCases() {
     return Stream.concat(keepLatestByHoursOrCommitsArgs(), 
keepLatestVersionsArgs());
   }
 
+  static Stream<Arguments> incrCleaningPartitionsTestCases() {
+    return keepLatestByHoursOrCommitsArgsIncrCleanPartitions();
+  }
+
   static Stream<Arguments> keepLatestVersionsArgs() {
     HoodieWriteConfig keepLatestVersionsConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp")
         .withCleanConfig(HoodieCleanConfig.newBuilder()
@@ -278,6 +327,99 @@ public class TestCleanPlanner {
         Collections.emptyList(),
         Collections.singletonList(replacedFileGroup),
         Pair.of(false, Collections.emptyList())));
+    return arguments.stream();
+  }
+
+  static Stream<Arguments> keepLatestByHoursOrCommitsArgsIncrCleanPartitions() 
{
+    String earliestInstant = "20231204194919610";
+    String earliestInstantPlusTwoDays =  "20231206194919610";
+    String lastCleanInstant = earliestInstantPlusTwoDays;
+    String earliestInstantMinusThreeDays = "20231201194919610";
+    String earliestInstantMinusFourDays = "20231130194919610";
+    String earliestInstantMinusFiveDays = "20231129194919610";
+    String earliestInstantMinusSixDays = "20231128194919610";
+    String earliestInstantInLastClean = earliestInstantMinusSixDays;
+    String lastCompletedInLastClean = earliestInstantMinusSixDays;
+    String earliestInstantMinusOneWeek =   "20231127194919610";
+    String savepoint2 = earliestInstantMinusOneWeek;
+    String earliestInstantMinusOneMonth =  "20231104194919610";
+    String savepoint3 = earliestInstantMinusOneMonth;
+
+    List<String> threePartitionsInActiveTimeline = Arrays.asList(PARTITION1, 
PARTITION2, PARTITION3);
+    Map<String, List<String>> activeInstantsPartitionsMap3 = new HashMap<>();
+    activeInstantsPartitionsMap3.put(earliestInstantMinusThreeDays, 
threePartitionsInActiveTimeline);
+    activeInstantsPartitionsMap3.put(earliestInstantMinusFourDays, 
threePartitionsInActiveTimeline);
+    activeInstantsPartitionsMap3.put(earliestInstantMinusFiveDays, 
threePartitionsInActiveTimeline);
+
+    List<String> twoPartitionsInActiveTimeline = Arrays.asList(PARTITION2, 
PARTITION3);
+    Map<String, List<String>> activeInstantsPartitionsMap2 = new HashMap<>();
+    activeInstantsPartitionsMap2.put(earliestInstantMinusThreeDays, 
twoPartitionsInActiveTimeline);
+    activeInstantsPartitionsMap2.put(earliestInstantMinusFourDays, 
twoPartitionsInActiveTimeline);
+    activeInstantsPartitionsMap2.put(earliestInstantMinusFiveDays, 
twoPartitionsInActiveTimeline);
+
+    List<Arguments> arguments = new ArrayList<>();
+
+    // no savepoints tracked in last clean and no additional savepoints. all 
partitions in uncleaned instants should be expected
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1), 
Collections.emptyMap(),
+        activeInstantsPartitionsMap3, Collections.emptyMap(), 
threePartitionsInActiveTimeline));
+
+    // a new savepoint is added after last clean. but rest of uncleaned 
touches all partitions, and so all partitions are expected
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1), 
Collections.emptyMap(),
+        activeInstantsPartitionsMap3, Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), threePartitionsInActiveTimeline));
+
+    // previous clean tracks a savepoint which exists in timeline still. only 
2 partitions are touched by uncleaned instants. only 2 partitions are expected
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
+        activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline));
+
+    // savepoint tracked in previous clean was removed(touching partition1). 
latest uncleaned touched 2 other partitions. So, in total 3 partitions are 
expected.
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
+        activeInstantsPartitionsMap2, Collections.emptyMap(), 
threePartitionsInActiveTimeline));
+
+    // previous savepoint still exists and touches partition1. uncleaned 
touches only partition2 and partition3. expected partition2 and partition3.
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
+        activeInstantsPartitionsMap2, Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)), twoPartitionsInActiveTimeline));
+
+    // a new savepoint was added compared to previous clean. all 2 partitions 
are expected since uncleaned commits touched just 2 partitions.
+    Map<String, List<String>> latestSavepoints = new HashMap<>();
+    latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
+    latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION1));
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(PARTITION1)),
+        activeInstantsPartitionsMap2, latestSavepoints, 
twoPartitionsInActiveTimeline));
+
+    // 2 savepoints were tracked in previous clean. one of them is removed in 
latest. A partition which was part of the removed savepoint should be added in 
final
+    // list of partitions to clean
+    Map<String, List<String>> previousSavepoints = new HashMap<>();
+    latestSavepoints.put(savepoint2, Collections.singletonList(PARTITION1));
+    latestSavepoints.put(savepoint3, Collections.singletonList(PARTITION2));
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+        previousSavepoints, activeInstantsPartitionsMap2, 
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), 
twoPartitionsInActiveTimeline));
+
+    // 2 savepoints were tracked in previous clean. one of them is removed in 
latest. But a partition part of removed savepoint is already touched by 
uncleaned commits.
+    // so we expect all 3 partitions to be in final list.
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(PARTITION1),
+        previousSavepoints, activeInstantsPartitionsMap3, 
Collections.singletonMap(savepoint3, Collections.singletonList(PARTITION2)), 
threePartitionsInActiveTimeline));
+
+    // unpartitioned test case. savepoint removed.
+    List<String> unPartitionsInActiveTimeline = 
Arrays.asList(StringUtils.EMPTY_STRING);
+    Map<String, List<String>> activeInstantsUnPartitionsMap = new HashMap<>();
+    activeInstantsUnPartitionsMap.put(earliestInstantMinusThreeDays, 
unPartitionsInActiveTimeline);
+
+    
arguments.addAll(buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(
+        earliestInstant, lastCompletedInLastClean, lastCleanInstant, 
earliestInstantInLastClean, Collections.singletonList(StringUtils.EMPTY_STRING),
+        Collections.singletonMap(savepoint2, 
Collections.singletonList(StringUtils.EMPTY_STRING)),
+        activeInstantsUnPartitionsMap, Collections.emptyMap(), 
unPartitionsInActiveTimeline));
 
     return arguments.stream();
   }
@@ -307,9 +449,29 @@ public class TestCleanPlanner {
         Arguments.of(getCleanByCommitsConfig(), earliestInstant, 
allFileGroups, savepoints, replacedFileGroups, expected));
   }
 
+  // helper to build common cases for the two policies
+  private static List<Arguments> 
buildArgumentsForCleanByHoursAndCommitsIncrCleanParitionsCases(String 
earliestInstant,
+                                                                               
                 String latestCompletedInLastClean,
+                                                                               
                 String lastKnownCleanInstantTime,
+                                                                               
                 String earliestInstantInLastClean,
+                                                                               
                 List<String> partitionsInLastClean,
+                                                                               
                 Map<String, List<String>> savepointsTrackedInLastClean,
+                                                                               
                 Map<String, List<String>> activeInstantsToPartitionsMap,
+                                                                               
                 Map<String, List<String>> savepoints,
+                                                                               
                 List<String> expectedPartitions) {
+    return Arrays.asList(Arguments.of(getCleanByHoursConfig(), 
earliestInstant, latestCompletedInLastClean, lastKnownCleanInstantTime,
+            earliestInstantInLastClean, partitionsInLastClean, 
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, 
expectedPartitions),
+        Arguments.of(getCleanByCommitsConfig(), earliestInstant, 
latestCompletedInLastClean, lastKnownCleanInstantTime,
+            earliestInstantInLastClean, partitionsInLastClean, 
savepointsTrackedInLastClean, activeInstantsToPartitionsMap, savepoints, 
expectedPartitions));
+  }
+
   private static HoodieFileGroup buildFileGroup(List<String> 
baseFileCommitTimes) {
+    return buildFileGroup(baseFileCommitTimes, PARTITION1);
+  }
+
+  private static HoodieFileGroup buildFileGroup(List<String> 
baseFileCommitTimes, String partition) {
     String fileGroup = UUID.randomUUID() + "-0";
-    HoodieFileGroupId fileGroupId =  new HoodieFileGroupId("partition1", 
UUID.randomUUID().toString());
+    HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partition, 
UUID.randomUUID().toString());
     HoodieTimeline timeline = mock(HoodieTimeline.class);
     when(timeline.lastInstant()).thenReturn(Option.of(new 
HoodieInstant(HoodieInstant.State.COMPLETED, "COMMIT", 
baseFileCommitTimes.get(baseFileCommitTimes.size() - 1))));
     HoodieFileGroup group = new HoodieFileGroup(fileGroupId, timeline);
@@ -333,4 +495,85 @@ public class TestCleanPlanner {
       throw new UncheckedIOException(ex);
     }
   }
+
+  private static Pair<HoodieCleanMetadata, Option<byte[]>> 
getCleanCommitMetadata(List<String> partitions, String instantTime, String 
earliestCommitToRetain,
+                                                                               
   String lastCompletedTime, Set<String> savepointsToTrack) {
+    try {
+      Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new 
HashMap<>();
+      partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieCleanPartitionMetadata(partition, 
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+          Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), false)));
+      Map<String, String> extraMetadata = new HashMap<>();
+      if (!savepointsToTrack.isEmpty()) {
+        extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointsToTrack.stream().collect(Collectors.joining(",")));
+      }
+      HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
+          CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, extraMetadata);
+      return Pair.of(cleanMetadata, 
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  private static Pair<HoodieSavepointMetadata, Option<byte[]>> 
getSavepointMetadata(List<String> partitions) {
+    try {
+      Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new 
HashMap<>();
+      partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieSavepointPartitionMetadata(partition, Collections.emptyList())));
+      HoodieSavepointMetadata savepointMetadata =
+          new HoodieSavepointMetadata("user", 1L, "comments", 
partitionMetadata, 1);
+      return Pair.of(savepointMetadata, 
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata));
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  private static void mockLastCleanCommit(HoodieTable hoodieTable, String 
timestamp, String earliestCommitToRetain, HoodieActiveTimeline activeTimeline,
+                                          Pair<HoodieCleanMetadata, 
Option<byte[]>> cleanMetadata)
+      throws IOException {
+    HoodieDefaultTimeline cleanTimeline = mock(HoodieDefaultTimeline.class);
+    when(activeTimeline.getCleanerTimeline()).thenReturn(cleanTimeline);
+    when(hoodieTable.getCleanTimeline()).thenReturn(cleanTimeline);
+    HoodieDefaultTimeline completedCleanTimeline = 
mock(HoodieDefaultTimeline.class);
+    
when(cleanTimeline.filterCompletedInstants()).thenReturn(completedCleanTimeline);
+    HoodieInstant latestCleanInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, 
timestamp);
+    
when(completedCleanTimeline.lastInstant()).thenReturn(Option.of(latestCleanInstant));
+    when(activeTimeline.isEmpty(latestCleanInstant)).thenReturn(false);
+    
when(activeTimeline.getInstantDetails(latestCleanInstant)).thenReturn(cleanMetadata.getRight());
+
+    HoodieDefaultTimeline commitsTimeline = mock(HoodieDefaultTimeline.class);
+    when(activeTimeline.getCommitsTimeline()).thenReturn(commitsTimeline);
+    
when(commitsTimeline.isBeforeTimelineStarts(earliestCommitToRetain)).thenReturn(false);
+
+    when(hoodieTable.isPartitioned()).thenReturn(true);
+    when(hoodieTable.isMetadataTable()).thenReturn(false);
+  }
+
+  private static void mockFewActiveInstants(HoodieTable hoodieTable, 
Map<String, List<String>> activeInstantsToPartitions,
+                                            Map<String, List<String>> 
savepointedCommitsToAdd)
+      throws IOException {
+    HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
+    List<HoodieInstant> instants = new ArrayList<>();
+    Map<String, List<String>> instantstoProcess = new HashMap<>();
+    instantstoProcess.putAll(activeInstantsToPartitions);
+    instantstoProcess.putAll(savepointedCommitsToAdd);
+    instantstoProcess.forEach((k,v) -> {
+      HoodieInstant hoodieInstant = new 
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, k);
+      instants.add(hoodieInstant);
+      Map<String, List<HoodieWriteStat>> partitionToWriteStats = new 
HashMap<>();
+      v.forEach(partition -> partitionToWriteStats.put(partition, 
Collections.emptyList()));
+      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+      v.forEach(partition -> {
+        commitMetadata.getPartitionToWriteStats().put(partition, 
Collections.emptyList());
+      });
+      try {
+        
when(hoodieTable.getActiveTimeline().getInstantDetails(hoodieInstant)).thenReturn(TimelineMetadataUtils.serializeCommitMetadata(commitMetadata));
+      } catch (IOException e) {
+        throw new RuntimeException("Should not have failed", e);
+      }
+    });
+
+    commitsTimeline.setInstants(instants);
+    
when(hoodieTable.getCompletedCommitsTimeline()).thenReturn(commitsTimeline);
+    when(hoodieTable.isPartitioned()).thenReturn(true);
+    when(hoodieTable.isMetadataTable()).thenReturn(false);
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index 8d7380631d4..f050f91d169 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -392,7 +392,7 @@ public class TestMetadataConversionUtils extends 
HoodieCommonTestHarness {
 
   private void createCleanMetadata(String instantTime) throws IOException {
     HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant("", "", ""),
-        "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), new ArrayList<>());
+        "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), new ArrayList<>(), Collections.EMPTY_MAP);
     HoodieCleanStat cleanStats = new HoodieCleanStat(
         HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
         HoodieTestUtils.DEFAULT_PARTITION_PATHS[new 
Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
@@ -401,7 +401,7 @@ public class TestMetadataConversionUtils extends 
HoodieCommonTestHarness {
         Collections.emptyList(),
         instantTime,
         "");
-    HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, 
Option.of(0L), Collections.singletonList(cleanStats));
+    HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, 
Option.of(0L), Collections.singletonList(cleanStats), Collections.EMPTY_MAP);
     HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, 
cleanMetadata);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index 970ff9b0c64..039be9048a2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -154,7 +154,8 @@ public class TestExternalPathHandling extends 
HoodieClientTestBase {
     HoodieCleanMetadata cleanMetadata = CleanerUtils.convertCleanMetadata(
         cleanTime,
         Option.empty(),
-        cleanStats);
+        cleanStats,
+        Collections.EMPTY_MAP);
     try (HoodieTableMetadataWriter hoodieTableMetadataWriter = 
(HoodieTableMetadataWriter) writeClient.initTable(WriteOperationType.UPSERT, 
Option.of(cleanTime)).getMetadataWriter(cleanTime).get()) {
       hoodieTableMetadataWriter.update(cleanMetadata, cleanTime);
       metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, 
inflightClean,
@@ -292,6 +293,6 @@ public class TestExternalPathHandling extends 
HoodieClientTestBase {
     return new HoodieCleanerPlan(earliestInstantToRetain,
         latestCommit,
         writeConfig.getCleanerPolicy().name(), Collections.emptyMap(),
-        CleanPlanner.LATEST_CLEAN_PLAN_VERSION, 
filePathsToBeDeletedPerPartition, Collections.emptyList());
+        CleanPlanner.LATEST_CLEAN_PLAN_VERSION, 
filePathsToBeDeletedPerPartition, Collections.emptyList(), 
Collections.EMPTY_MAP);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 1644103f050..001ea19d168 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -775,7 +775,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
     HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
         instantTime,
         Option.of(0L),
-        Arrays.asList(cleanStat1, cleanStat2)
+        Arrays.asList(cleanStat1, cleanStat2),
+        Collections.EMPTY_MAP
     );
     metadata.setVersion(CleanerUtils.CLEAN_METADATA_VERSION_1);
 
@@ -1132,9 +1133,9 @@ public class TestCleaner extends HoodieCleanerTestBase {
 
       // add clean instant
       HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant("", "", ""),
-          "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), new ArrayList<>());
+          "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), new ArrayList<>(), Collections.emptyMap());
       HoodieCleanMetadata cleanMeta = new HoodieCleanMetadata("", 0L, 0,
-          "20", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>());
+          "20", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), Collections.emptyMap());
       testTable.addClean("30", cleanerPlan, cleanMeta);
 
       // add file in partition "part_2"
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index e0a0ab69a90..25f1b2fa4c7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -643,7 +643,7 @@ public abstract class HoodieSparkClientTestHarness extends 
HoodieWriterClientTes
 
   public HoodieInstant createCleanMetadata(String instantTime, boolean 
inflightOnly, boolean isEmptyForAll, boolean isEmptyCompleted) throws 
IOException {
     HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant("", "", ""), "", "",
-            new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), new ArrayList<>());
+            new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new 
HashMap<>(), new ArrayList<>(), Collections.EMPTY_MAP);
     if (inflightOnly) {
       HoodieTestTable.of(metaClient).addInflightClean(instantTime, 
cleanerPlan);
     } else {
@@ -655,7 +655,7 @@ public abstract class HoodieSparkClientTestHarness extends 
HoodieWriterClientTes
               Collections.emptyList(),
               instantTime,
               "");
-      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, 
Option.of(0L), Collections.singletonList(cleanStats));
+      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, 
Option.of(0L), Collections.singletonList(cleanStats), Collections.EMPTY_MAP);
       HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, 
cleanMetadata, isEmptyForAll, isEmptyCompleted);
     }
     return new HoodieInstant(inflightOnly, "clean", instantTime);
diff --git a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc 
b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
index e51ecd0300c..c47690e982b 100644
--- a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc
@@ -41,6 +41,15 @@
           "default" : null
         }],
         "default" : null
-     }
+     },
+   {
+      "name":"extraMetadata",
+      "type":["null", {
+         "type":"map",
+         "values":"string",
+         "default": null
+      }],
+      "default": null
+   }
  ]
 }
diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc 
b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
index 42842c8be29..de0d9fccc1d 100644
--- a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
+++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc
@@ -105,6 +105,15 @@
       { "type":"array", "items":"string"}
       ],
       "default": null
-    }
+    },
+   {
+      "name":"extraMetadata",
+      "type":["null", {
+         "type":"map",
+         "values":"string",
+         "default": null
+      }],
+      "default": null
+   }
   ]
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
index 844376cbbfd..a4c4cefa2a2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,6 +64,6 @@ public class CleanPlanV1MigrationHandler extends 
AbstractMigratorBase<HoodieClea
         .map(e -> Pair.of(e.getKey(), e.getValue().stream().map(v -> new 
Path(v.getFilePath()).getName())
             
.collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, 
Pair::getValue));
     return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), 
plan.getLastCompletedCommitTimestamp(),
-        plan.getPolicy(), filesPerPartition, VERSION, new HashMap<>(), new 
ArrayList<>());
+        plan.getPolicy(), filesPerPartition, VERSION, new HashMap<>(), new 
ArrayList<>(), Collections.EMPTY_MAP);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
index aacdd26aeda..573b65bfb21 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hadoop.fs.Path;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +58,7 @@ public class CleanPlanV2MigrationHandler extends 
AbstractMigratorBase<HoodieClea
                 new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), 
e.getKey()), v).toString(), false))
             
.collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, 
Pair::getValue));
     return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), 
plan.getLastCompletedCommitTimestamp(),
-        plan.getPolicy(), new HashMap<>(), VERSION, filePathsPerPartition, new 
ArrayList<>());
+        plan.getPolicy(), new HashMap<>(), VERSION, filePathsPerPartition, new 
ArrayList<>(), Collections.emptyMap());
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 899bd673665..0fa758c21e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -64,7 +64,8 @@ public class CleanerUtils {
 
   public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime,
                                                          Option<Long> 
durationInMs,
-                                                         List<HoodieCleanStat> 
cleanStats) {
+                                                         List<HoodieCleanStat> 
cleanStats,
+                                                         Map<String, String> 
extraMetadatafromCleanPlan) {
     Map<String, HoodieCleanPartitionMetadata> partitionMetadataMap = new 
HashMap<>();
     Map<String, HoodieCleanPartitionMetadata> partitionBootstrapMetadataMap = 
new HashMap<>();
 
@@ -92,7 +93,7 @@ public class CleanerUtils {
     }
 
     return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() 
-> -1L), totalDeleted, earliestCommitToRetain,
-        lastCompletedCommitTimestamp, partitionMetadataMap, 
CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap);
+        lastCompletedCommitTimestamp, partitionMetadataMap, 
CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap, 
extraMetadatafromCleanPlan);
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 860340d5d6c..852f916c1a4 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -631,7 +631,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
     HoodieInstant cleanInflightInstant = new HoodieInstant(true, 
HoodieTimeline.CLEAN_ACTION, cleanInstant);
     metaClient.getActiveTimeline().createNewInstant(cleanInflightInstant);
-    HoodieCleanMetadata cleanMetadata = 
CleanerUtils.convertCleanMetadata(cleanInstant, Option.empty(), cleanStats);
+    HoodieCleanMetadata cleanMetadata = 
CleanerUtils.convertCleanMetadata(cleanInstant, Option.empty(), cleanStats, 
Collections.EMPTY_MAP);
     metaClient.getActiveTimeline().saveAsComplete(cleanInflightInstant,
         TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 4ff3b2b7e46..ca13ff79c52 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -348,7 +348,7 @@ public class HoodieTestTable {
 
   public HoodieTestTable addClean(String instantTime) throws IOException {
     HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING),
-        EMPTY_STRING, EMPTY_STRING, new HashMap<>(), 
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
+        EMPTY_STRING, EMPTY_STRING, new HashMap<>(), 
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), 
Collections.EMPTY_MAP);
     HoodieCleanStat cleanStats = new HoodieCleanStat(
         HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
         
HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
@@ -357,19 +357,19 @@ public class HoodieTestTable {
         Collections.emptyList(),
         instantTime,
         "");
-    HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, 
Option.of(0L), Collections.singletonList(cleanStats));
+    HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, 
Option.of(0L), Collections.singletonList(cleanStats), Collections.EMPTY_MAP);
     return HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, 
cleanMetadata);
   }
 
   public Pair<HoodieCleanerPlan, HoodieCleanMetadata> 
getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) {
     HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING),
-        EMPTY_STRING, EMPTY_STRING, new HashMap<>(), 
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>());
+        EMPTY_STRING, EMPTY_STRING, new HashMap<>(), 
CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), 
Collections.EMPTY_MAP);
     List<HoodieCleanStat> cleanStats = new ArrayList<>();
     for (Map.Entry<String, List<String>> entry : 
testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) {
       cleanStats.add(new 
HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
           entry.getKey(), entry.getValue(), entry.getValue(), 
Collections.emptyList(), commitTime, ""));
     }
-    return Pair.of(cleanerPlan, convertCleanMetadata(commitTime, 
Option.of(0L), cleanStats));
+    return Pair.of(cleanerPlan, convertCleanMetadata(commitTime, 
Option.of(0L), cleanStats, Collections.EMPTY_MAP));
   }
 
   public HoodieTestTable addRequestedRollback(String instantTime, 
HoodieRollbackPlan plan) throws IOException {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
index 6ee601e3deb..d3375fe5e8a 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestClusteringUtils.java
@@ -181,7 +181,7 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant4, 
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
     HoodieInstant inflightInstant4 = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant4,
 Option.empty());
     HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 
1L, 1,
-        completedInstant3.getTimestamp(), "", Collections.emptyMap(), 0, 
Collections.emptyMap());
+        completedInstant3.getTimestamp(), "", Collections.emptyMap(), 0, 
Collections.emptyMap(), Collections.emptyMap());
     metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, 
inflightInstant4,
         TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
     metaClient.reloadActiveTimeline();
@@ -205,11 +205,11 @@ public class TestClusteringUtils extends 
HoodieCommonTestHarness {
     HoodieInstant requestedInstant2 = new 
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, 
cleanTime1);
     HoodieCleanerPlan cleanerPlan1 = new HoodieCleanerPlan(null, clusterTime1,
         HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(), 
Collections.emptyMap(),
-        CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(), 
Collections.emptyList());
+        CleanPlanV2MigrationHandler.VERSION, Collections.emptyMap(), 
Collections.emptyList(), Collections.EMPTY_MAP);
     metaClient.getActiveTimeline().saveToCleanRequested(requestedInstant2, 
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan1));
     HoodieInstant inflightInstant2 = 
metaClient.getActiveTimeline().transitionCleanRequestedToInflight(requestedInstant2,
 Option.empty());
     HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(cleanTime1, 
1L, 1,
-        "", "", Collections.emptyMap(), 0, Collections.emptyMap());
+        "", "", Collections.emptyMap(), 0, Collections.emptyMap(), 
Collections.emptyMap());
     metaClient.getActiveTimeline().transitionCleanInflightToComplete(true, 
inflightInstant2,
         TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
     metaClient.reloadActiveTimeline();

Reply via email to