This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch flink-incremental-compaction
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e18fc63d42e843fbcc4f304821dadff1d4400a3e
Author: zhangyue143 <[email protected]>
AuthorDate: Mon Dec 9 11:21:32 2024 +0800

    flink schedule compaction with incremental partitions
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   9 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   4 +-
 .../compact/ScheduleCompactionActionExecutor.java  |  13 +-
 .../BaseHoodieCompactionPlanGenerator.java         |  16 ++-
 .../generators/HoodieCompactionPlanGenerator.java  |   7 +-
 .../HoodieLogCompactionPlanGenerator.java          |   4 +-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |   4 +-
 .../hudi/table/HoodieFlinkMergeOnReadTable.java    |  13 +-
 .../hudi/table/HoodieJavaCopyOnWriteTable.java     |   4 +-
 .../hudi/table/HoodieJavaMergeOnReadTable.java     |   8 +-
 .../hudi/table/HoodieSparkCopyOnWriteTable.java    |   4 +-
 .../hudi/table/HoodieSparkMergeOnReadTable.java    |   8 +-
 .../apache/hudi/configuration/FlinkOptions.java    |   6 +
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 140 ++++++++++++++++++---
 .../java/org/apache/hudi/util/CompactionUtil.java  |  14 ++-
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  14 +++
 ...CoordinatorWithIncrementalCompactionEnable.java |  63 ++++++++++
 ...eadWithIncrementalScheduleCompactionEnable.java |  29 +++++
 .../org/apache/hudi/utils/TestCompactionUtil.java  |   4 +-
 .../table/action/compact/TestHoodieCompactor.java  |   4 +-
 20 files changed, 313 insertions(+), 55 deletions(-)

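A minimal sketch of how the new behavior could be switched on for a Flink streaming write job, based only on the options exercised in the tests below; the class name and table path are placeholders, and the surrounding pipeline wiring is omitted:

import org.apache.flink.configuration.Configuration;

import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

public class IncrementalCompactionConfigExample {
  public static Configuration writeConf(String tablePath) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PATH, tablePath);
    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
    // the coordinator schedules compaction every N delta commits
    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
    // new flag added by this commit: build the compaction plan only from the
    // partitions written since the last completed compaction, instead of
    // listing every partition of the table
    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, true);
    return conf;
  }
}
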
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index e96a6707247..6328dfe6d65 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -116,6 +116,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
   protected transient AsyncArchiveService asyncArchiveService;
 
   protected Set<String> pendingInflightAndRequestedInstants;
+  protected Option<Set<String>> specificPartitions = Option.empty();
 
   protected BaseHoodieTableServiceClient(HoodieEngineContext context,
                                          HoodieWriteConfig clientConfig,
@@ -162,6 +163,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
     }
   }
 
+  public void setSpecificPartitions(Option<Set<String>> specificPartitions) {
+    this.specificPartitions = specificPartitions;
+  }
+
   protected void setPendingInflightAndRequestedInstants(Set<String> 
pendingInflightAndRequestedInstants) {
     this.pendingInflightAndRequestedInstants = 
pendingInflightAndRequestedInstants;
   }
@@ -663,8 +668,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
         break;
       case COMPACT:
         LOG.info("Scheduling compaction at instant time: {}", instantTime);
-        Option<HoodieCompactionPlan> compactionPlan = table
-            .scheduleCompaction(context, instantTime, extraMetadata);
+        Option<HoodieCompactionPlan> compactionPlan =
+            table.scheduleCompaction(context, instantTime, extraMetadata, 
specificPartitions);
         option = compactionPlan.isPresent() ? Option.of(instantTime) : 
Option.empty();
         break;
       case LOG_COMPACT:
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 19b6624922a..e36b36ef78c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -452,11 +452,13 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * @param context HoodieEngineContext
    * @param instantTime Instant Time for scheduling compaction
    * @param extraMetadata additional metadata to write into plan
+   * @param specificPartitions specific partitions on which to schedule compaction
    * @return
    */
   public abstract Option<HoodieCompactionPlan> 
scheduleCompaction(HoodieEngineContext context,
                                                                   String 
instantTime,
-                                                                  
Option<Map<String, String>> extraMetadata);
+                                                                  
Option<Map<String, String>> extraMetadata,
+                                                                  
Option<Set<String>> specificPartitions);
 
   /**
    * Run Compaction on the table. Compaction arranges the data so that it is 
optimized for data access.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index e83800e45da..7124e17ec0d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -46,6 +46,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hudi.common.util.CollectionUtils.nonEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -56,26 +57,26 @@ public class ScheduleCompactionActionExecutor<T, I, K, O> 
extends BaseActionExec
   private WriteOperationType operationType;
   private final Option<Map<String, String>> extraMetadata;
   private BaseHoodieCompactionPlanGenerator planGenerator;
-
   public ScheduleCompactionActionExecutor(HoodieEngineContext context,
                                           HoodieWriteConfig config,
                                           HoodieTable<T, I, K, O> table,
                                           String instantTime,
                                           Option<Map<String, String>> 
extraMetadata,
-                                          WriteOperationType operationType) {
+                                          WriteOperationType operationType,
+                                          Option<Set<String>> 
specificPartitions) {
     super(context, config, table, instantTime);
     this.extraMetadata = extraMetadata;
     this.operationType = operationType;
     checkArgument(operationType == WriteOperationType.COMPACT || operationType 
== WriteOperationType.LOG_COMPACT,
         "Only COMPACT and LOG_COMPACT is supported");
-    initPlanGenerator(context, config, table);
+    initPlanGenerator(context, config, table, specificPartitions);
   }
 
-  private void initPlanGenerator(HoodieEngineContext context, 
HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
+  private void initPlanGenerator(HoodieEngineContext context, 
HoodieWriteConfig config, HoodieTable<T, I, K, O> table, Option<Set<String>> 
specificPartitions) {
     if (WriteOperationType.COMPACT.equals(operationType)) {
-      planGenerator = new HoodieCompactionPlanGenerator(table, context, 
config);
+      planGenerator = new HoodieCompactionPlanGenerator(table, context, 
config, specificPartitions);
     } else {
-      planGenerator = new HoodieLogCompactionPlanGenerator(table, context, 
config);
+      planGenerator = new HoodieLogCompactionPlanGenerator(table, context, 
config, specificPartitions);
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 79059c2ca34..115f05b2381 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -50,6 +50,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -64,11 +65,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
   protected final HoodieTable<T, I, K, O> hoodieTable;
   protected final HoodieWriteConfig writeConfig;
   protected final transient HoodieEngineContext engineContext;
+  private final Option<Set<String>> specificPartitions;
 
-  public BaseHoodieCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+  public BaseHoodieCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig,
+                                           Option<Set<String>> 
specificPartitions) {
     this.hoodieTable = table;
     this.writeConfig = writeConfig;
     this.engineContext = engineContext;
+    this.specificPartitions = specificPartitions;
   }
 
   @Nullable
@@ -82,8 +86,14 @@ public abstract class BaseHoodieCompactionPlanGenerator<T 
extends HoodieRecordPa
     // TODO - rollback any compactions in flight
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
     CompletionTimeQueryView completionTimeQueryView = 
metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient);
-    List<String> partitionPaths = FSUtils.getAllPartitionPaths(
-        engineContext, metaClient.getStorage(), 
writeConfig.getMetadataConfig(), metaClient.getBasePath());
+    List<String> partitionPaths;
+    if (specificPartitions != null && specificPartitions.isPresent() && 
!specificPartitions.get().isEmpty()) {
+      LOG.info("Scheduling compaction based on specific partitions: " + specificPartitions.get());
+      partitionPaths = new ArrayList<>(specificPartitions.get());
+    } else {
+      LOG.info("Getting all table partitions.");
+      partitionPaths = FSUtils.getAllPartitionPaths(engineContext, 
metaClient.getStorage(), writeConfig.getMetadataConfig(), 
metaClient.getBasePath());
+    }
 
     int allPartitionSize = partitionPaths.size();
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
index a93ece710b0..8fd983f939b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieCompactionPlanGenerator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -33,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Set;
 
 import static java.util.stream.Collectors.toList;
 
@@ -43,8 +45,9 @@ public class HoodieCompactionPlanGenerator<T extends 
HoodieRecordPayload, I, K,
 
   private final CompactionStrategy compactionStrategy;
 
-  public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
+  public HoodieCompactionPlanGenerator(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig,
+                                       Option<Set<String>> specificPartitions) 
{
+    super(table, engineContext, writeConfig, specificPartitions);
     this.compactionStrategy = writeConfig.getCompactionStrategy();
     LOG.info("Compaction Strategy used is: " + compactionStrategy.toString());
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index a81ee663fa9..27a347f670b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -46,8 +46,8 @@ public class HoodieLogCompactionPlanGenerator<T extends 
HoodieRecordPayload, I,
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLogCompactionPlanGenerator.class);
 
-  public HoodieLogCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
+  public HoodieLogCompactionPlanGenerator(HoodieTable table, 
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig, 
Option<Set<String>> specificPartitions) {
+    super(table, engineContext, writeConfig, specificPartitions);
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 4fd217ce4bd..9106eab85b7 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -75,6 +75,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where, all 
data is stored in base files, with
@@ -318,7 +319,8 @@ public class HoodieFlinkCopyOnWriteTable<T>
   }
 
   @Override
-  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata) {
+  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata,
+                                                         Option<Set<String>> 
specificPartitions) {
     throw new HoodieNotSupportedException("Compaction is not supported on a 
CopyOnWrite table");
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index 361ea7a6e6f..7ee793eb6c8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -47,6 +47,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Flink MERGE_ON_READ table.
@@ -100,12 +101,12 @@ public class HoodieFlinkMergeOnReadTable<T>
   }
 
   @Override
-  public Option<HoodieCompactionPlan> scheduleCompaction(
-      HoodieEngineContext context,
-      String instantTime,
-      Option<Map<String, String>> extraMetadata) {
+  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context,
+                                                         String instantTime,
+                                                         Option<Map<String, 
String>> extraMetadata,
+                                                         Option<Set<String>> 
specificPartitions) {
     ScheduleCompactionActionExecutor scheduleCompactionExecutor = new 
ScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata, 
WriteOperationType.COMPACT);
+        context, config, this, instantTime, extraMetadata, 
WriteOperationType.COMPACT, specificPartitions);
     return scheduleCompactionExecutor.execute();
   }
 
@@ -121,7 +122,7 @@ public class HoodieFlinkMergeOnReadTable<T>
   @Override
   public Option<HoodieCompactionPlan> 
scheduleLogCompaction(HoodieEngineContext context, String instantTime, 
Option<Map<String, String>> extraMetadata) {
     ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new 
ScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata, 
WriteOperationType.LOG_COMPACT);
+        context, config, this, instantTime, extraMetadata, 
WriteOperationType.LOG_COMPACT, Option.empty());
     return scheduleLogCompactionExecutor.execute();
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 16e194e2b66..bf02b3d9e7f 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -80,6 +80,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class HoodieJavaCopyOnWriteTable<T>
     extends HoodieJavaTable<T> implements HoodieCompactionHandler<T> {
@@ -185,7 +186,8 @@ public class HoodieJavaCopyOnWriteTable<T>
   @Override
   public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context,
                                                          String instantTime,
-                                                         Option<Map<String, 
String>> extraMetadata) {
+                                                         Option<Map<String, 
String>> extraMetadata,
+                                                         Option<Set<String>> 
specificPartitions) {
     throw new HoodieNotSupportedException("ScheduleCompaction is not supported 
on a CopyOnWrite table");
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 8c2edc4f3d3..065ee747a7b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -42,6 +42,7 @@ import 
org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class HoodieJavaMergeOnReadTable<T> extends 
HoodieJavaCopyOnWriteTable<T> {
   protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, 
HoodieEngineContext context, HoodieTableMetaClient metaClient) {
@@ -72,9 +73,10 @@ public class HoodieJavaMergeOnReadTable<T> extends 
HoodieJavaCopyOnWriteTable<T>
   }
 
   @Override
-  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata) {
+  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata,
+                                                         Option<Set<String>> 
specificPartitions) {
     ScheduleCompactionActionExecutor scheduleCompactionExecutor = new 
ScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata, 
WriteOperationType.COMPACT);
+        context, config, this, instantTime, extraMetadata, 
WriteOperationType.COMPACT, specificPartitions);
     return scheduleCompactionExecutor.execute();
   }
 
@@ -90,7 +92,7 @@ public class HoodieJavaMergeOnReadTable<T> extends 
HoodieJavaCopyOnWriteTable<T>
   @Override
   public Option<HoodieCompactionPlan> 
scheduleLogCompaction(HoodieEngineContext context, String instantTime, 
Option<Map<String, String>> extraMetadata) {
     ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new 
ScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata, 
WriteOperationType.LOG_COMPACT);
+        context, config, this, instantTime, extraMetadata, 
WriteOperationType.LOG_COMPACT, Option.empty());
     return scheduleLogCompactionExecutor.execute();
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 67a7658a309..de75583fcaa 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -89,6 +89,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 
@@ -171,7 +172,8 @@ public class HoodieSparkCopyOnWriteTable<T>
   }
 
   @Override
-  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata) {
+  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata,
+                                                         Option<Set<String>> 
specificPartitions) {
     throw new HoodieNotSupportedException("Compaction is not supported on a 
CopyOnWrite table");
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index f5929fdc667..761e4a78f48 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -66,6 +66,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.deleteMetadataTable;
 
@@ -140,9 +141,10 @@ public class HoodieSparkMergeOnReadTable<T> extends 
HoodieSparkCopyOnWriteTable<
   }
 
   @Override
-  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata) {
+  public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext 
context, String instantTime, Option<Map<String, String>> extraMetadata,
+                                                         Option<Set<String>> 
specificPartitions) {
     ScheduleCompactionActionExecutor scheduleCompactionExecutor = new 
ScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata, 
WriteOperationType.COMPACT);
+        context, config, this, instantTime, extraMetadata, 
WriteOperationType.COMPACT, specificPartitions);
     return scheduleCompactionExecutor.execute();
   }
 
@@ -163,7 +165,7 @@ public class HoodieSparkMergeOnReadTable<T> extends 
HoodieSparkCopyOnWriteTable<
   @Override
   public Option<HoodieCompactionPlan> 
scheduleLogCompaction(HoodieEngineContext context, String instantTime, 
Option<Map<String, String>> extraMetadata) {
     ScheduleCompactionActionExecutor scheduleLogCompactionExecutor = new 
ScheduleCompactionActionExecutor(
-        context, config, this, instantTime, extraMetadata, 
WriteOperationType.LOG_COMPACT);
+        context, config, this, instantTime, extraMetadata, 
WriteOperationType.LOG_COMPACT, Option.empty());
     return scheduleLogCompactionExecutor.execute();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 36af35435fe..7ef794e6395 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -699,6 +699,12 @@ public class FlinkOptions extends HoodieConfig {
       .noDefaultValue()
       .withDescription("Parallelism of tasks that do actual compaction, 
default same as the write task parallelism");
 
+  public static final ConfigOption<Boolean> 
COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS = ConfigOptions
+      .key("compaction.schedule.incremental.partitions")
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Schedule the compaction plan based on the partitions recorded in the commit metadata since the last completed compaction.");
+
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
   public static final String NUM_AND_TIME = "num_and_time";
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index fbb2a5ff391..e1fd9fad19f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -21,13 +21,20 @@ package org.apache.hudi.sink;
 import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -39,6 +46,7 @@ import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.meta.CkpMetadata;
 import org.apache.hudi.sink.meta.CkpMetadataFactory;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -49,6 +57,7 @@ import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -58,8 +67,13 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -70,8 +84,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -165,6 +183,8 @@ public class StreamWriteOperatorCoordinator
    * The client id heartbeats.
    */
   private ClientIds clientIds;
+  // partitions written since the last scheduled compaction, used for incremental compaction scheduling
+  private Map<String, String> cacheWrittenPartitions;
+  // the latest instant whose writes have been covered by a scheduled compaction plan; checkpointed as coordinator state
+  private String lastInstant = "";
 
   /**
    * Constructs a StreamingSinkOperatorCoordinator.
@@ -231,6 +251,9 @@ public class StreamWriteOperatorCoordinator
     if (this.clientIds != null) {
       this.clientIds.close();
     }
+    if (cacheWrittenPartitions != null) {
+      cacheWrittenPartitions.clear();
+    }
   }
 
   @Override
@@ -238,7 +261,7 @@ public class StreamWriteOperatorCoordinator
     executor.execute(
         () -> {
           try {
-            result.complete(new byte[0]);
+            result.complete(doCheckpointCoordinatorStatus(this.lastInstant));
           } catch (Throwable throwable) {
             // when a checkpoint fails, throws directly.
             result.completeExceptionally(
@@ -260,9 +283,10 @@ public class StreamWriteOperatorCoordinator
           // for streaming mode, commits the ever received events anyway,
           // the stream write task snapshot and flush the data buffer 
synchronously in sequence,
           // so a successful checkpoint subsumes the old one(follows the 
checkpoint subsuming contract)
-          final boolean committed = commitInstant(this.instant, checkpointId);
+          final Pair<Boolean, List<WriteStatus>> committedPair = 
commitInstant(this.instant, checkpointId);
           // schedules the compaction or clustering if it is enabled in stream 
execution mode
-          scheduleTableServices(committed);
+          scheduleTableServices(committedPair);
+          boolean committed = committedPair.getLeft();
 
           if (committed) {
             // start new instant.
@@ -275,8 +299,11 @@ public class StreamWriteOperatorCoordinator
   }
 
   @Override
-  public void resetToCheckpoint(long checkpointID, byte[] checkpointData) {
-    // no operation
+  public void resetToCheckpoint(long checkpointID, byte[] checkpointData) 
throws Exception {
+    if (checkpointData != null && checkpointData.length > 0) {
+      this.lastInstant = resetCheckpointCoordinatorStatus(checkpointData);
+      LOG.info("Reset coordinator from checkpoint, last instant is " + lastInstant);
+    }
   }
 
   @Override
@@ -321,6 +348,81 @@ public class StreamWriteOperatorCoordinator
   //  Utilities
   // -------------------------------------------------------------------------
 
+  private void scheduleCompactionInternal(Pair<Boolean, List<WriteStatus>> 
committedPair) {
+    boolean committed = committedPair.getLeft();
+    if (committed && tableState.scheduleIncrementalPartitions) {
+      updateCachePartitions(committedPair.getRight());
+    }
+    // cacheWrittenPartitions is only maintained when incremental scheduling is enabled, so guard against null here
+    boolean res = CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed,
+        cacheWrittenPartitions == null ? Option.empty() : Option.of(cacheWrittenPartitions.keySet()));
+    if (res && tableState.scheduleIncrementalPartitions) {
+      this.cacheWrittenPartitions.clear();
+      this.lastInstant = this.instant;
+      LOG.info("Compaction scheduled, clearing cacheWrittenPartitions and resetting last instant to " + this.lastInstant);
+    }
+  }
+
+  private void updateCachePartitions(List<WriteStatus> writeResults) {
+    initCachePartitions();
+    writeResults.forEach(status -> {
+      
this.cacheWrittenPartitions.putIfAbsent(status.getStat().getPartitionPath(), 
"");
+    });
+  }
+
+  // only lastInstant is taken as the coordinator checkpoint state
+  public byte[] doCheckpointCoordinatorStatus(String lastInstant) throws 
IOException {
+    if (tableState.scheduleIncrementalPartitions) {
+      ValidationUtils.checkArgument(lastInstant != null);
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream out = new ObjectOutputStream(baos);
+      out.writeInt(lastInstant.length());
+      out.write(lastInstant.getBytes());
+      out.flush();
+      LOG.info("Finished checkpointing coordinator state, last instant: " + lastInstant);
+      return baos.toByteArray();
+    } else {
+      return new byte[0];
+    }
+  }
+
+  public String resetCheckpointCoordinatorStatus(byte[] checkpointData) throws 
Exception {
+    ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
+    ObjectInputStream ois = new ObjectInputStream(bais);
+    int instantSize = ois.readInt();
+    byte[] instantBytes = new byte[instantSize];
+    ois.readFully(instantBytes);
+    return new String(instantBytes);
+  }
+
+  private void initCachePartitions() {
+    if (cacheWrittenPartitions != null) {
+      return;
+    }
+    long start = System.currentTimeMillis();
+    this.cacheWrittenPartitions = new ConcurrentHashMap<>();
+    HoodieActiveTimeline activeTimeline = 
metaClient.getActiveTimeline().reload();
+    // use the last commit instant or pending compaction instant on the active timeline as the milestone instant
+    // only works for MOR tables
+    HoodieTimeline resTimeline;
+    if (StringUtils.nonEmpty(this.lastInstant)) {
+      String milestoneInstant = this.lastInstant;
+      resTimeline = activeTimeline.getDeltaCommitTimeline().filterCompletedInstants().findInstantsAfter(milestoneInstant);
+    } else {
+      Option<HoodieInstant> activeLastInstant = activeTimeline.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION)).lastInstant();
+      String milestoneInstant = activeLastInstant.isPresent() ? activeLastInstant.get().requestedTime() : INIT_INSTANT_TS;
+      resTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants().findInstantsAfter(milestoneInstant);
+    }
+    resTimeline.getInstantsAsStream().forEach(instant -> {
+      LOG.info("Reading " + instant);
+      HoodieCommitMetadata metadata = 
WriteProfiles.getCommitMetadata(conf.getString(FlinkOptions.TABLE_NAME), new 
Path(conf.getString(FlinkOptions.PATH)), instant, activeTimeline);
+      for (HoodieWriteStat writeStat : metadata.getWriteStats()) {
+        cacheWrittenPartitions.putIfAbsent(writeStat.getPartitionPath(), "");
+      }
+    });
+    long end = System.currentTimeMillis();
+    LOG.info("Finished initializing cached written partitions in " + (end - start) + " ms: " + cacheWrittenPartitions);
+  }
+
   private void initHiveSync() {
     this.hiveSyncExecutor = 
NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
     this.hiveSyncContext = HiveSyncContext.create(conf, this.storageConf);
@@ -420,7 +522,10 @@ public class StreamWriteOperatorCoordinator
       if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy()) {
         writeClient.getHeartbeatClient().start(instant);
       }
-      commitInstant(instant);
+      Pair<Boolean, List<WriteStatus>> pair = commitInstant(instant);
+      if (pair.getLeft() && tableState.scheduleIncrementalPartitions) {
+        updateCachePartitions(pair.getRight());
+      }
     }
     // stop the heartbeat for old instant
     if (writeClient.getConfig().getFailedWritesCleanPolicy().isLazy() && 
!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
@@ -460,7 +565,8 @@ public class StreamWriteOperatorCoordinator
     addEventToBuffer(event);
     if (allEventsReceived()) {
       // start to commit the instant.
-      boolean committed = commitInstant(this.instant);
+      Pair<Boolean, List<WriteStatus>> commitPair = 
commitInstant(this.instant);
+      boolean committed = commitPair.getLeft();
       if (committed) {
         // The executor thread inherits the classloader of the 
#handleEventFromOperator
         // caller, which is a AppClassLoader.
@@ -468,19 +574,19 @@ public class StreamWriteOperatorCoordinator
         // sync Hive synchronously if it is enabled in batch mode.
         syncHive();
         // schedules the compaction or clustering if it is enabled in batch 
execution mode
-        scheduleTableServices(true);
+        scheduleTableServices(commitPair);
       }
     }
   }
 
-  private void scheduleTableServices(Boolean committed) {
+  private void scheduleTableServices(Pair<Boolean, List<WriteStatus>> 
committedPair) {
     // if compaction is on, schedule the compaction
     if (tableState.scheduleCompaction) {
-      CompactionUtil.scheduleCompaction(writeClient, 
tableState.isDeltaTimeCompaction, committed);
+      scheduleCompactionInternal(committedPair);
     }
     // if clustering is on, schedule the clustering
     if (tableState.scheduleClustering) {
-      ClusteringUtil.scheduleClustering(conf, writeClient, committed);
+      ClusteringUtil.scheduleClustering(conf, writeClient, 
committedPair.getLeft());
     }
   }
 
@@ -524,7 +630,7 @@ public class StreamWriteOperatorCoordinator
   /**
    * Commits the instant.
    */
-  private boolean commitInstant(String instant) {
+  private Pair<Boolean, List<WriteStatus>> commitInstant(String instant) {
     return commitInstant(instant, -1);
   }
 
@@ -533,10 +639,10 @@ public class StreamWriteOperatorCoordinator
    *
    * @return true if the write statuses are committed successfully.
    */
-  private boolean commitInstant(String instant, long checkpointId) {
+  private Pair<Boolean, List<WriteStatus>> commitInstant(String instant, long 
checkpointId) {
     if (Arrays.stream(eventBuffer).allMatch(Objects::isNull)) {
       // The last checkpoint finished successfully.
-      return false;
+      return Pair.of(false, new ArrayList<>());
     }
 
     List<WriteStatus> writeResults = Arrays.stream(eventBuffer)
@@ -554,10 +660,10 @@ public class StreamWriteOperatorCoordinator
       if (checkpointId != -1) {
         sendCommitAckEvents(checkpointId);
       }
-      return false;
+      return Pair.of(false, writeResults);
     }
     doCommit(instant, writeResults);
-    return true;
+    return Pair.of(true, writeResults);
   }
 
   /**
@@ -679,6 +785,7 @@ public class StreamWriteOperatorCoordinator
     final boolean syncHive;
     final boolean syncMetadata;
     final boolean isDeltaTimeCompaction;
+    final boolean scheduleIncrementalPartitions;
 
     private TableState(Configuration conf) {
       this.operationType = 
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
@@ -690,6 +797,7 @@ public class StreamWriteOperatorCoordinator
       this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
       this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
       this.isDeltaTimeCompaction = OptionsResolver.isDeltaTimeCompaction(conf);
+      this.scheduleIncrementalPartitions = conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS);
     }
 
     public static TableState create(Configuration conf) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index 657bbdbea60..89c1a790c86 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Locale;
+import java.util.Set;
 
 /**
  * Utilities for flink hudi compaction.
@@ -52,17 +53,22 @@ public class CompactionUtil {
    * @param deltaTimeCompaction Whether the compaction is trigger by elapsed 
delta time
    * @param committed           Whether the last instant was committed 
successfully
    */
-  public static void scheduleCompaction(
+  public static boolean scheduleCompaction(
       HoodieFlinkWriteClient<?> writeClient,
       boolean deltaTimeCompaction,
-      boolean committed) {
+      boolean committed,
+      Option<Set<String>> specificPartitions) {
+    if (specificPartitions.isPresent() && !specificPartitions.get().isEmpty()) {
+      writeClient.getTableServiceClient().setSpecificPartitions(specificPartitions);
+    }
     if (committed) {
-      writeClient.scheduleCompaction(Option.empty());
+      return writeClient.scheduleCompaction(Option.empty()).isPresent();
     } else if (deltaTimeCompaction) {
       // if there are no new commits and the compaction trigger strategy is 
based on elapsed delta time,
       // schedules the compaction anyway.
-      writeClient.scheduleCompaction(Option.empty());
+      return writeClient.scheduleCompaction(Option.empty()).isPresent();
     }
+    return false;
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index a5d3b3ece4d..96ba7f7ad1f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -171,6 +171,20 @@ public class ITTestDataStreamWrite extends TestLogger {
     testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED);
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET", "FLINK_STATE"})
+  public void testWriteMergeOnReadWithCompactionIncremental(String indexType) 
throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, 
true);
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, "uuid");
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "uuid");
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
HoodieTableType.MERGE_ON_READ.name());
+    testWriteToHoodie(conf, "mor_write_with_compact", 1, EXPECTED);
+  }
+
   @Test
   public void testWriteCopyOnWriteWithClustering() throws Exception {
     testWriteCopyOnWriteWithClustering(false);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable.java
new file mode 100644
index 00000000000..241929042aa
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable.java
@@ -0,0 +1,63 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hudi.sink;
+ import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.hudi.sink.event.WriteMetadataEvent;
+ import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
+ import org.apache.hudi.utils.TestConfigurations;
+ import org.apache.flink.configuration.Configuration;
+ import org.apache.flink.runtime.jobgraph.OperatorID;
+ import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+ import org.junit.jupiter.api.AfterEach;
+ import org.junit.jupiter.api.BeforeEach;
+ import org.junit.jupiter.api.io.TempDir;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.ValueSource;
+ import java.io.File;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ /**
+  * Test cases for {@link StreamWriteOperatorCoordinator} with incremental compaction scheduling enabled.
+  */
+ public class 
TestStreamWriteOperatorCoordinatorWithIncrementalCompactionEnable {
+   private StreamWriteOperatorCoordinator coordinator;
+   @TempDir
+   File tempFile;
+   @BeforeEach
+   public void before() throws Exception {
+     OperatorCoordinator.Context context = new 
MockOperatorCoordinatorContext(new OperatorID(), 2);
+     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+     conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, 
true);
+     coordinator = new StreamWriteOperatorCoordinator(conf, context);
+     coordinator.start();
+     coordinator.setExecutor(new MockCoordinatorExecutor(context));
+     coordinator.handleEventFromOperator(0, 
WriteMetadataEvent.emptyBootstrap(0));
+     coordinator.handleEventFromOperator(1, 
WriteMetadataEvent.emptyBootstrap(1));
+   }
+   @AfterEach
+   public void after() throws Exception {
+     coordinator.close();
+   }
+   @ParameterizedTest
+   @ValueSource(strings = {"", "20241028145729909"})
+   void testStreamOperatorCheckpoint(String lastInstant) throws Exception {
+     byte[] cpStatus = coordinator.doCheckpointCoordinatorStatus(lastInstant);
+     String res = coordinator.resetCheckpointCoordinatorStatus(cpStatus);
+     assertEquals(lastInstant, res);
+   }
+ }
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable.java
new file mode 100644
index 00000000000..a1e42fb8a04
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable.java
@@ -0,0 +1,29 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hudi.sink;
+ import org.apache.hudi.configuration.FlinkOptions;
+ import org.apache.flink.configuration.Configuration;
+ /**
+  * Test cases for delta stream write with incremental schedule compaction enabled.
+  */
+ public class TestWriteMergeOnReadWithIncrementalScheduleCompactionEnable 
extends TestWriteMergeOnRead {
+   @Override
+   protected void setUp(Configuration conf) {
+     conf.setBoolean(FlinkOptions.COMPACTION_SCHEDULE_INCREMENTAL_PARTITIONS, 
true);
+   }
+ }
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
index 111680bdcdb..e5bddf725fc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java
@@ -142,7 +142,7 @@ public class TestCompactionUtil {
     TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf);
 
     try (HoodieFlinkWriteClient<?> writeClient = 
FlinkWriteClients.createWriteClient(conf)) {
-      CompactionUtil.scheduleCompaction(writeClient, true, true);
+      CompactionUtil.scheduleCompaction(writeClient, true, true, 
Option.empty());
 
       Option<HoodieInstant> pendingCompactionInstant = 
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant();
       assertTrue(pendingCompactionInstant.isPresent(), "A compaction plan 
expects to be scheduled");
@@ -152,7 +152,7 @@ public class TestCompactionUtil {
       TimeUnit.SECONDS.sleep(3); // in case the instant time interval is too 
close
       writeClient.startCommit();
 
-      CompactionUtil.scheduleCompaction(writeClient, true, false);
+      CompactionUtil.scheduleCompaction(writeClient, true, false, 
Option.empty());
       int numCompactionCommits = 
metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants();
       assertThat("Two compaction plan expects to be scheduled", 
numCompactionCommits, is(2));
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c32d0c6739a..21634fbe3f3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -135,7 +135,7 @@ public class TestHoodieCompactor extends 
HoodieSparkClientTestHarness {
       HoodieTable table = HoodieSparkTable.create(getConfig(), context, 
metaClient);
       String compactionInstantTime = writeClient.createNewInstantTime();
       assertThrows(HoodieNotSupportedException.class, () -> {
-        table.scheduleCompaction(context, compactionInstantTime, 
Option.empty());
+        table.scheduleCompaction(context, compactionInstantTime, 
Option.empty(), Option.empty());
         table.compact(context, compactionInstantTime);
       });
 
@@ -158,7 +158,7 @@ public class TestHoodieCompactor extends 
HoodieSparkClientTestHarness {
       writeClient.insert(recordsRDD, newCommitTime).collect();
 
       String compactionInstantTime = writeClient.createNewInstantTime();
-      Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, 
compactionInstantTime, Option.empty());
+      Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, 
compactionInstantTime, Option.empty(), Option.empty());
       assertFalse(plan.isPresent(), "If there is nothing to compact, result 
will be empty");
 
       // Verify compaction.requested, compaction.completed metrics counts.

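For reference, a standalone sketch of the coordinator checkpoint round-trip introduced above: the only coordinator state is the last instant consumed by a scheduled compaction, written as a length-prefixed byte array and read back on restore. The class and method names here are illustrative only, not part of the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class CoordinatorStateRoundTrip {

  // mirrors doCheckpointCoordinatorStatus: instant timestamps are ASCII digits,
  // so the char length written here equals the byte length read back
  static byte[] write(String lastInstant) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream out = new ObjectOutputStream(baos);
    out.writeInt(lastInstant.length());
    out.write(lastInstant.getBytes());
    out.flush();
    return baos.toByteArray();
  }

  // mirrors resetCheckpointCoordinatorStatus: read the length, then the instant bytes
  static String read(byte[] checkpointData) throws IOException {
    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(checkpointData));
    int size = in.readInt();
    byte[] bytes = new byte[size];
    in.readFully(bytes);
    return new String(bytes);
  }

  public static void main(String[] args) throws IOException {
    byte[] state = write("20241028145729909");
    System.out.println(read(state)); // prints 20241028145729909
  }
}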