This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch incremental-TableService
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 85270558328b62913a92a83d87d0d3988cdd080f
Author: YueZhang <[email protected]>
AuthorDate: Wed Jan 8 17:55:09 2025 +0800

    finish base coding && need more tests
---
 .../action/BaseTableServicePlanActionExecutor.java | 163 +++++++++++++++++++++
 ...java => IncrementalPartitionAwareStrategy.java} |  28 ++--
 .../cluster/ClusteringPlanActionExecutor.java      |  11 +-
 .../cluster/ClusteringPlanPartitionFilter.java     |  14 +-
 .../cluster/strategy/ClusteringPlanStrategy.java   |   3 +-
 .../PartitionAwareClusteringPlanStrategy.java      |  33 +++--
 .../compact/ScheduleCompactionActionExecutor.java  |   8 +-
 .../BaseHoodieCompactionPlanGenerator.java         |  24 +--
 .../generators/HoodieCompactionPlanGenerator.java  |  18 ++-
 .../HoodieLogCompactionPlanGenerator.java          |  25 +++-
 .../strategy/BoundedIOCompactionStrategy.java      |   5 +-
 .../BoundedPartitionAwareCompactionStrategy.java   |  33 ++++-
 .../compact/strategy/CompactionStrategy.java       |  20 ++-
 .../strategy/CompositeCompactionStrategy.java      |  20 ++-
 .../strategy/DayBasedCompactionStrategy.java       |  12 +-
 .../LogFileNumBasedCompactionStrategy.java         |  15 +-
 .../LogFileSizeBasedCompactionStrategy.java        |  14 +-
 .../PartitionRegexBasedCompactionStrategy.java     |   9 +-
 .../strategy/UnBoundedCompactionStrategy.java      |   3 +-
 .../UnBoundedPartitionAwareCompactionStrategy.java |  14 +-
 .../TestPartitionAwareClusteringPlanStrategy.java  |   6 +-
 ...inkSizeBasedClusteringPlanStrategyRecently.java |   5 +-
 .../TestSparkClusteringPlanPartitionFilter.java    |  10 +-
 .../strategy/TestHoodieCompactionStrategy.java     |  55 ++++---
 .../src/main/avro/HoodieClusteringPlan.avsc        |  10 +-
 .../src/main/avro/HoodieCompactionOperation.avsc   |   8 +
 .../hudi/common/testutils/CompactionTestUtils.java |   2 +-
 .../org/apache/hudi/utils/TestClusteringUtil.java  |   2 +-
 .../org/apache/hudi/utils/TestCompactionUtil.java  |   2 +-
 29 files changed, 437 insertions(+), 135 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseTableServicePlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseTableServicePlanActionExecutor.java
new file mode 100644
index 00000000000..bc3bbfa6f65
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseTableServicePlanActionExecutor.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+
+public abstract class BaseTableServicePlanActionExecutor<T, I, K, O, R> 
extends BaseActionExecutor<T, I, K, O, R> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseTableServicePlanActionExecutor.class);
+
+  public BaseTableServicePlanActionExecutor(HoodieEngineContext context, 
HoodieWriteConfig config,
+                                            HoodieTable<T, I, K, O> table, 
String instantTime) {
+    super(context, config, table, instantTime);
+  }
+
+  /**
+   * Get the partitions to consider for planning. If the strategy implements
+   * {@code IncrementalPartitionAwareStrategy}, return the incremental partitions;
+   * otherwise (or when fetching incremental partitions fails or yields none),
+   * fall back to listing all partitions of the table.
+   * @param strategy the plan strategy instance (clustering or compaction strategy)
+   * @param type the table service type (COMPACT, LOG_COMPACT or CLUSTER)
+   * @return list of partition paths to process
+   */
+  public List<String> getPartitions(Object strategy, TableServiceType type) {
+    if (strategy instanceof IncrementalPartitionAwareStrategy) {
+      try {
+        // get incremental partitions.
+        Set<String> incrementalPartitions = getIncrementalPartitions(type);
+        if (!incrementalPartitions.isEmpty()) {
+          return new ArrayList<>(incrementalPartitions);
+        }
+      } catch (Exception ex) {
+        LOG.warn("Failed to get incremental partitions", ex);
+      }
+    }
+
+    // fall back to get all partitions
+    return FSUtils.getAllPartitionPaths(context, 
table.getMetaClient().getStorage(),
+        config.getMetadataConfig(), table.getMetaClient().getBasePath());
+  }
+
+  /**
+   * Compute the set of partitions touched since the last completed table service
+   * instant of the given type, plus any partitions the previous plan recorded as
+   * missed. Returns an empty set when the last completed table service commit
+   * cannot be found (e.g. it may have been archived), so callers can fall back
+   * to a full partition listing.
+   */
+  private Set<String> getIncrementalPartitions(TableServiceType type) {
+    Pair<Option<HoodieInstant>, List<String>> missingPair = fetchMissingPartitions(type);
+    if (!missingPair.getLeft().isPresent()) {
+      // Last complete table service commit may be archived.
+      return Collections.emptySet();
+    }
+
+    List<String> missingPartitions = missingPair.getRight();
+
+    String leftBoundary = missingPair.getLeft().get().requestedTime();
+    String rightBoundary = instantTime;
+
+    // compute [leftBoundary, rightBoundary) as the completion-time window
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    Set<String> partitionsInCommitMeta = activeTimeline.filterCompletedInstants().getCommitsTimeline().getInstants().stream().flatMap(instant -> {
+      if (!instant.getAction().equals(CLUSTERING_ACTION)) {
+        String completionTime = instant.getCompletionTime();
+        // select instants that completed within [leftBoundary, rightBoundary)
+        if (completionTime.compareTo(leftBoundary) >= 0 && completionTime.compareTo(rightBoundary) < 0) {
+          try {
+            HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant, activeTimeline);
+            return metadata.getWriteStats().stream().map(HoodieWriteStat::getPartitionPath);
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to get commit meta " + instant, e);
+          }
+        }
+      }
+      return Stream.empty();
+    }).collect(Collectors.toSet());
+
+    partitionsInCommitMeta.addAll(missingPartitions);
+    return partitionsInCommitMeta;
+  }
+
+  /**
+   * Find the last completed table service instant for the given service type,
+   * together with the partitions its plan recorded as "missing schedule partitions".
+   * @param tableServiceType table service being planned (COMPACT, LOG_COMPACT or CLUSTER)
+   * @return pair of (last completed table service instant if present, missing partitions
+   *         recorded by that instant's plan — empty list when none were recorded)
+   * @throws HoodieException for unsupported table service types
+   */
+  private Pair<Option<HoodieInstant>, List<String>> 
fetchMissingPartitions(TableServiceType tableServiceType) {
+    Option<HoodieInstant> instant = Option.empty();
+    List<String> missingPartitions = new ArrayList<>();
+
+    switch (tableServiceType) {
+      case COMPACT:
+      case LOG_COMPACT: {
+        // NOTE(review): looks up the last completed COMMIT action on the active
+        // timeline — presumably compaction commits complete as COMMIT_ACTION here;
+        // verify this also holds for log compaction.
+        Option<HoodieInstant> lastCompactionCommitInstant = 
table.getActiveTimeline()
+            
.filterCompletedInstants().getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).lastInstant();
+        if (lastCompactionCommitInstant.isPresent()) {
+          instant = lastCompactionCommitInstant;
+          String action = tableServiceType.equals(TableServiceType.COMPACT) ? 
HoodieTimeline.COMPACTION_ACTION : HoodieTimeline.LOG_COMPACTION_ACTION;
+          // Reconstruct the requested plan instant to read back the plan bytes.
+          HoodieInstant compactionPlanInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, action,
+              instant.get().requestedTime(), 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);
+          Option<byte[]> details = 
table.getMetaClient().getActiveTimeline().readCompactionPlanAsBytes(compactionPlanInstant);
+          HoodieCompactionPlan compactionPlan = 
CompactionUtils.getCompactionPlan(table.getMetaClient(), details);
+          if (compactionPlan.getMissingSchedulePartitions() != null) {
+            missingPartitions = compactionPlan.getMissingSchedulePartitions();
+          }
+        }
+        break;
+      }
+      case CLUSTER: {
+        Option<HoodieInstant> lastClusteringInstant = 
table.getActiveTimeline().filterCompletedInstants().getLastClusteringInstant();
+        if (lastClusteringInstant.isPresent()) {
+          instant = lastClusteringInstant;
+          Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlan = 
ClusteringUtils.getClusteringPlan(table.getMetaClient(), 
lastClusteringInstant.get());
+          if (clusteringPlan.isPresent() && 
clusteringPlan.get().getRight().getMissingSchedulePartitions() != null) {
+            missingPartitions = 
clusteringPlan.get().getRight().getMissingSchedulePartitions();
+          }
+        }
+        break;
+      }
+      default:
+        throw new HoodieException("Un-supported incremental table service " + 
tableServiceType);
+    }
+
+    return Pair.of(instant, missingPartitions);
+  }
+
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/IncrementalPartitionAwareStrategy.java
similarity index 50%
copy from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
copy to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/IncrementalPartitionAwareStrategy.java
index ffc437bcd4c..24dfd9cde3c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/IncrementalPartitionAwareStrategy.java
@@ -16,25 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action.compact.strategy;
+package org.apache.hudi.table.action;
 
-import org.apache.hudi.avro.model.HoodieCompactionOperation;
-import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.util.List;
 
 /**
- * UnBoundedCompactionStrategy will not change ordering or filter any 
compaction. It is a pass-through and will compact
- * all the base files which has a log file. This usually means no-intelligence 
on compaction.
+ * Marker strategy interface.
 *
+ * Any strategy implementing {@code IncrementalPartitionAwareStrategy} gains the ability
+ * to perform incremental partition processing; the incremental partitions are passed
+ * to the strategy at planning time.
strategy.
  */
-public class UnBoundedCompactionStrategy extends CompactionStrategy {
-
-  @Override
-  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
config,
-      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionWorkloads) {
-    return operations;
-  }
+public interface IncrementalPartitionAwareStrategy {
+  
+  /**
+   * Filter the given incremental partitions.
+   * @param writeConfig the write config
+   * @param partitions the candidate (incremental) partition paths
+   * @return Pair of the partition paths to process and the partitions filtered out,
+   * which will be recorded as missing partitions. Each strategy decides individually
+   * whether to record missing partitions, and which partitions to record.
+   */
+  Pair<List<String>, List<String>> filterPartitionPaths(HoodieWriteConfig 
writeConfig, List<String> partitions);
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index d698053d7b5..75625e11592 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.cluster;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -31,17 +32,19 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
+import org.apache.hudi.util.Lazy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
-public class ClusteringPlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
+public class ClusteringPlanActionExecutor<T, I, K, O> extends 
BaseTableServicePlanActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ClusteringPlanActionExecutor.class);
 
@@ -84,7 +87,9 @@ public class ClusteringPlanActionExecutor<T, I, K, O> extends 
BaseActionExecutor
         ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
             new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, 
HoodieWriteConfig.class}, table, context, config);
 
-    return strategy.generateClusteringPlan();
+    Lazy<List<String>> partitions = Lazy.lazily(() -> getPartitions(strategy, 
TableServiceType.CLUSTER));
+
+    return strategy.generateClusteringPlan(partitions);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
index ecc3706f674..06019d3b2b9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
@@ -42,13 +42,13 @@ import java.util.stream.Stream;
  */
 public class ClusteringPlanPartitionFilter {
 
-  public static List<String> filter(List<String> partitions, HoodieWriteConfig 
config) {
+  public static List<String> filter(List<String> partitions, HoodieWriteConfig 
config, ArrayList<String> missingPartitions) {
     ClusteringPlanPartitionFilterMode mode = 
config.getClusteringPlanPartitionFilterMode();
     switch (mode) {
       case NONE:
         return partitions;
       case RECENT_DAYS:
-        return recentDaysFilter(partitions, config);
+        return recentDaysFilter(partitions, config, missingPartitions);
       case SELECTED_PARTITIONS:
         return selectedPartitionsFilter(partitions, config);
       case DAY_ROLLING:
@@ -71,12 +71,14 @@ public class ClusteringPlanPartitionFilter {
     return selectPt;
   }
 
-  private static List<String> recentDaysFilter(List<String> partitions, 
HoodieWriteConfig config) {
+  private static List<String> recentDaysFilter(List<String> partitions, 
HoodieWriteConfig config, ArrayList<String> missingPartitions) {
     int targetPartitionsForClustering = 
config.getTargetPartitionsForClustering();
     int skipPartitionsFromLatestForClustering = 
config.getSkipPartitionsFromLatestForClustering();
-    return partitions.stream()
-        .sorted(Comparator.reverseOrder())
-        .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
+    int skipOffset = Math.max(skipPartitionsFromLatestForClustering, 0);
+    List<String> sortedPartitions = 
partitions.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
+    missingPartitions.addAll(sortedPartitions.subList(0, skipOffset));
+    return sortedPartitions.stream()
+        .skip(skipOffset)
         .limit(targetPartitionsForClustering > 0 ? 
targetPartitionsForClustering : partitions.size())
         .collect(Collectors.toList());
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index a6894388f6d..6f4508075eb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -33,6 +33,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.apache.hudi.util.Lazy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,7 +102,7 @@ public abstract class ClusteringPlanStrategy<T,I,K,O> 
implements Serializable {
    *
    * If there is no data available to cluster, return None.
    */
-  public abstract Option<HoodieClusteringPlan> generateClusteringPlan();
+  public abstract Option<HoodieClusteringPlan> 
generateClusteringPlan(Lazy<List<String>> partitions);
 
   /**
    * Check if the clustering can proceed. If not (i.e., return false), the 
PlanStrategy will generate an empty plan to stop the scheduling.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 515d6532871..af238cf900f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
@@ -30,14 +29,18 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.IncrementalPartitionAwareStrategy;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
+import org.apache.hudi.util.Lazy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -45,7 +48,7 @@ import java.util.stream.Stream;
 /**
  * Scheduling strategy with restriction that clustering groups can only 
contain files from same partition.
  */
-public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends 
ClusteringPlanStrategy<T,I,K,O> {
+public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends 
ClusteringPlanStrategy<T,I,K,O> implements IncrementalPartitionAwareStrategy {
   private static final Logger LOG = 
LoggerFactory.getLogger(PartitionAwareClusteringPlanStrategy.class);
 
   public PartitionAwareClusteringPlanStrategy(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
@@ -113,14 +116,15 @@ public abstract class 
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
   /**
    * Return list of partition paths to be considered for clustering.
    */
-  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    List<String> filteredPartitions = 
ClusteringPlanPartitionFilter.filter(partitionPaths, getWriteConfig());
+  public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> partitions) {
+    ArrayList<String> missingPartitions = new ArrayList<>();
+    List<String> filteredPartitions = 
ClusteringPlanPartitionFilter.filter(partitions, getWriteConfig(), 
missingPartitions);
     LOG.debug("Filtered to the following partitions: " + filteredPartitions);
-    return filteredPartitions;
+    return Pair.of(filteredPartitions, missingPartitions);
   }
 
   @Override
-  public Option<HoodieClusteringPlan> generateClusteringPlan() {
+  public Option<HoodieClusteringPlan> 
generateClusteringPlan(Lazy<List<String>> partitions) {
     if (!checkPrecondition()) {
       return Option.empty();
     }
@@ -135,21 +139,24 @@ public abstract class 
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
 
     if (StringUtils.isNullOrEmpty(partitionSelected)) {
       // get matched partitions if set
-      partitionPaths = getRegexPatternMatchedPartitions(config, 
FSUtils.getAllPartitionPaths(
-          getEngineContext(), metaClient.getStorage(), 
config.getMetadataConfig(), metaClient.getBasePath()));
+      partitionPaths = getRegexPatternMatchedPartitions(config, 
partitions.get());
       // filter the partition paths if needed to reduce list status
     } else {
       partitionPaths = Arrays.asList(partitionSelected.split(","));
     }
 
-    partitionPaths = filterPartitionPaths(partitionPaths);
+    Pair<List<String>, List<String>> partitionsPair = 
filterPartitionPaths(getWriteConfig(), partitionPaths);
+    partitionPaths = partitionsPair.getLeft();
+    List<String> missingPartitions = partitionsPair.getRight();
     LOG.info("Scheduling clustering partitionPaths: {}", partitionPaths);
+    LOG.info("Missing Scheduled clustering partitionPaths: {}", 
missingPartitions);
 
     if (partitionPaths.isEmpty()) {
       // In case no partitions could be picked, return no clustering plan
       return Option.empty();
     }
 
+    Set<String> processedPartitions = new HashSet<>();
     List<HoodieClusteringGroup> clusteringGroups = getEngineContext()
         .flatMap(
             partitionPaths,
@@ -160,8 +167,15 @@ public abstract class 
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
             partitionPaths.size())
         .stream()
         .limit(getWriteConfig().getClusteringMaxNumGroups())
+        .peek(clusteringGroup -> {
+          String partitionPath = 
clusteringGroup.getSlices().get(0).getPartitionPath();
+          processedPartitions.add(partitionPath);
+        })
         .collect(Collectors.toList());
 
+    partitionPaths.removeAll(processedPartitions);
+    missingPartitions.addAll(partitionPaths);
+
     if (clusteringGroups.isEmpty()) {
       LOG.warn("No data available to cluster");
       return Option.empty();
@@ -178,6 +192,7 @@ public abstract class 
PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
         .setExtraMetadata(getExtraMetadata())
         .setVersion(getPlanVersion())
         .setPreserveHoodieMetadata(true)
+        .setMissingSchedulePartitions(missingPartitions)
         .build());
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index e83800e45da..bd97d40f7fc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -34,7 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.BaseActionExecutor;
+import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor;
 import 
org.apache.hudi.table.action.compact.plan.generators.BaseHoodieCompactionPlanGenerator;
 import 
org.apache.hudi.table.action.compact.plan.generators.HoodieCompactionPlanGenerator;
 import 
org.apache.hudi.table.action.compact.plan.generators.HoodieLogCompactionPlanGenerator;
@@ -50,7 +50,7 @@ import java.util.Map;
 import static org.apache.hudi.common.util.CollectionUtils.nonEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
-public class ScheduleCompactionActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {
+public class ScheduleCompactionActionExecutor<T, I, K, O> extends 
BaseTableServicePlanActionExecutor<T, I, K, O, Option<HoodieCompactionPlan>> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ScheduleCompactionActionExecutor.class);
   private WriteOperationType operationType;
@@ -73,9 +73,9 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> 
extends BaseActionExec
 
   private void initPlanGenerator(HoodieEngineContext context, 
HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     if (WriteOperationType.COMPACT.equals(operationType)) {
-      planGenerator = new HoodieCompactionPlanGenerator(table, context, 
config);
+      planGenerator = new HoodieCompactionPlanGenerator(table, context, 
config, this);
     } else {
-      planGenerator = new HoodieLogCompactionPlanGenerator(table, context, 
config);
+      planGenerator = new HoodieLogCompactionPlanGenerator(table, context, 
config, this);
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 1884f4c5d40..0d392bc5ff9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.data.HoodieAccumulator;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -41,6 +40,7 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor;
 import org.apache.hudi.table.action.compact.CompactHelpers;
 
 import org.slf4j.Logger;
@@ -64,11 +64,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
   protected final HoodieTable<T, I, K, O> hoodieTable;
   protected final HoodieWriteConfig writeConfig;
   protected final transient HoodieEngineContext engineContext;
+  protected final BaseTableServicePlanActionExecutor executor;
 
-  public BaseHoodieCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+  public BaseHoodieCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig,
+                                           BaseTableServicePlanActionExecutor 
executor) {
     this.hoodieTable = table;
     this.writeConfig = writeConfig;
     this.engineContext = engineContext;
+    this.executor = executor;
   }
 
   @Nullable
@@ -82,13 +85,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     // TODO - rollback any compactions in flight
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     CompletionTimeQueryView completionTimeQueryView = 
metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient);
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(
-        engineContext, metaClient.getStorage(), 
writeConfig.getMetadataConfig(), metaClient.getBasePath());
+    List<String> partitionPaths = getPartitions();
 
     int allPartitionSize = partitionPaths.size();
 
     // filter the partition paths if needed to reduce list status
-    partitionPaths = filterPartitionPathsByStrategy(partitionPaths);
+    Pair<List<String>, List<String>> partitionPair = 
filterPartitionPathsByStrategy(partitionPaths);
+    partitionPaths = partitionPair.getLeft();
+
     LOG.info("Strategy: {} matched {} partition paths from all {} partitions 
for table {}",
         writeConfig.getCompactionStrategy().getClass().getSimpleName(), 
partitionPaths.size(), allPartitionSize,
         hoodieTable.getConfig().getBasePath());
@@ -169,7 +173,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     }
 
     // Filter the compactions with the passed in filter. This lets us choose 
most effective compactions only
-    HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, 
operations);
+    HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, 
operations, partitionPair);
     ValidationUtils.checkArgument(
         compactionPlan.getOperations().stream().noneMatch(
             op -> fgIdsInPendingCompactionAndClustering.contains(new 
HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
@@ -182,12 +186,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     return compactionPlan;
   }
 
-  protected abstract HoodieCompactionPlan 
getCompactionPlan(HoodieTableMetaClient metaClient, 
List<HoodieCompactionOperation> operations);
+  protected abstract List<String> getPartitions();
+
+  protected abstract HoodieCompactionPlan 
getCompactionPlan(HoodieTableMetaClient metaClient, 
List<HoodieCompactionOperation> operations, Pair<List<String>,List<String>> 
partitionPair);
 
   protected abstract boolean filterLogCompactionOperations();
 
-  protected List<String> filterPartitionPathsByStrategy(List<String> 
partitionPaths) {
-    return partitionPaths;
+  protected Pair<List<String>, List<String>> 
filterPartitionPathsByStrategy(List<String> partitionPaths) {
+    return Pair.of(partitionPaths, Collections.emptyList());
   }
 
   protected boolean filterFileSlice(FileSlice fileSlice, String 
lastCompletedInstantTime,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
index a93ece710b0..378461f1b98 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
@@ -22,11 +22,13 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.slf4j.Logger;
@@ -43,22 +45,28 @@ public class HoodieCompactionPlanGenerator<T extends 
HoodieRecordPayload, I, K,
 
   private final CompactionStrategy compactionStrategy;
 
-  public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
+  public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig,
+                                       BaseTableServicePlanActionExecutor 
executor) {
+    super(table, engineContext, writeConfig, executor);
     this.compactionStrategy = writeConfig.getCompactionStrategy();
     LOG.info("Compaction Strategy used is: " + compactionStrategy.toString());
   }
 
   @Override
-  protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient 
metaClient, List<HoodieCompactionOperation> operations) {
+  protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient 
metaClient, List<HoodieCompactionOperation> operations, Pair<List<String>, 
List<String>> partitionPair) {
     // Filter the compactions with the passed in filter. This lets us choose 
most effective
     // compactions only
     return compactionStrategy.generateCompactionPlan(writeConfig, operations,
-        
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()));
+        
CompactionUtils.getAllPendingCompactionPlans(metaClient).stream().map(Pair::getValue).collect(toList()),
 getStrategyParams(), partitionPair);
   }
 
   @Override
-  protected List<String> filterPartitionPathsByStrategy(List<String> 
partitionPaths) {
+  protected List<String> getPartitions() {
+    return executor.getPartitions(compactionStrategy, 
TableServiceType.COMPACT);
+  }
+
+  @Override
+  protected Pair<List<String>, List<String>> 
filterPartitionPathsByStrategy(List<String> partitionPaths) {
     return compactionStrategy.filterPartitionPaths(writeConfig, 
partitionPaths);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index a81ee663fa9..5d9337ac4e7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -26,13 +26,16 @@ import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.BaseTableServicePlanActionExecutor;
 import org.apache.hudi.table.action.compact.LogCompactionExecutionHelper;
 
 import org.slf4j.Logger;
@@ -45,25 +48,33 @@ import java.util.stream.Collectors;
 public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, 
I, K, O> extends BaseHoodieCompactionPlanGenerator<T, I, K, O> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogCompactionPlanGenerator.class);
+  private final HoodieCompactionStrategy compactionStrategy;
 
-  public HoodieLogCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
-  }
-
-  @Override
-  protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient 
metaClient, List<HoodieCompactionOperation> operations) {
-    HoodieCompactionStrategy compactionStrategy = 
HoodieCompactionStrategy.newBuilder()
+  public HoodieLogCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig,
+                                          BaseTableServicePlanActionExecutor 
executor) {
+    super(table, engineContext, writeConfig, executor);
+    this.compactionStrategy = HoodieCompactionStrategy.newBuilder()
         .setStrategyParams(getStrategyParams())
         .setCompactorClassName(LogCompactionExecutionHelper.class.getName())
         .build();
+  }
+
+  @Override
+  protected HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient 
metaClient, List<HoodieCompactionOperation> operations, Pair<List<String>, 
List<String>> partitionPair) {
     return HoodieCompactionPlan.newBuilder()
         .setOperations(operations)
         .setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION)
         .setStrategy(compactionStrategy)
+        .setMissingSchedulePartitions(partitionPair.getRight())
         .setPreserveHoodieMetadata(true)
         .build();
   }
 
+  @Override
+  protected List<String> getPartitions() {
+    // NOTE(review): 'compactionStrategy' here is the Avro model
+    // HoodieCompactionStrategy, whereas HoodieCompactionPlanGenerator passes a
+    // planner-side CompactionStrategy (IncrementalPartitionAwareStrategy) to
+    // executor.getPartitions(...) -- confirm the executor accepts both types.
+    return executor.getPartitions(compactionStrategy, 
TableServiceType.LOG_COMPACT);
+  }
+
   @Override
   protected boolean filterFileSlice(FileSlice fileSlice, String 
lastCompletedInstantTime,
                                     Set<HoodieFileGroupId> 
pendingFileGroupIds, Option<InstantRange> instantRange) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
index d93a50fd673..6d7a17ba536 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedIOCompactionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * CompactionStrategy which looks at total IO to be done for the compaction 
(read + write) and limits the list of
@@ -35,7 +36,7 @@ public class BoundedIOCompactionStrategy extends 
CompactionStrategy {
 
   @Override
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig,
-      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans) {
+                                                        
List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans, Set<String> missingPartitions) {
     // Iterate through the operations in order and accept operations as long 
as we are within the
     // IO limit
     // Preserves the original ordering of compactions
@@ -46,7 +47,7 @@ public class BoundedIOCompactionStrategy extends 
CompactionStrategy {
+      if (targetIORemaining <= 0) {
+        // IO budget exhausted: do not schedule this operation; record its
+        // partition so a later compaction schedule can pick it up.
+        missingPartitions.add(op.getPartitionPath());
+        continue;
+      }
       targetIORemaining -= opIo;
       finalOperations.add(op);
-      if (targetIORemaining <= 0) {
-        return finalOperations;
-      }
     }
     return finalOperations;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
index 09c19b1aabe..bd858862bc7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/BoundedPartitionAwareCompactionStrategy.java
@@ -20,14 +20,17 @@ package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -46,7 +49,7 @@ public class BoundedPartitionAwareCompactionStrategy extends 
DayBasedCompactionS
 
   @Override
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig,
-      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans) {
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans, Set<String> missingPartitions) {
     // The earliest partition to compact - current day minus the target 
partitions limit
     String earliestPartitionPathToCompact =
         dateFormat.get().format(getDateAtOffsetFromToday(-1 * 
writeConfig.getTargetPartitionsPerDayBasedCompaction()));
@@ -54,19 +57,35 @@ public class BoundedPartitionAwareCompactionStrategy 
extends DayBasedCompactionS
 
     return 
operations.stream().collect(Collectors.groupingBy(HoodieCompactionOperation::getPartitionPath)).entrySet()
         
.stream().sorted(Map.Entry.comparingByKey(DayBasedCompactionStrategy.comparator))
-        .filter(e -> 
DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, 
e.getKey()) >= 0)
-        .flatMap(e -> e.getValue().stream()).collect(Collectors.toList());
+        .filter(e -> {
+          if 
(DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, 
e.getKey()) >= 0) {
+            return true;
+          } else {
+            missingPartitions.add(e.getKey());
+            return false;
+          }
+        }).flatMap(e -> e.getValue().stream()).collect(Collectors.toList());
   }
 
   @Override
-  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> partitionPaths) {
+  public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
partitionPaths) {
     // The earliest partition to compact - current day minus the target 
partitions limit
     String earliestPartitionPathToCompact =
         dateFormat.get().format(getDateAtOffsetFromToday(-1 * 
writeConfig.getTargetPartitionsPerDayBasedCompaction()));
+
+    ArrayList<String> partitionsToProcess = new ArrayList<>();
+    ArrayList<String> missingPartitions = new ArrayList<>();
+
     // Get all partitions and sort them
-    return partitionPaths.stream().map(partition -> partition.replace("/", 
"-"))
-        .sorted(Comparator.reverseOrder()).map(partitionPath -> 
partitionPath.replace("-", "/"))
-        .filter(e -> 
DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, 
e) >= 0).collect(Collectors.toList());
+    partitionPaths.stream().map(partition -> partition.replace("/", "-"))
+        .sorted(Comparator.reverseOrder()).map(partitionPath -> 
partitionPath.replace("-", "/")).forEach(partition -> {
+          if 
(DayBasedCompactionStrategy.comparator.compare(earliestPartitionPathToCompact, 
partition) >= 0) {
+            partitionsToProcess.add(partition);
+          } else {
+            missingPartitions.add(partition);
+          }
+        });
+    return Pair.of(partitionsToProcess, missingPartitions);
   }
 
   public static Date getDateAtOffsetFromToday(int offset) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
index 5b2dba7ab7a..ac82fd0eb3a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompactionStrategy.java
@@ -23,13 +23,18 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.utils.FileSliceMetricUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.IncrementalPartitionAwareStrategy;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Strategy for compaction. Pluggable implementation to define how compaction 
should be done. The over-ridden
@@ -37,7 +42,7 @@ import java.util.Map;
  * compaction operation to run in a single compaction. Implementation of 
CompactionStrategy cannot hold any state.
  * Difference instantiations can be passed in every time
  */
-public abstract class CompactionStrategy implements Serializable {
+public abstract class CompactionStrategy implements 
IncrementalPartitionAwareStrategy, Serializable {
 
   public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB";
   public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB";
@@ -71,10 +76,13 @@ public abstract class CompactionStrategy implements 
Serializable {
    * @return Compaction plan to be scheduled.
    */
   public HoodieCompactionPlan generateCompactionPlan(HoodieWriteConfig 
writeConfig,
-      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans) {
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans, Map<String, String> params, Pair<List<String>, 
List<String>> partitionPair) {
     // Strategy implementation can overload this method to set specific 
compactor-id
+    Set<String> missingPartitions = new HashSet<>(partitionPair.getRight());
+    List<HoodieCompactionOperation> operationsToProcess = 
orderAndFilter(writeConfig, operations, pendingCompactionPlans, 
missingPartitions);
     return HoodieCompactionPlan.newBuilder()
-        .setOperations(orderAndFilter(writeConfig, operations, 
pendingCompactionPlans))
+        .setOperations(operationsToProcess)
+        .setMissingSchedulePartitions(new ArrayList<>(missingPartitions))
         
.setVersion(CompactionUtils.LATEST_COMPACTION_METADATA_VERSION).build();
   }
 
@@ -88,7 +96,7 @@ public abstract class CompactionStrategy implements 
Serializable {
    * @return list of compactions to perform in this run
    */
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig,
-      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans) {
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans, Set<String> missingPartitions) {
     return operations;
   }
 
@@ -99,7 +107,7 @@ public abstract class CompactionStrategy implements 
Serializable {
    * @param allPartitionPaths
    * @return
    */
-  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
-    return allPartitionPaths;
+  public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
allPartitionPaths) {
+    return Pair.of(allPartitionPaths, Collections.emptyList());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
index da90269509d..c9583f89ea8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/CompositeCompactionStrategy.java
@@ -21,9 +21,12 @@ package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * CompositeCompactionStrategy chains multiple compaction strategies together.
@@ -39,21 +42,26 @@ public class CompositeCompactionStrategy extends 
CompactionStrategy {
   }
 
   @Override
-  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig, List<HoodieCompactionOperation> operations, 
List<HoodieCompactionPlan> pendingCompactionPlans) {
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig, List<HoodieCompactionOperation> operations, 
List<HoodieCompactionPlan> pendingCompactionPlans,
+                                                        Set<String> 
missingPartitions) {
     List<HoodieCompactionOperation> finalOperations = operations;
     for (CompactionStrategy strategy : strategies) {
-      finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, 
pendingCompactionPlans);
+      finalOperations = strategy.orderAndFilter(writeConfig, finalOperations, 
pendingCompactionPlans, missingPartitions);
     }
     return finalOperations;
   }
 
   @Override
-  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
-    List<String> finalPartitionPaths = allPartitionPaths;
+  public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
allPartitionPaths) {
+    List<String> partitionsToProcess = allPartitionPaths;
+    List<String> missingPartitions = new ArrayList<>();
+
     for (CompactionStrategy strategy : strategies) {
-      finalPartitionPaths = strategy.filterPartitionPaths(writeConfig, 
finalPartitionPaths);
+      Pair<List<String>, List<String>> innerRes = 
strategy.filterPartitionPaths(writeConfig, partitionsToProcess);
+      partitionsToProcess = innerRes.getLeft();
+      missingPartitions.addAll(innerRes.getRight());
     }
-    return finalPartitionPaths;
+    return Pair.of(partitionsToProcess, missingPartitions);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
index d77d22d05c5..871b1b9fc38 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/DayBasedCompactionStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.compact.strategy;
 
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 
@@ -61,10 +62,13 @@ public class DayBasedCompactionStrategy extends 
BoundedIOCompactionStrategy {
   }
 
   @Override
-  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
-    return allPartitionPaths.stream().sorted(comparator)
-        .collect(Collectors.toList()).subList(0, 
Math.min(allPartitionPaths.size(),
-            writeConfig.getTargetPartitionsPerDayBasedCompaction()));
+  public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
allPartitionPaths) {
+    List<String> sortedPartitions = 
allPartitionPaths.stream().sorted(comparator).collect(Collectors.toList());
+    int boundary = Math.min(allPartitionPaths.size(), 
writeConfig.getTargetPartitionsPerDayBasedCompaction());
+
+    List<String> partitionsToProcess = sortedPartitions.subList(0, boundary);
+    List<String> missingPartitions = sortedPartitions.subList(boundary, 
sortedPartitions.size());
+    return Pair.of(partitionsToProcess, missingPartitions);
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
index 6f79b684d0a..e7024a00ee7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -34,12 +35,20 @@ public class LogFileNumBasedCompactionStrategy extends 
BoundedIOCompactionStrate
     implements Comparator<HoodieCompactionOperation> {
 
   @Override
-  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig, List<HoodieCompactionOperation> operations, 
List<HoodieCompactionPlan> pendingCompactionPlans) {
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig, List<HoodieCompactionOperation> operations, 
List<HoodieCompactionPlan> pendingCompactionPlans,
+                                                        Set<String> 
missingPartitions) {
     Long numThreshold = writeConfig.getCompactionLogFileNumThreshold();
     List<HoodieCompactionOperation> filterOperator = operations.stream()
-        .filter(e -> e.getDeltaFilePaths().size() >= numThreshold)
-        .sorted(this).collect(Collectors.toList());
-    return super.orderAndFilter(writeConfig, filterOperator, 
pendingCompactionPlans);
+        .filter(e -> {
+          if (e.getDeltaFilePaths().size() >= numThreshold) {
+            return true;
+          } else {
+            missingPartitions.add(e.getPartitionPath());
+            return false;
+          }
+        }).sorted(this).collect(Collectors.toList());
+    return super.orderAndFilter(writeConfig, filterOperator, 
pendingCompactionPlans, missingPartitions);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
index c165141dfc5..1024f5f9cf0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileSizeBasedCompactionStrategy.java
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -39,14 +40,21 @@ public class LogFileSizeBasedCompactionStrategy extends 
BoundedIOCompactionStrat
 
   @Override
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
writeConfig,
-      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans) {
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionPlans, Set<String> missingPartitions) {
     // Filter the file group which log files size is greater than the 
threshold in bytes.
     // Order the operations based on the reverse size of the logs and limit 
them by the IO
     long threshold = writeConfig.getCompactionLogFileSizeThreshold();
     return super.orderAndFilter(writeConfig, operations.stream()
-            .filter(e -> e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) 
>= threshold)
+            .filter(e -> {
+              if (e.getMetrics().getOrDefault(TOTAL_LOG_FILE_SIZE, 0d) >= 
threshold) {
+                return true;
+              } else {
+                missingPartitions.add(e.getPartitionPath());
+                return false;
+              }
+            })
             .sorted(this).collect(Collectors.toList()),
-        pendingCompactionPlans);
+        pendingCompactionPlans, missingPartitions);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
index eb3b42ee7ca..1ff5316389f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/PartitionRegexBasedCompactionStrategy.java
@@ -18,18 +18,23 @@
 
 package org.apache.hudi.table.action.compact.strategy;
 
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public class PartitionRegexBasedCompactionStrategy extends CompactionStrategy {
 
+  /**
+   * Regex filtered partitions are not included in missing partitions.
+   */
   @Override
-  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
+  public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
allPartitionPaths) {
     String regex = writeConfig.getCompactionSpecifyPartitionPathRegex();
     Pattern pattern = Pattern.compile(regex);
-    return 
allPartitionPaths.stream().filter(pattern.asPredicate()).collect(Collectors.toList());
+    return 
Pair.of(allPartitionPaths.stream().filter(pattern.asPredicate()).collect(Collectors.toList()),
 new ArrayList<>());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
index ffc437bcd4c..fc34a7bd8d1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedCompactionStrategy.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * UnBoundedCompactionStrategy will not change ordering or filter any 
compaction. It is a pass-through and will compact
@@ -34,7 +35,7 @@ public class UnBoundedCompactionStrategy extends 
CompactionStrategy {
 
   @Override
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
config,
-      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionWorkloads) {
+      List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> 
pendingCompactionWorkloads, Set<String> missingPartitions) {
     return operations;
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
index 66b5612e1ac..d51ee5a12c7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/UnBoundedPartitionAwareCompactionStrategy.java
@@ -20,11 +20,13 @@ package org.apache.hudi.table.action.compact.strategy;
 
 import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -40,26 +42,24 @@ public class UnBoundedPartitionAwareCompactionStrategy 
extends CompactionStrateg
 
   @Override
   public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig 
config,
-      final List<HoodieCompactionOperation> operations, final 
List<HoodieCompactionPlan> pendingCompactionWorkloads) {
+      final List<HoodieCompactionOperation> operations, final 
List<HoodieCompactionPlan> pendingCompactionWorkloads, Set<String> 
missingPartitions) {
     BoundedPartitionAwareCompactionStrategy 
boundedPartitionAwareCompactionStrategy =
         new BoundedPartitionAwareCompactionStrategy();
     List<HoodieCompactionOperation> operationsToExclude =
-        boundedPartitionAwareCompactionStrategy.orderAndFilter(config, 
operations, pendingCompactionWorkloads);
+        boundedPartitionAwareCompactionStrategy.orderAndFilter(config, 
operations, pendingCompactionWorkloads, missingPartitions);
     List<HoodieCompactionOperation> allOperations = new 
ArrayList<>(operations);
     allOperations.removeAll(operationsToExclude);
     return allOperations;
   }
 
   @Override
-  public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> partitionPaths) {
+  public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
partitionPaths) {
     List<String> allPartitionPaths =
         partitionPaths.stream().map(partition -> partition.replace("/", 
"-")).sorted(Comparator.reverseOrder())
             .map(partitionPath -> partitionPath.replace("-", 
"/")).collect(Collectors.toList());
     BoundedPartitionAwareCompactionStrategy 
boundedPartitionAwareCompactionStrategy =
         new BoundedPartitionAwareCompactionStrategy();
-    List<String> partitionsToExclude =
-        
boundedPartitionAwareCompactionStrategy.filterPartitionPaths(writeConfig, 
partitionPaths);
-    allPartitionPaths.removeAll(partitionsToExclude);
-    return allPartitionPaths;
+    Pair<List<String>, List<String>> partitionPair = 
boundedPartitionAwareCompactionStrategy.filterPartitionPaths(writeConfig, 
allPartitionPaths);
+    return Pair.of(partitionPair.getRight(), partitionPair.getLeft());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
index a053a961105..3b579f142a3 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestPartitionAwareClusteringPlanStrategy.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.cluster.strategy;
 
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -83,11 +84,6 @@ public class TestPartitionAwareClusteringPlanStrategy {
       super(table, engineContext, writeConfig);
     }
 
-    @Override
-    protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List list) {
-      return null;
-    }
-
     @Override
     protected Map<String, String> getStrategyParams() {
       return null;
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java
index 10046f84f5b..84232cf64fe 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkSizeBasedClusteringPlanStrategyRecently.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.Lazy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,7 +62,7 @@ public class FlinkSizeBasedClusteringPlanStrategyRecently<T> 
extends FlinkSizeBa
   }
 
   @Override
-  public Option<HoodieClusteringPlan> generateClusteringPlan() {
+  public Option<HoodieClusteringPlan> 
generateClusteringPlan(Lazy<List<String>> partitions) {
     if (!checkPrecondition()) {
       return Option.empty();
     }
@@ -71,7 +72,7 @@ public class FlinkSizeBasedClusteringPlanStrategyRecently<T> 
extends FlinkSizeBa
 
     List<String> partitionPaths = 
getPartitionPathInActiveTimeline(hoodieTable);
 
-    partitionPaths = filterPartitionPaths(partitionPaths);
+    partitionPaths = filterPartitionPaths(getWriteConfig(), 
partitionPaths).getLeft();
 
     if (partitionPaths.isEmpty()) {
       // In case no partitions could be picked, return no clustering plan
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
index 70643a327d4..c048ac0dedd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
@@ -63,7 +63,7 @@ public class TestSparkClusteringPlanPartitionFilter {
     fakeTimeBasedPartitionsPath.add("20210718");
     fakeTimeBasedPartitionsPath.add("20210716");
     fakeTimeBasedPartitionsPath.add("20210719");
-    List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    List list = (List)sg.filterPartitionPaths(null, 
fakeTimeBasedPartitionsPath).getLeft();
     assertEquals(3, list.size());
   }
 
@@ -81,7 +81,7 @@ public class TestSparkClusteringPlanPartitionFilter {
     fakeTimeBasedPartitionsPath.add("20210718");
     fakeTimeBasedPartitionsPath.add("20210716");
     fakeTimeBasedPartitionsPath.add("20210719");
-    List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    List list = (List)sg.filterPartitionPaths(null, 
fakeTimeBasedPartitionsPath).getLeft();
     assertEquals(1, list.size());
     assertSame("20210718", list.get(0));
   }
@@ -101,7 +101,7 @@ public class TestSparkClusteringPlanPartitionFilter {
     fakeTimeBasedPartitionsPath.add("20211221");
     fakeTimeBasedPartitionsPath.add("20211222");
     fakeTimeBasedPartitionsPath.add("20211224");
-    List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    List list = (List)sg.filterPartitionPaths(table.getConfig(), 
fakeTimeBasedPartitionsPath).getLeft();
     assertEquals(1, list.size());
     assertSame("20211222", list.get(0));
   }
@@ -117,7 +117,7 @@ public class TestSparkClusteringPlanPartitionFilter {
     for (int i = 0; i < 24; i++) {
       fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? 
String.valueOf(i) : "0" + i));
     }
-    List filterPartitions = 
sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    List filterPartitions = (List)sg.filterPartitionPaths(null, 
fakeTimeBasedPartitionsPath).getLeft();
     assertEquals(1, filterPartitions.size());
     
assertEquals(fakeTimeBasedPartitionsPath.get(DateTime.now().getHourOfDay()), 
filterPartitions.get(0));
     fakeTimeBasedPartitionsPath = new ArrayList<>();
@@ -125,7 +125,7 @@ public class TestSparkClusteringPlanPartitionFilter {
       fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? 
String.valueOf(i) : "0" + i));
       fakeTimeBasedPartitionsPath.add("20220302" + (i >= 10 ? 
String.valueOf(i) : "0" + i));
     }
-    filterPartitions = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    filterPartitions = (List)sg.filterPartitionPaths(null, 
fakeTimeBasedPartitionsPath).getLeft();
     assertEquals(2, filterPartitions.size());
 
     int hourOfDay = DateTime.now().getHourOfDay();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index a368d43da25..64d57bfe395 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -63,7 +63,7 @@ public class TestHoodieCompactionStrategy {
     HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp")
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).build()).build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>(), Collections.emptySet());
     assertEquals(operations, returned, "UnBounded should not re-order or 
filter");
   }
 
@@ -79,7 +79,7 @@ public class TestHoodieCompactionStrategy {
             
HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
         .build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>(), Collections.emptySet());
 
     assertTrue(returned.size() < operations.size(), "BoundedIOCompaction 
should have resulted in fewer compactions");
     assertEquals(2, returned.size(), "BoundedIOCompaction should have resulted 
in 2 compactions being chosen");
@@ -103,7 +103,7 @@ public class TestHoodieCompactionStrategy {
                 .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 
1024).build())
         .build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>(), Collections.emptySet());
 
     assertTrue(returned.size() < operations.size(),
         "LogFileSizeBasedCompactionStrategy should have resulted in fewer 
compactions");
@@ -137,7 +137,7 @@ public class TestHoodieCompactionStrategy {
         
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(1).build()).build();
 
-    List<String> filterPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths));
+    List<String> filterPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths)).getLeft();
     assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy 
should have resulted in fewer partitions");
 
     List<HoodieCompactionOperation> operations = 
createCompactionOperationsForPartition(writeConfig, sizesMap, 
keyToPartitionMap, filterPartitions);
@@ -182,11 +182,11 @@ public class TestHoodieCompactionStrategy {
             .build())
         .build();
 
-    List<String> filterPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths));
+    List<String> filterPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
Arrays.asList(partitionPaths)).getLeft();
     assertEquals(1, filterPartitions.size(), "DayBasedCompactionStrategy 
should have resulted in fewer partitions");
 
     List<HoodieCompactionOperation> operations = 
createCompactionOperationsForPartition(writeConfig, sizesMap, 
keyToPartitionMap, filterPartitions);
-    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>(), Collections.emptySet());
 
     assertEquals(1, returned.size(),
         "DayBasedAndBoundedIOCompactionStrategy should have resulted in fewer 
compactions");
@@ -241,7 +241,7 @@ public class TestHoodieCompactionStrategy {
         
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
-    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>(), Collections.emptySet());
 
     assertTrue(returned.size() < operations.size(),
         "BoundedPartitionAwareCompactionStrategy should have resulted in fewer 
compactions");
@@ -290,7 +290,7 @@ public class TestHoodieCompactionStrategy {
         
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(HoodieCompactionConfig.newBuilder()
             
.withCompactionStrategy(strategy).withTargetPartitionsPerDayBasedCompaction(2).build()).build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap, keyToPartitionMap);
-    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>(), Collections.emptySet());
 
     assertTrue(returned.size() < operations.size(),
         "UnBoundedPartitionAwareCompactionStrategy should not include last "
@@ -312,7 +312,7 @@ public class TestHoodieCompactionStrategy {
                 .withCompactionLogFileNumThreshold(2).build())
         .build();
     List<HoodieCompactionOperation> operations = 
createCompactionOperations(writeConfig, sizesMap);
-    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>());
+    List<HoodieCompactionOperation> returned = 
writeConfig.getCompactionStrategy().orderAndFilter(writeConfig, operations, new 
ArrayList<>(), Collections.emptySet());
 
     assertTrue(returned.size() < operations.size(),
         "LogFileLengthBasedCompactionStrategy should have resulted in fewer 
compactions");
@@ -341,7 +341,7 @@ public class TestHoodieCompactionStrategy {
     List<String> allPartitionPaths = Arrays.asList(
         "2017/01/01", "2018/01/02", "2017/02/01"
     );
-    List<String> returned = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
allPartitionPaths);
+    List<String> returned = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
allPartitionPaths).getLeft();
     // filter by num first and then filter by prefix
     assertEquals(1, returned.size());
     assertEquals("2017/01/01", returned.get(0));
@@ -349,7 +349,7 @@ public class TestHoodieCompactionStrategy {
     writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
         HoodieCompactionConfig.newBuilder().withCompactionStrategy(new 
PrefixStrategy(), new NumStrategy()).withTargetIOPerCompactionInMB(1024)
             .withCompactionLogFileNumThreshold(2).build()).build();
-    returned = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
allPartitionPaths);
+    returned = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
allPartitionPaths).getLeft();
     // filter by prefix first and then filter by num
     assertEquals(2, returned.size());
     assertEquals("2017/01/01", returned.get(0));
@@ -358,15 +358,28 @@ public class TestHoodieCompactionStrategy {
 
   public static class NumStrategy extends CompactionStrategy {
     @Override
-    public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
-      return allPartitionPaths.stream().limit(2).collect(Collectors.toList());
+    public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
allPartitionPaths) {
+      List<String> partitionToProcess = 
allPartitionPaths.stream().limit(2).collect(Collectors.toList());
+      List<String> missingPartitions = 
allPartitionPaths.stream().skip(2).collect(Collectors.toList());
+      return Pair.of(partitionToProcess, missingPartitions);
     }
   }
 
   public static class PrefixStrategy extends CompactionStrategy {
     @Override
-    public List<String> filterPartitionPaths(HoodieWriteConfig writeConfig, 
List<String> allPartitionPaths) {
-      return allPartitionPaths.stream().filter(s -> 
s.startsWith("2017")).collect(Collectors.toList());
+    public Pair<List<String>, List<String>> 
filterPartitionPaths(HoodieWriteConfig writeConfig, List<String> 
allPartitionPaths) {
+      ArrayList<String> partitionsToProcess = new ArrayList<>();
+      ArrayList<String> missingPartitions = new ArrayList<>();
+
+      allPartitionPaths.forEach(partition -> {
+        if (partition.startsWith("2017")) {
+          partitionsToProcess.add(partition);
+        } else {
+          missingPartitions.add(partition);
+        }
+      });
+
+      return Pair.of(partitionsToProcess, missingPartitions);
     }
   }
 
@@ -381,21 +394,21 @@ public class TestHoodieCompactionStrategy {
     );
 
     HoodieWriteConfig writeConfig = updateRegex(".*");
-    List<String> filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    List<String> filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions).getLeft();
     assertEquals(5, filteredPartitions.size());
 
     writeConfig = updateRegex("2020/01/01");
-    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions).getLeft();
     assertEquals(1, filteredPartitions.size());
     assertEquals("2020/01/01", filteredPartitions.get(0));
 
     writeConfig = updateRegex("2020/01/0[1-2]");
-    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions).getLeft();
     assertEquals(2, filteredPartitions.size());
     assertEquals("2020/01/01", filteredPartitions.get(0));
     assertEquals("2020/01/02", filteredPartitions.get(1));
     writeConfig = updateRegex("2020/01/0[1-2]|2020/02/01");
-    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions).getLeft();
     assertEquals(3, filteredPartitions.size());
     assertEquals("2020/01/01", filteredPartitions.get(0));
     assertEquals("2020/01/02", filteredPartitions.get(1));
@@ -403,13 +416,13 @@ public class TestHoodieCompactionStrategy {
 
 
     writeConfig = updateRegex("2020/.*/01");
-    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions).getLeft();
     assertEquals(2, filteredPartitions.size());
     assertEquals("2020/01/01", filteredPartitions.get(0));
     assertEquals("2020/02/01", filteredPartitions.get(1));
 
     writeConfig = updateRegex(".*/01/.*");
-    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions);
+    filteredPartitions = 
writeConfig.getCompactionStrategy().filterPartitionPaths(writeConfig, 
partitions).getLeft();
     assertEquals(4, filteredPartitions.size());
     assertEquals("2020/01/01", filteredPartitions.get(0));
     assertEquals("2020/01/02", filteredPartitions.get(1));
diff --git a/hudi-common/src/main/avro/HoodieClusteringPlan.avsc 
b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
index 87486267d1c..d4a6c207762 100644
--- a/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
+++ b/hudi-common/src/main/avro/HoodieClusteringPlan.avsc
@@ -50,6 +50,14 @@
       "name":"preserveHoodieMetadata",
       "type":["null", "boolean"],
       "default": null
-    }
+    },
+    {
+      "name":"missingSchedulePartitions",
+      "type":["null", {
+        "type":"array",
+        "items":"string"
+      }],
+      "default": null
+    }
   ]
 }
diff --git a/hudi-common/src/main/avro/HoodieCompactionOperation.avsc 
b/hudi-common/src/main/avro/HoodieCompactionOperation.avsc
index bab7321f29c..61a96faec87 100644
--- a/hudi-common/src/main/avro/HoodieCompactionOperation.avsc
+++ b/hudi-common/src/main/avro/HoodieCompactionOperation.avsc
@@ -96,6 +96,14 @@
        "name":"preserveHoodieMetadata",
        "type":["boolean", "null"],
        "default": false
+    },
+    {
+       "name":"missingSchedulePartitions",
+       "type":["null", {
+          "type":"array",
+          "items":"string"
+       }],
+       "default": null
     }
   ]
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
index 2bc8f7fba62..eaed3668c9f 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
@@ -192,7 +192,7 @@ public class CompactionTestUtils {
       }
     }).collect(Collectors.toList());
     return new HoodieCompactionPlan(ops.isEmpty() ? null : ops, new 
HashMap<>(),
-        CompactionUtils.LATEST_COMPACTION_METADATA_VERSION, null, null);
+        CompactionUtils.LATEST_COMPACTION_METADATA_VERSION, null, null, null);
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
index f35a2792731..c40dd2350e5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
@@ -131,7 +131,7 @@ public class TestClusteringUtil {
   private String generateClusteringPlan() {
     HoodieClusteringGroup group = new HoodieClusteringGroup();
     HoodieClusteringPlan plan = new 
HoodieClusteringPlan(Collections.singletonList(group),
-        HoodieClusteringStrategy.newBuilder().build(), Collections.emptyMap(), 
1, false);
+        HoodieClusteringStrategy.newBuilder().build(), Collections.emptyMap(), 
1, false, null);
     HoodieRequestedReplaceMetadata metadata = new 
HoodieRequestedReplaceMetadata(WriteOperationType.CLUSTER.name(),
         plan, Collections.emptyMap(), 1);
     String instantTime = table.getMetaClient().createNewInstantTime();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 111680bdcdb..ccfd5965cad 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -174,7 +174,7 @@ public class TestCompactionUtil {
    */
   private String generateCompactionPlan() {
     HoodieCompactionOperation operation = new HoodieCompactionOperation();
-    HoodieCompactionPlan plan = new 
HoodieCompactionPlan(Collections.singletonList(operation), 
Collections.emptyMap(), 1, null, null);
+    HoodieCompactionPlan plan = new 
HoodieCompactionPlan(Collections.singletonList(operation), 
Collections.emptyMap(), 1, null, null, null);
     String instantTime = table.getMetaClient().createNewInstantTime();
     HoodieInstant compactionInstant =
         INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, instantTime);

Reply via email to